We Built an Open-Source Data Quality Test Framework for PySpark

Every data scientist knows the classic saying "garbage in, garbage out". Therefore, it is essential to measure the quality of your data.

At Woonstad Rotterdam, a Dutch social housing association, we use PySpark in Databricks for our ETL. Data from our external software suppliers is loaded into our data lake using APIs. However, not every software supplier tests for data quality. The consequences of faulty data in the social housing sector can be significant, ranging from tenants being unable to apply for allowances to rents being set at prices that are illegal under the Affordable Rent Act. Therefore, we built a data quality test framework for PySpark DataFrames, so that we can report on data quality to the suppliers and the users of the data.

The goal of the test framework, in addition to reporting on data quality, is to enable not only data engineers and data scientists but also colleagues with less coding experience to write tests, as they often possess more domain knowledge than programmers. At first, we focused our tests on the data quality of our digital twin, a digital 3D representation of all of our properties.

Examples of tests are:

  • Every house should be linked to a valid address.
  • Every house should have at least one bathroom.
  • Every bathroom should have at least a toilet, shower, or bathtub.

The framework

The framework works as follows. First, select the PySpark DataFrame whose data quality you want to test; here, we'll use an imaginary dataset related to houses.

Python">from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("PySparkTestFrameworkTutorial").getOrCreate()

# Define the schema
schema = StructType(
    [
        StructField("primary_key", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("house_number", IntegerType(), True),
    ]
)

# Define the data
data = [
    (1, "Rochussenstraat", 27),
    (2, "Coolsingel", 31),
    (3, "%Witte de Withstraat", 27),
    (4, "Lijnbaan", -3),
    (5, None, 13),
]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)

### output

+-----------+--------------------+------------+
|primary_key|street              |house_number|
+-----------+--------------------+------------+
|1          |Rochussenstraat     |27          |
|2          |Coolsingel          |31          |
|3          |%Witte de Withstraat|27          |
|4          |Lijnbaan            |-3          |
|5          |null                |13          |
+-----------+--------------------+------------+

Make sure you have installed the test framework using pip: `pip install pyspark-testframework`.

Let's start by importing and initializing the DataFrameTester.

from testframework.dataquality import DataFrameTester

df_tester = DataFrameTester(
    df=df,
    primary_key="primary_key",
    spark=spark,
)

Configurable tests

We constructed configurable tests to make writing tests easy, even for people inexperienced with coding.

from testframework.dataquality.tests import ValidNumericRange, RegexTest

Let's configure a test to make sure the street names are valid, by allowing only words of alphabetic characters separated by single spaces.

valid_street_format = RegexTest(
    name="ValidStreetFormat",
    pattern=r"^[A-Za-z]+(?: [A-Za-z]+)*$", # such regex is easily generated with A.I.
)
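
To get a feel for what this pattern accepts, here is a quick standalone check using Python's built-in re module. This is illustration only; the framework applies the pattern for you.

import re

# The same pattern as above: one or more alphabetic words separated by single spaces.
pattern = re.compile(r"^[A-Za-z]+(?: [A-Za-z]+)*$")

for street in ["Rochussenstraat", "%Witte de Withstraat", "Witte de Withstraat"]:
    print(street, "->", bool(pattern.match(street)))

### output

Rochussenstraat -> True
%Witte de Withstraat -> False
Witte de Withstraat -> True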

Now, let's add this test to the DataFrameTester:

df_tester.test(
    col="street",
    test=valid_street_format,
    nullable=False,  # nullable is False, hence null values are converted to False
    description="Street has valid format.",
).show(truncate=False)

### output

+-----------+--------------------+-------------------------+
|primary_key|street              |street__ValidStreetFormat|
+-----------+--------------------+-------------------------+
|1          |Rochussenstraat     |true                     |
|2          |Coolsingel          |true                     |
|3          |%Witte de Withstraat|false                    |
|4          |Lijnbaan            |true                     |
|5          |null                |false                    |
+-----------+--------------------+-------------------------+

What we see are the test results: true means the value passed the test, while false means it failed. Let's add one more configurable test to illustrate the power of the DataFrameTester. Also, notice how we set return_failed_rows to True in the upcoming example so that only the rows that failed are returned.

df_tester.test(
    col="house_number",
    test=ValidNumericRange(
        min_value=1,
    ),
    nullable=False,
    # description is optional; we omit it here for illustration purposes
    return_failed_rows=True,  # only return the failed rows
).show()

### output

+-----------+------------+-------------------------------+
|primary_key|house_number|house_number__ValidNumericRange|
+-----------+------------+-------------------------------+
|          4|          -3|                          false|
+-----------+------------+-------------------------------+

Now, we've added two configurable tests to the DataFrameTester. In the background, all test results are saved and they can be shown using the .results attribute:

df_tester.results.show(truncate=False)

### output

+-----------+-------------------------+-------------------------------+
|primary_key|street__ValidStreetFormat|house_number__ValidNumericRange|
+-----------+-------------------------+-------------------------------+
|1          |true                     |true                           |
|2          |true                     |true                           |
|3          |false                    |true                           |
|4          |true                     |false                          |
|5          |false                    |true                           |
+-----------+-------------------------+-------------------------------+

With these results, it's easy to start a conversation with the software supplier or data owner about the data quality of the dataset. Of course, column names like street__ValidStreetFormat and house_number__ValidNumericRange might not be specific enough on their own, which is why the descriptions can be accessed:

df_tester.description_df.show(truncate=False)

### output

+-------------------------------+-------------------------------------------------------------+
|test                           |description                                                  |
+-------------------------------+-------------------------------------------------------------+
|street__ValidStreetFormat      |Street has valid format.                                     |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=1.0, max_value=inf)|
+-------------------------------+-------------------------------------------------------------+

Through these configurable tests, even people with hardly any programming or Python experience can easily create data quality tests.

However, sometimes data quality checks require more advanced logic than can be provided through a configurable test.

Custom tests

A test we apply at Woonstad Rotterdam is that every 3D model of a house must have at least one bathroom. However, in our situation, bathrooms cannot be found in the house DataFrame; they are in a separate room DataFrame. The test that every house should have a bathroom, though, is at the house level, not the room level. To account for this, the DataFrameTester allows custom tests. Let's write a custom test to check whether each house has at least one bathroom.

rooms = [
    (1, "living room"),
    (1, "bath room"),
    (1, "kitchen"),
    (1, "bed room"),
    (2, "living room"),
    (2, "bed room"),
    (2, "kitchen"),
]

schema_rooms = StructType(
    [
        StructField("house", IntegerType(), True),
        StructField("room", StringType(), True),
    ]
)

room_df = spark.createDataFrame(rooms, schema=schema_rooms)

room_df.show(truncate=False)

### output

+-----------+-----------+
|house      |room       |
+-----------+-----------+
|1          |living room|
|1          |bath room  |
|1          |kitchen    |
|1          |bed room   |
|2          |living room|
|2          |bed room   |
|2          |kitchen    |
+-----------+-----------+

We must make sure that a custom test result can be joined on the primary key in a one-to-one relation. The DataFrameTester runs all kinds of checks in the background to account for this.

We can aggregate the room data to check whether every house has a bathroom.

house_has_bath_room = room_df.groupBy("house").agg(
    F.max(F.when(F.col("room") == "bath room", True).otherwise(False)).alias(
        "has_bath_room"
    )
)

house_has_bath_room.show(truncate=False)

### output

+-----------+-------------+
|house      |has_bath_room|
+-----------+-------------+
|1          |true         |
|2          |false        |
+-----------+-------------+
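
The framework enforces the one-row-per-key requirement for us, but we can also verify it ourselves before handing the result over. A minimal sanity-check sketch, not part of the framework's API:

# Sanity check: the custom test result must contain exactly one row per house.
n_rows = house_has_bath_room.count()
n_keys = house_has_bath_room.select("house").distinct().count()
assert n_rows == n_keys, "custom test result must contain exactly one row per house"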

We can add this custom test result to the DataFrameTester using add_custom_test_result. When doing so, we must not forget to rename the house column to primary_key so that it matches the house DataFrame we're testing.

df_tester.add_custom_test_result(
    result=house_has_bath_room.withColumnRenamed("house", "primary_key"),
    name="has_bath_room",
    description="House has a bath room",
    fillna_value=None, # also the default value
).show(truncate=False)

### output

+-----------+-------------+
|primary_key|has_bath_room|
+-----------+-------------+
|1          |true         |
|2          |false        |
|3          |null         |
|4          |null         |
|5          |null         |
+-----------+-------------+
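
Houses 3, 4, and 5 have no room data at all, so their result stays null. If you would rather count missing room data as a failure, the fillna_value parameter can fill those nulls. A hedged sketch, not executed in this walkthrough (the outputs below keep the null behavior); the name has_bath_room_strict is made up for illustration:

df_tester.add_custom_test_result(
    result=house_has_bath_room.withColumnRenamed("house", "primary_key"),
    name="has_bath_room_strict",  # hypothetical name for this illustration
    description="House has a bath room (missing room data counts as a failure)",
    fillna_value=False,  # fill missing test results with False instead of null
)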

Even though the presence of a bathroom is not recorded in the house DataFrame itself, we can add the test to the house's test results, because that is the level at which the presence of a bathroom is tested.

df_tester.results.show(truncate=False)

### output

+-----------+-------------------------+-------------------------------+-------------+
|primary_key|street__ValidStreetFormat|house_number__ValidNumericRange|has_bath_room|
+-----------+-------------------------+-------------------------------+-------------+
|1          |true                     |true                           |true         |
|2          |true                     |true                           |false        |
|3          |false                    |true                           |null         |
|4          |true                     |false                          |null         |
|5          |false                    |true                           |null         |
+-----------+-------------------------+-------------------------------+-------------+

Reporting

Now, to make reporting easy, we've added several properties to the DataFrameTester:

  • To see rows where at least one test failed:
df_tester.failed_tests.show(truncate=False)

### output

+-----------+-------------------------+-------------------------------+-------------+
|primary_key|street__ValidStreetFormat|house_number__ValidNumericRange|has_bath_room|
+-----------+-------------------------+-------------------------------+-------------+
|2          |true                     |true                           |false        |
|3          |false                    |true                           |null         |
|4          |true                     |false                          |null         |
|5          |false                    |true                           |null         |
+-----------+-------------------------+-------------------------------+-------------+
  • To see rows where all tests passed:
df_tester.passed_tests.show(truncate=False)

### output

+-----------+-------------------------+-------------------------------+-------------+
|primary_key|street__ValidStreetFormat|house_number__ValidNumericRange|has_bath_room|
+-----------+-------------------------+-------------------------------+-------------+
|1          |true                     |true                           |true         |
+-----------+-------------------------+-------------------------------+-------------+
  • To see summary statistics:
df_tester.summary.show(truncate=False)

### output

+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|test                           |description                                                  |n_tests|n_passed|percentage_passed|n_failed|percentage_failed|
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|street__ValidStreetFormat      |Street has valid format.                                     |5      |3       |60.0             |2       |40.0             |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=1.0, max_value=inf)|5      |4       |80.0             |1       |20.0             |
|has_bath_room                  |House has a bath room                                        |2      |1       |50.0             |1       |50.0             |
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+

At Woonstad Rotterdam, we track the history of the test results to generate reports of the improvement and decline of data quality over time, which is a great way to evaluate whether the data owner or software supplier is taking action on earlier reported data quality issues.
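
A minimal sketch of what such history tracking could look like (the table name data_quality_history is hypothetical; the framework itself does not prescribe how to store history):

# Stamp the summary with the run date and append it to a history table.
# "data_quality_history" is a hypothetical table name for this illustration.
summary_with_date = df_tester.summary.withColumn("run_date", F.current_date())
summary_with_date.write.mode("append").saveAsTable("data_quality_history")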

Conclusion

If you use PySpark and you want to easily measure your data quality by using both easy-to-use configurable tests and more advanced custom tests, make sure to check out [pyspark-testframework](https://github.com/woonstadrotterdam/pyspark-testframework). Its most essential features are laid out in this article, but to see all its features and workings, take a look at its GitHub page. We're still actively developing new features, and of course, you're more than welcome to join the open-source project and develop new features with us!

If you think this article was interesting, make sure to read my other related articles!

How to Store Historical Data Much More Efficiently

How to Setup Your Macbook for Data Science in 2024
