PySpark Explained: Dealing with Invalid Records When Reading CSV and JSON Files


If you are a frequent user of PySpark, one of the most common operations you'll do is reading CSV or JSON data from external files into DataFrames. If your input data has a user-specified schema definition associated with it, you may find that not all the records you are processing will meet the schema specification.

In other words, there may be invalid records present. Perhaps some fields will be missing, there will be extra unaccounted-for fields, or some fields will contain data that isn't of the type specified in the schema. For small files, tracking down these records isn't a problem, but for the huge files of the big data world, they can be a real headache to sort out. The question is, what can we do in these scenarios?

As you'll find out shortly, one of the answers to this question is to use the various PySpark parse mode options available when you read CSV or JSON files into a DataFrame.

Accessing a FREE PySpark development environment

Before we continue, if you want to follow along with the code in this article, you'll need access to a PySpark development environment.

If you're lucky enough to have access to PySpark either through your work, via the cloud, or a local install, go ahead and use that. If not, please have a look at the link below, where I go into detail about how you can access a great FREE online PySpark development environment called the Databricks Community Edition.

Databricks is a cloud-based platform for Data Engineering, machine learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.

How to access a FREE online Spark development environment

Sample data

For this article, assume we are processing a CSV or JSON data file containing information about the most populous cities in the world. The file contains the city name, the country it's in, its latitude and longitude coordinates, and finally, its population as it was in 2015.

Below is an example of such a file containing a mix of valid records as well as some records that contain invalid data.

City,Country,Latitude,Longitude,Population
Tokyo,Japan,35.6895,139.69171,38001000
Delhi,India,28.66667,77.21667,25703000
Shanghai,China,31.22,121.46,23741000,ABCDE
São Paulo,Brazil,-23.55
Mumbai,India, India,72.880838,21043000

Here is the JSON equivalent of the above CSV data.

{"City": "Tokyo", "Country": "Japan", "Latitude": 35.6895, "Longitude": 139.69171, "Population": 38001000}
{"City": "Delhi", "Country": "India", "Latitude": 28.66667, "Longitude": 77.21667, "Population": 25703000}
{"City": "Shanghai", "Country": "China", "Latitude": 31.22, "Longitude": 121.46, "Population": "23741000,ABCDE"}
{"City": "São Paulo", "Country": "Brazil", }
{"City": "Mumbai","Country": "India","Latitude": "India","Longitude": 72.880838,"Population": 21043000}

I created both of these files locally on my system and then uploaded them to the Databricks file system as cities.csv and cities.json.
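If you want to recreate these files yourself in Databricks, here is a minimal sketch that uses the notebook's built-in dbutils utility to write the CSV straight to DBFS (the target path is an assumption; adjust it to suit your workspace, and do the same for the JSON file):

# Hypothetical helper cell: write the sample CSV to DBFS.
# The path below is an assumption - change it to match your workspace.
csv_data = """City,Country,Latitude,Longitude,Population
Tokyo,Japan,35.6895,139.69171,38001000
Delhi,India,28.66667,77.21667,25703000
Shanghai,China,31.22,121.46,23741000,ABCDE
São Paulo,Brazil,-23.55
Mumbai,India, India,72.880838,21043000
"""

dbutils.fs.put("dbfs:/FileStore/shared_uploads/test/cities.csv", csv_data, True)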

Records 1 and 2 are valid. Record 3 has a spurious extra text value, "ABCDE", at its end. Record 4 is missing its longitude and population values (the JSON version omits the latitude too), and record 5 repeats the string "India" where the latitude value should be.

Obviously, in this case, the malformed records are easy to spot, and they would be dealt with long before we got around to processing the data with PySpark. But consider the case where these records are buried in the middle of a multi-megabyte CSV or JSON file. Or it could simply be that the processing is done as part of an automated pipeline, and you don't necessarily have sight of the data beforehand.

Does Spark offer anything that can help us to identify and process data such as this? Of course, it does, or there would be little point in this article!

When reading CSV or JSON data, PySpark can use three different parse modes to help deal with data issues. These are:

Permissive

This is the default behaviour. Spark inserts nulls into any fields it could not parse properly, and if a record cannot be reconciled with the defined schema at all, every field in that record is set to null. Use this mode when you want to read as much of the data as possible and deal with invalid data at a later stage.

Dropmalformed

In this mode, Spark will drop any records where one or more fields cannot be parsed correctly. You would use this mode when you only want to read data that exactly meets your schema definition.

Failfast

In this mode, Spark will throw an exception if there is any input data at all that it can't parse properly, and no data will be loaded. Use this mode when errors in the input data cannot be tolerated.

Let's look at an example of each of these modes in action.

PySpark code

When processing data like the "cities" file using PySpark, typically, the first thing we would do is set up the schema associated with the input data. In other words, we describe to PySpark the names and data types of the fields that make up our input data records.

Python">from pyspark.sql.types import StructField, StructType, DoubleType,StringType,LongType

citySchema = StructType(
  [

        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("population", LongType(), True),
  ]
)

Here we say that each row of our input data should contain five fields: the first two are text, the next two are doubles, and the final one is a long integer.

Using permissive mode

We can use the spark.read function to read our data file. Note that we specify the format of the file as CSV, as parse modes only work for CSV and JSON format files. We also indicate that the file contains a header (otherwise PySpark would treat the first line of the file as data), then specify our pre-defined schema, the field delimiter, the parse mode and, finally, the location of our input file.

df = (spark.read.format("csv")
      .option("header", "true")
      .schema(citySchema)
      .option("delimiter", ",")
      .option("mode", "PERMISSIVE")
      .load("dbfs:/FileStore/shared_uploads/test/cities.csv"))

df.show()

+---------+-------+--------+---------+----------+
|     city|country|latitude|longitude|population|
+---------+-------+--------+---------+----------+
|    Tokyo|  Japan| 35.6895|139.69171|  38001000|
|    Delhi|  India|28.66667| 77.21667|  25703000|
| Shanghai|  China|   31.22|   121.46|  23741000|
|Sao Paulo| Brazil|  -23.55|     null|      null|
|     null|   null|    null|     null|      null|
+---------+-------+--------+---------+----------+

As you can see, the extra field in row 3 is ignored and the missing values in row 4 are set to null. In the last row, a string was encountered where a double was expected, so the whole record is treated as corrupt and every field is set to null.

Depending on your circumstances, that extra value in record three might be an issue. It's not that Spark ignored it (that part is fine), but it would have been nice to be informed that something was not quite right with that particular record.

Luckily, there is a way to be informed when this happens: add an extra option to the spark.read call named columnNameOfCorruptRecord and set its value to _corrupt_record. Additionally, we need to add an extra field called _corrupt_record to our schema to hold the contents of any invalid records.


from pyspark.sql.types import StructField, StructType, FloatType, StringType, IntegerType

# Add  the extra field _corrupt_record
# to our original schema definition
#
citySchema = StructType([
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Latitude", FloatType(), True),
    StructField("Longitude", FloatType(), True),
    StructField("Population", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

#Now see what happens when we read the original file again
# after adding the columnNameOfCorruptRecord option
#
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(citySchema)
      .option("delimiter", ",")
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .load("dbfs:/FileStore/shared_uploads/test/cities.csv"))

df.show()

+---------+-------+--------+---------+----------+------------------------------------------+
|City     |Country|Latitude|Longitude|Population|_corrupt_record                           |
+---------+-------+--------+---------+----------+------------------------------------------+
|Tokyo    |Japan  |35.6895 |139.69171|38001000  |null                                      |
|Delhi    |India  |28.66667|77.21667 |25703000  |null                                      |
|Shanghai |China  |31.22   |121.46   |23741000  |Shanghai,China,31.22,121.46,23741000,ABCDE|
|São Paulo|Brazil |-23.55  |null     |null      |São Paulo,Brazil,-23.55                   |
|Mumbai   |India  |null    |72.88084 |21043000  |Mumbai,India, India,72.880838,21043000    |
+---------+-------+--------+---------+----------+------------------------------------------+

As you can see, we get a completely new field, _corrupt_record, added to our DataFrame. This field is set to null if the input record is valid, otherwise it contains the whole of the input record. In the case of the Shanghai record, it has caught that there is extra information present in the input record not accounted for by the schema definition and has, therefore, flagged it as being suspect. This can be very useful in subsequent steps of our data processing pipeline.
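As a sketch of how that follow-up step might look (this is an assumption about your downstream pipeline, not part of the original example), you could split the DataFrame into clean and quarantined records. Note that Spark disallows queries that reference only the internal corrupt-record column on a freshly parsed CSV or JSON DataFrame, so caching the parsed result first is the usual workaround.

from pyspark.sql.functions import col

# Cache first: Spark restricts queries that reference only the internal
# corrupt-record column on a freshly parsed DataFrame.
df.cache()

# Records that failed schema validation - these could be routed to a
# quarantine location for later inspection.
bad_rows = df.filter(col("_corrupt_record").isNotNull())

# Clean records - drop the helper column and carry on with the pipeline.
good_rows = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")

bad_rows.show(truncate=False)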

Using Dropmalformed

For this example, we'll use the JSON format file we created based on the cities data. We can revert to our original schema, as we don't need the _corrupt_record field this time around. Remember, DROPMALFORMED only processes valid records.

from pyspark.sql.types import StructField, StructType, FloatType, StringType, IntegerType

# Define the schema
citySchema = StructType([
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Latitude", FloatType(), True),
    StructField("Longitude", FloatType(), True),
    StructField("Population", IntegerType(), True)
])
df = (spark.read.format("json")
      .schema(citySchema)
      .option("mode", "DROPMALFORMED")
      .load("dbfs:/FileStore/shared_uploads/test/cities.json"))

# Show the DataFrame
df.show(truncate=False)

+-----+-------+--------+---------+----------+
|City |Country|Latitude|Longitude|Population|
+-----+-------+--------+---------+----------+
|Tokyo|Japan  |35.6895 |139.69171|38001000  |
|Delhi|India  |28.66667|77.21667 |25703000  |
+-----+-------+--------+---------+----------+

Good, the output is what we were expecting. Only the two valid records in our input data file have been loaded to the DataFrame.
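One thing DROPMALFORMED won't tell you is how many records it silently discarded. A rough way to check (a sketch that assumes the JSON file is line-delimited, with one record per line) is to compare the raw line count with the parsed row count:

# Compare the number of raw input lines with the rows that survived parsing.
raw_count = spark.read.text("dbfs:/FileStore/shared_uploads/test/cities.json").count()
parsed_count = df.count()

print(f"{raw_count - parsed_count} of {raw_count} records were dropped as malformed")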

Using Failfast

This parse mode should throw an exception if ANY input records fail the schema validation. Let's see if that's the case. Switching back to our CSV data file, we have …

from pyspark.sql.types import StructField, StructType, FloatType, StringType, IntegerType

# Define the schema
citySchema = StructType([
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Latitude", FloatType(), True),
    StructField("Longitude", FloatType(), True),
    StructField("Population", IntegerType(), True)
])

df = (spark.read.format("csv")
      .option("header", "true")
      .schema(citySchema)
      .option("delimiter", ",")
      .option("mode", "FAILFAST")
      .load("dbfs:/FileStore/shared_uploads/test/cities.csv"))

df.show()

...
...
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
 at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1936)
 at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:103)
 at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$5(JsonDataSource.scala:215)
 at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:608)
 ... 31 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [CANNOT_PARSE_JSON_FIELD] Cannot parse the field name 'Population' and the value 23741000,ABCDE of the JSON token type VALUE_STRING to target Spark data type "INT".
Caused by: org.apache.spark.SparkRuntimeException: [CANNOT_PARSE_JSON_FIELD] Cannot parse the field name 'Population' and the value 23741000,ABCDE of the JSON token type VALUE_STRING to target Spark data type "INT".

...
...
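Note that because Spark evaluates lazily, the exception is only raised when an action such as show() runs, not when spark.read is called. If you would rather fail gracefully than crash the job, one approach (a sketch, not part of the original example) is to wrap the action in a try/except block:

try:
    df = (spark.read.format("csv")
          .option("header", "true")
          .schema(citySchema)
          .option("delimiter", ",")
          .option("mode", "FAILFAST")
          .load("dbfs:/FileStore/shared_uploads/test/cities.csv"))
    # The malformed records only surface when an action forces the read.
    df.show()
except Exception as e:
    print(f"Load aborted - the input data failed schema validation: {e}")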

To prove it's working as expected, let's fix the validation errors in our input CSV file and try Failfast again.

City,Country,Latitude,Longitude,Population
Tokyo,Japan,35.6895,139.69171,38001000
Delhi,India,28.66667,77.21667,25703000
Shanghai,China,31.22,121.46,23741000
São Paulo,Brazil,-23.55,-46.6396,12330000
Mumbai,India,19.0760,72.880838,21043000

And our output, running the same FAILFAST code as before, is …

+---------+-------+--------+---------+----------+
|City     |Country|Latitude|Longitude|Population|
+---------+-------+--------+---------+----------+
|Tokyo    |Japan  |35.6895 |139.69171|38001000  |
|Delhi    |India  |28.66667|77.21667 |25703000  |
|Shanghai |China  |31.22   |121.46   |23741000  |
|São Paulo|Brazil |-23.55  |-46.6396 |12330000  |
|Mumbai   |India  |19.076  |72.88084 |21043000  |
+---------+-------+--------+---------+----------+

Summary

In this article, I've shown you how to use the parse mode option when reading JSON or CSV files with PySpark. The three modes, permissive, dropmalformed and failfast, give you alternative ways to deal with invalid data, from loading everything and worrying about invalid records later to throwing an exception the moment any invalid data is encountered. PySpark gives you total control over how you deal with the unexpected when processing CSV or JSON data files.

Ok, that's all from me for now. Hopefully, you found this article useful. If you did, please check out my profile page at this link. From there, you can see my other published stories and subscribe to get notified when I post new content.

If you liked this content, I think you'll find these articles interesting, too.

PySpark Explained: The explode and collect_list Functions

Luma Labs AI: A first look at their Text/Image to Video product

Tags: Data Engineering Data Science Data Validation Pyspark Python
