ETL Pipelines in Python: Best Practices and Techniques
When building a new ETL pipeline, it's crucial to consider three key requirements: generalizability, scalability, and maintainability. These pillars play a vital role in the effectiveness and longevity of your data workflows. However, the challenge often lies in finding the right balance among them, because enhancing one aspect can come at the expense of another. For instance, prioritizing generalizability might reduce maintainability, impacting the overall efficiency of your architecture.
In this blog, we'll delve into the intricacies of these three concepts, exploring how to optimize your ETL pipelines effectively. I'll share practical tools and techniques that can help you enhance the generalizability, scalability, and maintainability of your workflows. Additionally, we'll examine real-world use cases to categorize different scenarios and clearly define the ETL requirements needed to meet your organization's specific needs.
Generalizability
In the context of ETL, generalizability refers to the ability of the pipeline to handle changes in the input data without extensive reconfiguration. This is a highly desirable property because its inherent flexibility can save you significant time in your work. This flexibility is especially beneficial when many data stakeholders are involved in providing input data. The more people involved, the greater the likelihood of changes in the incoming data sources.
Furthermore, if you want to reuse your pipeline for different projects, generalizability allows for quick adaptation to meet the specific requirements of each new project. This capability not only enhances efficiency but also ensures that your ETL processes remain robust and responsive to evolving data needs.
To gain a more concrete understanding of what generalizability actually means, I would like to provide you with a simple example of a small pipeline that is absolutely not generalizable. This example involves sales data for a product that the senior leadership team is interested in reviewing every year.
import pandas as pd

sales = pd.DataFrame({
    'Year': [2019, 2020, 2021, 2022, 2023],
    'Sales': [30000, 25000, 35000, 40000, 45000]
})
The leadership team always discusses the sales data for the respective year at the end of each year. At the end of 2023, they ask you to provide the numbers for that year. A non-generalizable approach to delivering this data would involve a simple filtering solution, where you would filter the dataset to only include records for the year 2023.
sales[sales['Year'] == 2023]
This pipeline is not generalizable at all. Next year, you would need to run the code again and manually change the year from 2023 to 2024. For our use case, it would be more generalizable if we constructed our pipeline to automatically retrieve the sales data for the highest available year.
sales[sales['Year'] == sales['Year'].max()]
By making a single adaptation, you can avoid having to go into the code every year to change the year number. This adjustment allows the pipeline to automatically select the sales data for the most recent available year.
Of course, in this example, it's not a significant change. However, in reality, pipelines can be much larger, and adjusting a number or a string in 30 different places can become quite time-consuming. This situation becomes even more complicated when changes are unexpected, such as when columns are renamed in the system or the data type of a column changes. Thus, there are several aspects to consider for generalizability.
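One way to avoid touching a value in many places is to define it once and pass it in as a parameter with a sensible default. The following is a minimal sketch, assuming the `sales` DataFrame from above; the function name `select_year` and its `year` and `year_col` parameters are hypothetical and introduced only for illustration.

```python
import pandas as pd

sales = pd.DataFrame({
    'Year': [2019, 2020, 2021, 2022, 2023],
    'Sales': [30000, 25000, 35000, 40000, 45000]
})


def select_year(df, year=None, year_col='Year'):
    """Return the rows for `year`; default to the latest year in the data."""
    if year is None:
        year = df[year_col].max()
    return df[df[year_col] == year]


latest = select_year(sales)          # most recent year available
specific = select_year(sales, 2021)  # explicit override when needed
```

With this shape, a request for a different year becomes a change to one argument rather than an edit inside the pipeline body.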
But what can we do to make our pipelines more generalizable? Unfortunately, there isn't a one-size-fits-all recipe for enhancing generalizability in our workflows. Each use case is unique, and therefore, solutions must be tailored accordingly. However, there are some tips and tricks that can be applied to various problems.
- Avoid Hardcoding: Hardcoding involves entering specific values directly into your expressions, as we did in our sales example. Instead, aim to use dynamic expressions that can adapt to changes in the data.
- Use Mapping Tables: If you anticipate that column headers may vary, consider creating a dictionary or a mapping table to handle potential variations. This allows you to rename columns dynamically based on the names that could occur in the dataset.
# Mapping table (dictionary) for renaming
column_mapping = {
    'SALES': 'Sales',
    'Revenue': 'Sales',
    'Sales_2023': 'Sales'
}
# Rename columns using the mapping table
df_renamed = df.rename(columns=column_mapping)
- Utilize Regex: Similar to using mapping tables, regex can be employed to rename column headers dynamically. The advantage of the regex approach is that you don't necessarily need to know how the headers will change. Instead, you can define a pattern that captures potential variations and rename all relevant columns to your desired name accordingly.
import pandas as pd
import re
# Regex pattern for possible variations of "Sales"
pattern = r"(?i)(.*sales.*|.*verkäufe.*)" # Case-insensitive, also match "Verkäufe" in German
# Find columns matching the pattern
sales_columns = [col for col in df.columns if re.match(pattern, col)]
# Rename all matching columns in one call
# Note: if several columns match, they will all be named 'Sales'
df = df.rename(columns={col: 'Sales' for col in sales_columns})
- Force Data Types: ETL pipelines can encounter issues when a column expected to be numerical arrives as a string. This can lead to failures in certain aggregation functions or mathematical operations. To mitigate this, Pandas provides useful functions to enforce specific data types. With errors='coerce', any values that cannot be converted are replaced with NaN (or NaT for dates). Additionally, you can define the data type when loading the data by using the dtype parameter.
# Convert to numeric, forcing invalid values to NaN
df['col2'] = pd.to_numeric(df['col2'], errors='coerce')
# Convert to datetime, forcing invalid dates to NaT
df['date_col'] = pd.to_datetime(df['date_col'], errors='coerce')
# Specify data types while reading a CSV file
df = pd.read_csv('data.csv', dtype={'col1': int, 'col2': float, 'col3': str})
- Coverage of Different Scenarios: If you have a clear understanding of which parts of your pipeline need to be more generalizable, you can implement if-else logic to handle various inputs. While this can enhance generalizability, it often comes at the cost of scalability and maintainability.
- Use Configuration Files: Store pipeline configurations – such as input/output paths, column names, filters, and other parameters – in external YAML, JSON, or TOML files. This approach separates the logic from the configuration, making it easier to adjust parameters for different environments or datasets without modifying the code.
import yaml
with open('config.yaml') as file:
    config = yaml.safe_load(file)
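To show how several of these tips can work together, here is a minimal sketch that drives column renaming, type enforcement, and filtering from a configuration. The keys `column_mapping`, `numeric_columns`, and `min_year`, as well as the function `apply_config`, are assumptions made for this example and not a fixed schema. The configuration is embedded as a JSON string so the snippet runs on its own; in practice it would be loaded from an external file.

```python
import json

import pandas as pd

# Hypothetical configuration; in a real pipeline this would live in an external file.
CONFIG_TEXT = """
{
    "column_mapping": {"SALES": "Sales", "Revenue": "Sales", "YEAR": "Year"},
    "numeric_columns": ["Year", "Sales"],
    "min_year": 2022
}
"""


def apply_config(df, config):
    """Rename, type-cast, and filter a DataFrame according to `config`."""
    # rename() returns a new DataFrame and ignores mapping keys that are absent.
    df = df.rename(columns=config["column_mapping"])
    # Coerce configured columns to numeric; unparseable values become NaN.
    for col in config["numeric_columns"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # Filter using a threshold from the configuration instead of a literal.
    return df[df["Year"] >= config["min_year"]]


config = json.loads(CONFIG_TEXT)
raw = pd.DataFrame({
    "YEAR": ["2021", "2022", "2023"],
    "Revenue": ["35000", "n/a", "45000"],
})
clean = apply_config(raw, config)
```

Adapting the pipeline to a new dataset then means editing the configuration, while the function itself stays unchanged.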
Scalability
Scalability refers to the capacity of an ETL pipeline to handle increasing volumes of data or a growing number of data sources. Non-scalable ETL pipelines encounter challenges when processing large datasets, resulting in higher computing costs and longer processing times. This issue is particularly critical in scenarios where significant data growth is anticipated in the future.
What can we do to prevent this?
- Filter as Early as Possible: Apply filters at the earliest stages of the pipeline to minimize the volume of data processed in subsequent steps.
- Avoid Unnecessary Transformations: Steer clear of unnecessary transformations, such as sorting, which can add overhead and slow down processing.
- Use Incremental Loads: Reloading an entire dataset each time can be extremely inefficient and costly as data volumes grow. Instead, focus on transforming only the new or changed data.
- Optimize Data Formats: Choose efficient data formats for loading and output. For tabular data used in analytical workloads, consider Parquet, a columnar format that reduces storage size and lets you read only the columns you need. For row-oriented or semi-structured records, use Avro or compressed JSON.
- Minimize Joins: Joins involving large datasets can be costly in terms of performance. Consider denormalizing tables to reduce the need for complex joins.
- Monitoring: Implement monitoring to estimate the time required for the ETL process with different data volumes. By measuring the duration for executing the ETL process with various row counts, you can better understand performance. Consider encapsulating your ETL process in a function for easier measurement.
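The incremental-load tip from the list above can be sketched as a watermark filter: remember the latest timestamp already processed and pass only newer rows downstream. This is a minimal in-memory sketch; the function `select_new_rows`, the column `updated_at`, and the sample values are hypothetical, and a real pipeline would need to persist the watermark between runs.

```python
import pandas as pd


def select_new_rows(df, watermark, ts_col="updated_at"):
    """Return rows newer than `watermark` plus the updated watermark."""
    new_rows = df[df[ts_col] > watermark]
    # Keep the old watermark when nothing new arrived.
    new_watermark = new_rows[ts_col].max() if not new_rows.empty else watermark
    return new_rows, new_watermark


orders = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "updated_at": pd.to_datetime(
        ["2023-01-05", "2023-02-10", "2023-03-15", "2023-04-20"]
    ),
})

# Pretend the previous run processed everything up to 10 February 2023.
last_run = pd.Timestamp("2023-02-10")
new_rows, last_run = select_new_rows(orders, last_run)
```

Only the two orders updated after the previous run are passed on, and the returned watermark becomes the starting point for the next run.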
To illustrate how we can monitor processing time, I will use example data that I previously featured in my blog post Efficient Testing of ETL Pipelines with Python. In this section, I won't delve deeply into the details of this example. If you're interested in a more comprehensive explanation, please refer to that post.
We will begin by copying our ETL pipeline and encapsulating it within a function:
import pandas as pd
def run_etl_pipeline(order_df, customer_df):
    # 1.2 Adjust column (convert 'order_date' to datetime)
    df1_change_datatype_orderdate = order_df.assign(order_date=pd.to_datetime(order_df['order_date']))
    # 1.3 Add new column 'year'
    df1_add_column_year = df1_change_datatype_orderdate.assign(year=lambda x: x['order_date'].dt.year)
    # 1.4 Filter year 2023
    df1_filtered_year = df1_add_column_year[df1_add_column_year['year'] == 2023]
    # 1.5 Aggregate data
    df1_aggregated = df1_filtered_year.groupby(['customer_id']).agg(
        total_price=('total_price', 'sum'),
        unique_order=('order_id', 'nunique')
    ).reset_index()
    # 1.6 Merge the aggregated data with customer data
    merged_df1_df2 = pd.merge(df1_aggregated, customer_df, left_on='customer_id', right_on='id')
    return merged_df1_df2
Please note that we won't be reading in the data within the function. Instead, the input DataFrames will be provided as parameters.
import time
import pandas as pd

# Load data externally
order_df = pd.read_csv(r"order_table.csv")
customer_df = pd.read_csv(r"customer_table.csv")

sample_size = []
duration_list = []
# Increase the sample size from 10,000 to 1,000,000 rows in steps of 10,000
for i in range(10000, 1000001, 10000):
    start_time = time.perf_counter()
    # Sample with replacement, so i may exceed the number of rows in order_df
    order_df_sample = order_df.sample(n=i, replace=True)
    run_etl_pipeline(order_df_sample, customer_df)
    duration = time.perf_counter() - start_time
    sample_size.append(i)
    duration_list.append(duration)
scalability_tracker = pd.DataFrame({'Sample': sample_size, 'Duration': duration_list})
The loop increases the sample size from 10,000 to 1,000,000 rows in steps of 10,000. In each iteration, we randomly draw that many rows from the order table with replacement, so the same row can be selected more than once. We record the time before and after each run on this simulated data. The result is a scalability tracker that we can also visualize.

In this example, transforming 1 million rows in the order table takes approximately 0.8 seconds. Even more interestingly, we observe a linear relationship in the transformation times. This allows us to easily estimate the time required for the data we expect to receive in the upcoming month, enabling us to proactively prevent any data delivery delays.
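Because the relationship is roughly linear, a simple least-squares line is enough for a first estimate. The sketch below shows one way to do this in plain Python. The function names `fit_line` and `estimate_duration` are hypothetical, and the measurements are made up for illustration; they are not the results from the run above. With the tracker from above, you could pass `scalability_tracker['Sample'].tolist()` and `scalability_tracker['Duration'].tolist()` instead.

```python
def fit_line(x, y):
    """Ordinary least squares for y = slope * x + intercept."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    sxx = sum((xi - mean_x) ** 2 for xi in x)
    sxy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


def estimate_duration(rows, slope, intercept):
    """Extrapolate the expected runtime in seconds for `rows` input rows."""
    return slope * rows + intercept


# Illustrative measurements only (rows processed, seconds taken).
sample_sizes = [100_000, 200_000, 300_000, 400_000]
durations = [0.1, 0.2, 0.3, 0.4]

slope, intercept = fit_line(sample_sizes, durations)
estimate = estimate_duration(2_000_000, slope, intercept)
```

Treat such an estimate with caution when extrapolating far beyond the measured range, since effects like memory limits can break the linear trend.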
Maintainability
Maintainability refers to how easily an ETL pipeline can be updated, modified, or fixed over time. In a rapidly changing world, where data volumes are increasing and data teams are expanding, having maintainable workflows is crucial. Such workflows enable new team members to understand the process more easily and help maintain an overall overview of the pipeline. Additionally, maintainable workflows are much faster to fix when it comes to error handling and are easier to update as requirements evolve.
- Documentation: Write clear and concise documentation for your ETL pipeline. This should include instructions on how to run the pipeline, configurations, dependencies, expected inputs/outputs, and common troubleshooting steps.
- Testing: Implementing tests can save you significant time in finding and fixing errors. There are various testing strategies you can use. I highly recommend checking out my blog on Efficient Testing of ETL Pipelines with Python – you can thank me later!
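As a small illustration of the testing point, here is a minimal sketch of a unit test for a single transformation step. The function `filter_latest_year` and the test data are hypothetical. The test uses plain `assert` statements, so it can run as a script or be collected by a test runner such as pytest.

```python
import pandas as pd


def filter_latest_year(df, year_col="Year"):
    """Keep only the rows belonging to the most recent year."""
    return df[df[year_col] == df[year_col].max()]


def test_filter_latest_year():
    # Small, hand-written input with a known expected result.
    df = pd.DataFrame({"Year": [2022, 2023, 2023], "Sales": [10, 20, 30]})
    result = filter_latest_year(df)
    assert result["Year"].unique().tolist() == [2023]
    assert result["Sales"].sum() == 50
    # The transformation must not modify its input.
    assert len(df) == 3


test_filter_latest_year()
```

Keeping each transformation in its own small function, as here, is what makes this kind of targeted test possible.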