The world is drowning in data.
Every day, we generate more data than ever before. From our smartphones to our cars to our homes, data is being collected at an unprecedented rate. This data can be a valuable resource for businesses, governments, and individuals. However, it can also be overwhelming and difficult to manage.
That’s where ELT pipelines come in.
An Extract, Load, and Transform (ELT) pipeline extracts data from a source system, loads it into a target system, and transforms it there. ELT pipelines are commonly used to move data from operational databases into data warehouses, data lakes, or other analytics systems. In this article, we will learn how to build an ELT pipeline using Python and Snowflake.
Snowflake is a cloud-based data warehousing platform that allows for scalable, secure, and efficient storage and analysis of large amounts of data. Python is a popular programming language used for a wide range of data processing and analysis tasks.
Prerequisites
To follow along with this tutorial, you will need:
- A Snowflake account
- Python 3 installed on your machine
- A basic understanding of SQL
- A basic understanding of Python programming language
- A basic understanding of the Extract, Load, and Transform (ELT) pipeline
Create a Snowflake Account and Configure it via Python
To get started, create a Snowflake account and set up a database and schema. Once you have done that, you can connect to Snowflake using Python.
Snowflake provides a Python library called Snowflake Connector for Python. Install this library using pip:
pip install snowflake-connector-python
The snowflake-connector-python package provides a connection to the Snowflake cloud data platform. It lets you interact with Snowflake from your Python applications and extract, load, and transform data into and out of Snowflake.
Next, create a Snowflake connection object in Python:
import snowflake.connector
conn = snowflake.connector.connect(
    user='<your_username>',
    password='<your_password>',
    account='<your_account_name>'
)
Replace <your_username>, <your_password>, and <your_account_name> with your Snowflake account details.
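If you have already created a warehouse, database, and schema for this project, you can point the connection at them up front instead of issuing USE statements later. A minimal sketch, using the connector's optional warehouse, database, and schema parameters (the placeholder names are yours to fill in):
conn = snowflake.connector.connect(
    user='<your_username>',
    password='<your_password>',
    account='<your_account_name>',
    warehouse='<your_warehouse>',   # optional: compute warehouse to run queries
    database='<your_database>',     # optional: default database for the session
    schema='<your_schema>'          # optional: default schema for the session
)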
Extract Data from a Source System
The first step in an ELT pipeline is to extract data from a source system. In this tutorial, we will extract data from a PostgreSQL database, which requires the psycopg2 library. If you don't have it installed, run pip install psycopg2.
Next, create a PostgreSQL connection object in Python:
import psycopg2
conn_pg = psycopg2.connect(
    host="<host>",
    database="<database>",
    user="<user>",
    password="<password>"
)
Replace <host>, <database>, <user>, and <password> with your PostgreSQL connection details. Several of these fields have common defaults in a local PostgreSQL installation, such as localhost for the host and postgres for the superuser.
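Before extracting anything, it can help to confirm the connection actually works. A quick sanity check, for example:
# Run a trivial query to verify the PostgreSQL connection is live
cur_pg = conn_pg.cursor()
cur_pg.execute("SELECT version();")
print(cur_pg.fetchone())
cur_pg.close()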
Once you have established a successful connection to the PostgreSQL database, you can extract data from it using a SQL query:
import pandas as pd
query = "SELECT * FROM <table_name>"
df = pd.read_sql_query(query, conn_pg)
Replace <table_name> with the name of the table you want to extract data from. In practice, the exact query depends on your business or program logic and on how and from where the data has to be extracted.
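For example, rather than pulling the whole table on every run, you might extract only rows that changed since the last run. A hedged sketch, assuming the source table has an updated_at timestamp column (that column and the cutoff value are illustrative assumptions, not part of the tutorial's schema):
# Incremental extract: only pull rows modified since the last pipeline run
last_run = '2023-04-28 00:00:00'
incremental_query = "SELECT * FROM <table_name> WHERE updated_at > %s"
df = pd.read_sql_query(incremental_query, conn_pg, params=(last_run,))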
Load Data into Snowflake
Once you have extracted data from the source system, the next step is to load it into Snowflake. To load data into Snowflake, you will need to create a Snowflake table and copy the data into it.
To create a Snowflake table, use the following SQL command:
cursor = conn.cursor()
create_table_query = '''
CREATE TABLE <table_name> (
<column_1_name> <column_1_data_type>,
<column_2_name> <column_2_data_type>,
...
)
'''
cursor.execute(create_table_query)
Substitute <table_name> with the name you want to give to the new Snowflake table. Replace <column_1_name>, <column_1_data_type>, <column_2_name>, <column_2_data_type>, and so on with the names and data types of the columns in your data.
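As a purely hypothetical illustration, suppose the extracted data has an integer id, a text name, and a signup date stored as text (these column names are assumptions for the example, not part of the tutorial's dataset):
# Hypothetical table matching a three-column extract; the date arrives as text
create_table_query = '''
CREATE TABLE customers (
    id INTEGER,
    name VARCHAR,
    signup_date VARCHAR
)
'''
cursor.execute(create_table_query)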
Once you have created the table, you can load data into it using the COPY command:
copy_command = '''
COPY INTO <table_name>
FROM @<stage_name>/<file_name>
FILE_FORMAT = (TYPE = 'CSV', SKIP_HEADER = 1);
'''
cursor.execute(copy_command)
Substitute <table_name> with the name of the Snowflake table created in the previous step, <stage_name> with the name of the Snowflake stage used to load data into the table, and <file_name> with the name of the file containing the data to be loaded.
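One gap worth noting: the COPY command assumes the extracted data is already sitting in a Snowflake stage as a file. A minimal sketch of one way to get it there, assuming the DataFrame from the extract step is written to a local CSV file first and that an internal stage has already been created with CREATE STAGE <stage_name>:
# Write the extracted DataFrame to a local CSV (header row kept, which is why
# the COPY command uses SKIP_HEADER = 1)
df.to_csv('/tmp/extract.csv', index=False)
# Upload the file to the internal stage; Snowflake gzips it on upload, and
# COPY INTO detects the compression automatically
cursor.execute("PUT file:///tmp/extract.csv @<stage_name> AUTO_COMPRESS=TRUE")
With the data staged and copied into the table, it's time to transform it.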
Transform Data
After data is loaded into Snowflake, the next step is to transform it. In this step, you can clean the data, join it with other datasets, calculate new columns, and so on. In this tutorial, we perform a simple transformation: converting a column containing dates from a string format to a DATE type. Because Snowflake does not allow changing a string column's type to DATE in place, we add a new DATE column, backfill it with TO_DATE, and swap it in for the original:
# Rebuild the column: add a DATE column, backfill it, then swap the names
cursor.execute("ALTER TABLE <table_name> ADD COLUMN <date_column_name>_parsed DATE")
cursor.execute("UPDATE <table_name> SET <date_column_name>_parsed = TO_DATE(<date_column_name>)")
cursor.execute("ALTER TABLE <table_name> DROP COLUMN <date_column_name>")
cursor.execute("ALTER TABLE <table_name> RENAME COLUMN <date_column_name>_parsed TO <date_column_name>")
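If you want to confirm the change took effect, one quick check is to describe the table and inspect the column types:
# Sanity check: the date column should now be reported with a DATE type
cursor.execute("DESCRIBE TABLE <table_name>")
for row in cursor.fetchall():
    print(row[0], row[1])   # column name, column type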
Schedule the ELT Pipeline
Finally, to make the ELT pipeline run automatically, you can schedule it using a tool like Apache Airflow. Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. To schedule the ELT pipeline using Apache Airflow, you will need to create a DAG (Directed Acyclic Graph) that defines the steps of the pipeline and the schedule on which it will run.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 29),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'snowflake_elt_pipeline',
    default_args=default_args,
    description='ELT pipeline for loading data into Snowflake',
    schedule_interval=timedelta(days=1),
)

def extract_data():
    # code to extract data from PostgreSQL (see the extract step above)
    return df

def load_data():
    # code to load data into Snowflake (stage the file and run COPY INTO)
    return None

def transform_data():
    # code to transform data in Snowflake (convert the date column)
    return None

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)
extract_task >> load_task >> transform_task
In this DAG, we define three tasks: extract_task, load_task, and transform_task. These tasks call the Python functions we defined earlier to extract, load, and transform data, and the >> operator defines the dependencies between them. With schedule_interval set to one day, the DAG runs once every 24 hours, starting from the start_date.
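One practical detail the sketch above glosses over is how data moves between tasks: each task runs in its own process, so the DataFrame returned by extract_data is not automatically available to load_data. A hedged option, assuming the PostgreSQL connection and Snowflake cursor from earlier are created inside the DAG file, is to write the extract to a file and pass only the file path through Airflow's XCom (the /tmp path is an illustrative assumption):
def extract_data():
    # Run the extract query from earlier, then persist the result to disk
    df = pd.read_sql_query("SELECT * FROM <table_name>", conn_pg)
    df.to_csv('/tmp/extract.csv', index=False)
    return '/tmp/extract.csv'   # only the small path string goes through XCom

def load_data(ti):
    # Pull the file path pushed by the extract task, then stage and copy it
    csv_path = ti.xcom_pull(task_ids='extract_data')
    cursor.execute(f"PUT file://{csv_path} @<stage_name> AUTO_COMPRESS=TRUE")
    cursor.execute(copy_command)   # the COPY INTO statement from earlier
For larger datasets, you would typically write to shared or cloud storage rather than a local /tmp path, since Airflow tasks may run on different workers.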
The Ending Lines: Building an ELT Pipeline using Python and Snowflake
In this data engineering tutorial, we learned how to build an ELT pipeline using Python and Snowflake. We extracted data from a PostgreSQL database, loaded it into Snowflake, transformed it, and scheduled the pipeline to run automatically using Apache Airflow. With these tools and techniques, you can build powerful, scalable data pipelines to support your business's needs.