Efficient Testing of ETL Pipelines with Python
In today's data-driven world, organizations rely heavily on accurate data to make critical business decisions. As a responsible and trustworthy Data Engineer, ensuring data quality is paramount. Even a brief period of displaying incorrect data on a dashboard can lead to the rapid spread of misinformation throughout the entire organization, much like a highly infectious virus spreads through a living organism.
But how can we prevent this? Ideally, we would avoid data quality issues altogether. However, the sad truth is that it's impossible to completely prevent them. Still, there are two key actions we can take to mitigate the impact:
- Be the first to know when a data quality issue arises
- Minimize the time required to fix the issue
In this blog, I'll show you how to implement the second point directly in your code. I will create a data pipeline in Python using generated data from Mockaroo and leverage Tableau to quickly identify the cause of any failures. If you're looking for an alternative testing framework, check out my article on An Introduction into Great Expectations with python.
You can access all the code, the Tableau dashboard, and the data files used in this blog on my GitHub account.
Localizing the Issue
Let's start by addressing how we can minimize the time spent fixing the issue. Imagine we already have a data pipeline running, and we receive a notification that the output data is incorrect. The question is: how do we resolve the problem as quickly as possible?
The first step is always to ask: Where? Where did the problem first occur? To minimize the time spent fixing the data quality issue, we need a way to quickly identify where something went wrong. Think of it like having a map that immediately points out, "Here's where the issue is." A useful way to visualize this could be a color-coded flowchart, which highlights problem areas at a glance.

In this example, we have a simple ETL process where we read the data, filter it to relevant columns and rows, change the data types of specific columns, and finally write the data to a target destination. Each rectangle represents a part of our workflow, where we've implemented checks to verify if certain conditions are met after each step. While white rectangles indicate that the data is flowing as expected, red rectangles signal that something unexpected occurred at that point.
For our question of where the issue arises, this visualization is quite helpful. The earliest validation step that indicates a problem is the data type change step. For instance, if a numerical column fails to convert from a string to an integer, it may lead to a situation where the numerical column, intended for display in a dashboard via a line chart and for aggregation, becomes non-summable because it is loaded as a string.
Not only can we immediately localize the issue, but we can also narrow down the number of potential causes for the unexpected result. By extending our initial ETL pipeline, we can see how the value of the flowchart grows as the pipeline becomes more complex.

We are facing quality problems with our loaded data once again. However, this time the issue lies not only in the data type change but also in the reading of Data 3. It's possible that no rows were loaded or that the columns differ from those in previous loads.
We can see that such a visualization for monitoring data quality can save time in identifying and resolving issues. But how can we generate such a chart from our pipeline? To answer this question, let's look at a practical example.
How can we achieve this?
Before we dive deeper into a specific use case, let's consider which information we need to construct the visualization. Ultimately, we aim to connect Tableau to a table to create this type of chart.
Since we want to track test results for each step, we need to have a column for each step's name. We can achieve this by using the names of the variables where we store our DataFrames.
We also need a column for the test results, which can be binary and will be used for coloring the respective squares.
Theoretically, we also need data about the coordinates for placing the squares. This ensures that, for example, Read Data 1 is positioned in the upper left corner, with the subsequent steps to its right. We will assign an x and y value to each variable.
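Put together, the table that Tableau reads could look like the following minimal sketch. The step names and all values are made up for illustration, and the column names `x`, `y`, `Variable`, and `Test_Result` are simply one possible naming for the columns described above.

```python
import pandas as pd

# Illustrative layout of the table behind the flowchart (all values are made up).
# x is the position of a step within its row, y identifies the row (one per source).
layout = pd.DataFrame(
    {
        "Variable": ["df1_read", "df1_filtered", "df2_read"],
        "x": [0, 1, 0],
        "y": [1, 1, 2],
        "Test_Result": [1, 0, 1],  # 1 = all checks passed, 0 = at least one failed
    }
)

# Steps that would be colored red in the chart
failed_steps = layout.loc[layout["Test_Result"] == 0, "Variable"].tolist()
print(failed_steps)
```

Each row corresponds to one square in the chart: `x` and `y` place it, and `Test_Result` decides its color.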
I have already prepared a Python code snippet that you can reuse for all your pipelines to obtain the x and y values, as well as the variable names. The code takes the source code of your Python file as input and extracts the variable names used to store your DataFrames. The only input you need to provide is the file path of your Python file. If you have any questions or need further clarification, please feel free to reach out.
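To give a rough idea of how such an extraction can work, here is a minimal sketch. It is not the snippet from my repository but a simplified illustration under stated assumptions: the function name `extract_steps`, the regular expression, and the coordinate rule (y is the DataFrame number, x counts the steps of that DataFrame from left to right) are all hypothetical choices made for this example.

```python
import ast
import re

# Assumed naming pattern: "df", the DataFrame number, an underscore, then the action.
NAME_PATTERN = re.compile(r"^df(\d+)_\w+$")


def extract_steps(source):
    """Return one record per matching top-level variable, in source order."""
    steps = []
    next_x = {}  # next free x position for each DataFrame number
    for node in ast.parse(source).body:
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if not isinstance(target, ast.Name):
                continue
            match = NAME_PATTERN.match(target.id)
            if match is None:
                continue
            y = int(match.group(1))  # one row per DataFrame number
            x = next_x.get(y, 0)     # steps of a row are placed left to right
            next_x[y] = x + 1
            steps.append({"Variable": target.id, "x": x, "y": y})
    return steps


example_source = """
df1_read = 1
df2_read = 2
df1_filtered_year = df1_read
helper = 3
"""
steps = extract_steps(example_source)
print(steps)
```

With this simplified pattern, a variable that does not start with "df" and a number is ignored, so merged steps would need an extra rule of their own.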
For the test results, we will follow a specific schema consisting of the following steps:
- Define test functions: Each function represents a single test.
- Define a list of variables: This list specifies the variables for which the test results should be collected.
- Collect the test results.
In the second step, we simply create a list of pandas DataFrames for which the tests will be executed. The idea is that certain tests should be applicable to multiple types of steps.

Use Case
Imagine you are a Data Engineer for an online shop, and the Marketing team has a request. They would like to better understand their customers and examine the distribution of sales per age group for the entire year of 2023. Since they are unsure how they want to group the ages, they would greatly appreciate it if you could prepare the data so that sales figures are available for each individual age.
To deliver the results the Marketing team is looking for, we first need to check which data is actually available. Let's start with the order data.

Each order-product combination represents one row in the dataset. Since a customer can have several different products in their shopping basket, there may be multiple rows for the same order_id, each associated with a different product_id. The customer_id allows us to identify who placed the order, while the order_date provides information about when the order was placed. In this example, the total_price column is the key column for our analysis.
Now that we have the information for the respective orders, what's missing is the information about the age, or more generally, details about the customers. We can obtain this information from the customer table.

Here, we have some information about the customer, specifically their age. Now we have everything we need to proceed and combine the information. Next, we just need to import our libraries and create our pipeline.
import pandas as pd
import ast
import json
import re
# Load Data 1
df1_read = pd.read_csv(r"...order_table.csv")
# Load Data 2
df2_read = pd.read_csv(r"...customer_table.csv")
# 1.2 Adjust column
df1_change_datatype_orderdate = df1_read.assign(order_date=pd.to_datetime(df1_read['order_date']))
# 1.3 Add new column
df1_add_column_year = df1_change_datatype_orderdate.assign(year=lambda x: x['order_date'].dt.year)
# 1.4 Filter year 2023
df1_filtered_year = df1_add_column_year[df1_add_column_year['year'] == 2023]
# 1.5 Aggregate
df1_aggregated = df1_filtered_year.groupby(['customer_id']).agg(
    total_price=('total_price', 'sum'),
    unique_order=('order_id', 'nunique')
).reset_index()
# 1.6 Merge
merged_df1_df2 = pd.merge(df1_aggregated, df2_read, left_on='customer_id', right_on='id')
After loading both DataFrames, we converted the order_date column to the datetime data type, extracted the year, and then filtered for orders placed in 2023. Next, we aggregated the total price per customer. Finally, we combined the customer information with the sales per customer data. Now, we can either deliver the data directly to the Marketing team or to a data consumer, such as a data analyst.
For the technical background, since we are extracting the variable names from the source code, we need to use a specific naming pattern to mark them. In this case, the pattern consists of "df" followed by the number of the DataFrame we are reading in, then an underscore, followed by the performed action (e.g., "add_column"), and ending with another underscore followed by a specification of the action (e.g., "year"), which gives df1_add_column_year. It is important to adhere to this pattern because it facilitates the extraction of variable names from the source code and helps maintain technical documentation automatically.
Now we simply need to follow our testing methodology. First, we will define our testing function. We will focus on a single function that returns 1 if a DataFrame contains rows and 0 if no rows are present in the DataFrame.
# Define Test Functions
def check_row_count(df):
    return 1 if df.shape[0] > 0 else 0
In your case, you would implement more than one function. For example, you could include not only data test functions but also logical tests, such as unit tests.
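As a rough sketch of what additional data tests could look like, the functions below follow the same convention of returning 1 for a pass and 0 for a failure. The names `check_required_columns` and `check_datetime_column` and the toy data are assumptions made for this illustration and are not part of the pipeline above.

```python
import pandas as pd


def check_row_count(df):
    """Return 1 if the DataFrame contains at least one row, else 0."""
    return 1 if df.shape[0] > 0 else 0


def check_required_columns(df, required_columns):
    """Return 1 if every required column is present, else 0."""
    return 1 if set(required_columns).issubset(df.columns) else 0


def check_datetime_column(df, column):
    """Return 1 if the column exists and has a datetime dtype, else 0."""
    if column not in df.columns:
        return 0
    return 1 if pd.api.types.is_datetime64_any_dtype(df[column]) else 0


# Toy data: order_date starts out as a string column
orders = pd.DataFrame({"order_id": [1, 2], "order_date": ["2022-05-01", "2022-06-01"]})
converted = orders.assign(order_date=pd.to_datetime(orders["order_date"]))

results = {
    "rows": check_row_count(orders),
    "columns": check_required_columns(orders, ["order_id", "order_date"]),
    "datetime_before": check_datetime_column(orders, "order_date"),
    "datetime_after": check_datetime_column(converted, "order_date"),
}
print(results)
```

The datetime check is the kind of test that would flag a failed data type change, like the one in the flowchart example at the beginning.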
In the second step, we define our list of variables that we want to test. Since each of our DataFrames should contain rows, we will apply our tests to all of them.
test_variable = []  # List to store the names of the tested variables
test_result = []  # List to store the results of the tests
# List of DataFrames to be tested
tested_dataframes = [
    'df1_read',
    'df1_change_datatype_orderdate',
    'df1_add_column_year',
    'df1_filtered_year',
    'df1_aggregated',
    'merged_df1_df2',
    'df2_read'
]
Here, you should note two things. First, we created two lists to store the variables that we tested and their corresponding test results. Second, the names of the tested DataFrames are stored as strings. We need the strings instead of the DataFrames so that we can add the name of each DataFrame to our test_variable list. If we want to retrieve the DataFrame itself, we can simply use the eval function, which evaluates the string as a Python expression and returns the object it refers to.
Now we simply loop through our DataFrames, apply our function, and collect our test results.
# Iterate over each DataFrame name in the tested_dataframes list
for i in tested_dataframes:
    # Append the result of the row count check for the current DataFrame to the test_result list
    test_result.append(check_row_count(eval(i)))
    # Append the current DataFrame name to the test_variable list
    test_variable.append(i)
# Create a DataFrame to store the test results with variable names and their corresponding results
result_test_dataframe = pd.DataFrame({'Variable': test_variable, 'Test_Result': test_result})
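If you would rather not call eval on strings, one possible variation is to map each name to its DataFrame explicitly in a dictionary. This is a swapped-in alternative, not the approach used in this blog, and the two toy DataFrames below are made-up stand-ins for the real pipeline steps.

```python
import pandas as pd


def check_row_count(df):
    return 1 if df.shape[0] > 0 else 0


# Toy stand-ins for two pipeline steps (values are made up)
df1_read = pd.DataFrame({"order_id": [1, 2]})
df1_filtered_year = df1_read[df1_read["order_id"] > 5]  # no rows remain

# Explicit name-to-DataFrame mapping instead of eval on strings
tested_dataframes = {
    "df1_read": df1_read,
    "df1_filtered_year": df1_filtered_year,
}

rows = [
    {"Variable": name, "Test_Result": check_row_count(df)}
    for name, df in tested_dataframes.items()
]
result_test_dataframe = pd.DataFrame(rows)
print(result_test_dataframe)
```

The trade-off is that each DataFrame is listed twice, once as a key and once as a value, but no string is ever executed as code.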
Since we can have multiple tests for one step, we would like to aggregate the results using a minimum function. This way, if at least one test fails for a variable, the overall test result will be 0.
# Summarize the test results by taking the minimum result for each variable
result_test_dataframe_summarized = result_test_dataframe.groupby('Variable').agg({'Test_Result': 'min'}).reset_index()
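To see what the minimum does when a step has more than one test, here is a small worked example. The test results are made up: both steps have two results each, and one of the tests fails for df1_filtered_year.

```python
import pandas as pd

# Toy results: two tests per step; df1_filtered_year fails one of them
result_test_dataframe = pd.DataFrame(
    {
        "Variable": ["df1_read", "df1_read", "df1_filtered_year", "df1_filtered_year"],
        "Test_Result": [1, 1, 1, 0],
    }
)

summarized = (
    result_test_dataframe.groupby("Variable")
    .agg({"Test_Result": "min"})
    .reset_index()
)
print(summarized)
```

A single failing test is enough to pull the summarized result of a step down to 0, which is what turns its square red.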
Finally, we join our result DataFrame with another one called relationship_merged. As mentioned earlier, I will not walk through the code that returns the x and y coordinates of the variables here. The relationship_merged DataFrame contains exactly that information.
# Merge the summarized test results with the relationship_merged DataFrame
endresult = pd.merge(relationship_merged, result_test_dataframe_summarized, on='Variable', how='left')
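One consequence of the left join is worth a quick illustration: a step that appears in relationship_merged but was never tested ends up with a missing Test_Result. The sketch below uses a hypothetical stand-in for relationship_merged with made-up coordinates, since the real one is produced by the extraction code.

```python
import pandas as pd

# Hypothetical stand-in for relationship_merged (coordinates are made up)
relationship_merged = pd.DataFrame(
    {
        "Variable": ["df1_read", "df1_filtered_year", "df2_read"],
        "x": [0, 1, 0],
        "y": [1, 1, 2],
    }
)
summarized = pd.DataFrame(
    {"Variable": ["df1_read", "df1_filtered_year"], "Test_Result": [1, 0]}
)

endresult = pd.merge(relationship_merged, summarized, on="Variable", how="left")

# Steps that appear in the flowchart but were never tested have no Test_Result
untested = endresult.loc[endresult["Test_Result"].isna(), "Variable"].tolist()
print(untested)
```

Keeping these rows means that untested steps still show up in the chart, where they could be given a neutral color of their own.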
After writing out the result, we arrive at our final output, which looks like this:

We can see that the test failed for three DataFrames. This indicates that there are no rows present from df1_filtered_year onward. While we could examine the input and output of df1_filtered_year, I can explain exactly what happened. When I filtered for the year 2023, it became apparent that our source data does not contain any data for that year. It's possible that the file behind df1_read, which is produced by another process, never included 2023. Through this visualization, we can now detect data quality problems at a glance.
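Once the failing step is known, confirming the cause takes only a few lines. The following sketch counts the orders per year in the input of the filter step. The toy data is made up and only mirrors the situation described above, where no order falls into 2023.

```python
import pandas as pd

# Toy order data without any 2023 orders (values are made up)
df1_read = pd.DataFrame(
    {"order_id": [1, 2, 3], "order_date": ["2021-03-01", "2022-07-15", "2022-11-30"]}
)

# Count the orders per year in the input of the filter step
years = pd.to_datetime(df1_read["order_date"]).dt.year
orders_per_year = years.value_counts().sort_index()
print(orders_per_year)

# False here explains why the filter for 2023 returns no rows
print(2023 in orders_per_year.index)
```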
Conclusion
In the rapidly evolving landscape of Data Engineering, ensuring data quality is paramount. By implementing effective testing strategies within our ETL pipelines, we can proactively identify and resolve data quality issues before they impact decision-making. The visualization techniques discussed not only streamline the detection of anomalies but also enhance collaboration among data teams. As we continue to refine our processes and leverage tools like Python and Tableau, we empower ourselves to deliver reliable insights that drive business success.
Thank you so much for sticking around until the end! If you found my content helpful or at least mildly entertaining, I'd be over the moon if you could give me a clap or hit that follow button.
For those of you who love diving into code, you can find all the data and the magical scripts over on my GitHub profile. And if you're looking for a good way to implement unit tests in your ETL code, I can recommend this video by Jacqueline Bilston. She dives into how she uses PySpark to whip up unit tests in her ETL pipeline, and it's worth a watch!