PySpark Explained: Four Ways to Create and Populate DataFrames

Image by AI (Dalle-3)

When using PySpark, especially if you have a background in SQL, one of the first things you'll want to do is get the data you want to process into a DataFrame. Once the data is in a DataFrame, it's easy to create a temporary view (or permanent table) from the DataFrame. At that stage, all of PySpark SQL's rich set of operations becomes available for you to use to further explore and process the data.

Since many standard SQL skills are easily transferable to PySpark SQL, it's crucial to prepare your data for direct use with PySpark SQL as early as possible in your processing pipeline. Doing this should be a top priority for efficient data handling and analysis.

You don't have to do this, of course, as anything you can do with PySpark SQL on views or tables can also be done directly on DataFrames using the API. But as someone who is far more comfortable using SQL than the DataFrame API, my go-to process when using Spark has always been:

input data -> DataFrame -> temporary view -> SQL processing
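For example, once your data is in a DataFrame, the last two steps take only a couple of lines. Here's a minimal sketch, assuming a DataFrame called df with city and population columns (placeholder names for illustration only):

# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("cities")

# From here on, standard SQL does the work
spark.sql("SELECT city, population FROM cities WHERE population > 1000000").show()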

To help you with this process, this article will discuss the first part of this pipeline, i.e. getting your data into DataFrames, by showcasing four of the most common ways to create and populate them from various data sources.

I'll cover populating DataFrames from …

  • Resilient Distributed Datasets (RDDs)
  • In-memory data structures like lists, dictionaries, and ranges
  • External files in CSV, JSON, and Parquet formats
  • External RDBMS tables

Accessing a FREE PySpark development environment

Want to follow along with the code in this article? Then you'll need access to a PySpark development environment.

If you're lucky enough to have access to PySpark either through your work, via the cloud, or a local install, go ahead and use that. If not, please have a look at the link below, where I go into detail about how you can access a great FREE online PySpark development environment called the Databricks Community Edition.

Databricks is a cloud-based platform for Data Engineering, machine learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.

How to access a FREE online Spark development environment

DataFrame population examples

Since I'm using the Databricks Community Edition for my code, in all my examples my Spark session is available in the variable spark, and I'm assuming yours is too.
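If you're running PySpark somewhere other than Databricks (where the spark variable is created for you automatically), you can build a session yourself. A minimal sketch, with an assumed application name:

from pyspark.sql import SparkSession

# Create (or reuse) a Spark session and bind it to the variable `spark`,
# matching the rest of the examples in this article
spark = (SparkSession.builder
         .appName("DataFramePopulationExamples")  # hypothetical app name
         .getOrCreate())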

1. From Resilient Distributed Datasets (RDDs)

RDDs are the fundamental data structure that underpins all of Spark's processing power. RDDs are still useful in their own right, but in more recent versions of Spark their use has largely been superseded by their fancier rivals, the DataFrame and the Dataset. Note that Datasets are not available in PySpark due to their strongly typed nature, so they aren't discussed further here.

Firstly, we need to get our input data into an RDD, and one of the main ways of doing this is by parallelizing an existing collection of data.

For example, here is a list of tuples:

Python">
from pyspark.sql.types import *

myData = [(1,'Tom','Jones'),(2,'Dick','Barton'),(3,'Harry','Worth')] 
# Assuming your spark session is already set
sc = spark.sparkContext
myRdd = sc.parallelize(myData)

myRdd.collect()

Out[4]: [(1, 'Tom', 'Jones'), (2, 'Dick', 'Barton'), (3, 'Harry', 'Worth')]

Now that the data is in an RDD, we can create a DataFrame from it using the toDF() method. When created like this, the data types of the DataFrame columns are inferred from the data.

# Convert the RDD to a DataFrame with inferred schema
myDF = myRdd.toDF(['id', 'first_name', 'last_name'])

# Show the DataFrame
myDF.show()

myDF:pyspark.sql.dataframe.DataFrame
id:long
first_name:string
last_name:string

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|  1|       Tom|    Jones|
|  2|      Dick|   Barton|
|  3|     Harry|    Worth|
+---+----------+---------+

A second way to create a DataFrame from an RDD is to use the createDataFrame function. While similar to the first method, it has the advantage over toDF() that you can fully specify a pre-defined schema, i.e. a name and a data type for each column of the data.

So, assuming the same input data as before, I first define the schema of the data.

mySchema = StructType([StructField("ID", StringType()),
                    StructField("FirstName", StringType()),
                    StructField("SurName", StringType())]) 

In this case, I specified that the ID column should be of string data type rather than an integer. Now I can use this code,

myData = [(1,'Tom','Jones'),(2,'Dick','Barton'),(3,'Harry','Worth')] 
myRdd = sc.parallelize(myData) 
myDf = spark.createDataFrame(myRdd,mySchema) 
myDf.show()

myDf:pyspark.sql.dataframe.DataFrame
ID:string
FirstName:string
SurName:string
+---+---------+-------+
| ID|FirstName|SurName|
+---+---------+-------+
|  1|      Tom|  Jones|
|  2|     Dick| Barton|
|  3|    Harry|  Worth|
+---+---------+-------+

2. From in-memory data structures

For some data, you don't need the intermediate step of creating an RDD at all. Often you can use the createDataFrame function to read data directly into a DataFrame. If you have a Python list, for example, and each data item is the same data type, you can use the following syntax.

myList = [1, 2, 3, 4]

myDf = spark.createDataFrame(myList, IntegerType())
myDf.show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
+-----+

If the data types are mixed, it's a tad more complicated, but not by much. For instance,

# Mixed data in a Python list
data = [1, 2, 3, 'hello', 'world']

# Create the DataFrame
df = spark.createDataFrame([(value,) for value in data], ["value"])

# Show the DataFrame
df.show()

df:pyspark.sql.dataframe.DataFrame
value:string
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|hello|
|world|
+-----+

PySpark recognised that mixed data types were present in this example and fell back to a common type that could represent all of the values. In this case, that was the string data type.

You can also handle data that's in other in-memory data structures, like tuples and dictionaries. Here are examples of both.

# A list of dictionaries
#
myDict = [{'Name':'Tom','Age':25},
          {'Name':'Dick','Age':33},
          {'Name':'Harry','Age':40}]

myDf = spark.createDataFrame(myDict)

myDf.show()

+---+-----+
|Age| Name|
+---+-----+
| 25|  Tom|
| 33| Dick|
| 40|Harry|
+---+-----+

# And a list of tuples
#
mySchema = StructType([StructField("Career", StringType()),
      StructField("FirstName", StringType()),
      StructField("SurName", StringType())]) 

myTuple = [('Singer','Tom','Jones'),
           ('Special Agent','Dick','Barton'),
           ('Comedian','Harry','Worth')] 

myDf = spark.createDataFrame(myTuple,mySchema) 

myDf.show()

+-------------+---------+-------+
|       Career|FirstName|SurName|
+-------------+---------+-------+
|       Singer|      Tom|  Jones|
|Special Agent|     Dick| Barton|
|     Comedian|    Harry|  Worth|
+-------------+---------+-------+

Related to that last example, it's also possible to create an empty DataFrame with a pre-defined schema.

# create an empty dataframe with a pre-defined schema

from pyspark.sql.types import *

field = [StructField("FIELDNAME_1",StringType(), True),
         StructField("FIELDNAME_2", IntegerType(), True),
         StructField("FIELDNAME_3", FloatType(), True)]

schema = StructType(field)
empty_df = spark.createDataFrame([], schema) # spark is the Spark Session
empty_df.show()

empty_df:pyspark.sql.dataframe.DataFrame
FIELDNAME_1:string
FIELDNAME_2:integer
FIELDNAME_3:float

+-----------+-----------+-----------+
|FIELDNAME_1|FIELDNAME_2|FIELDNAME_3|
+-----------+-----------+-----------+
+-----------+-----------+-----------+

Lastly, you can use the spark.range function to create a one-column DataFrame of data type long, containing a sequence of integers.

# Use spark.range to create a DataFrame with even
# numbers between 4 and 14. Default column name is "id"
# so we rename it
#
df = spark.range(4,15,2).withColumnRenamed("id", "even numbers")

df.show(10)

+------------+
|even numbers|
+------------+
|           4|
|           6|
|           8|
|          10|
|          12|
|          14|
+------------+

3. From external file data sources such as CSV, JSON or Parquet files

The exact code to do this depends on a few variables. For example, if reading CSV data, do you know the schema of your input file? Does it have a header record? And so on.

If you do not know the schema of your input file, you will have to use the inferSchema option, which I highly recommend avoiding if possible, especially for large input files.
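For completeness, a schema-inference read looks something like this sketch (the file path is just a placeholder):

# Let Spark sample the file and guess the column types.
# Convenient, but it adds an extra pass over the data,
# which is why it's best avoided for large files.
inferred_df = (spark.read.format("csv")
               .option("header", "true")
               .option("inferSchema", "true")
               .load("dbfs:/FileStore/shared_uploads/test/cities.txt"))

inferred_df.printSchema()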

In this example, assume you have a CSV file, C:\cities.csv, on your local PC that looks like this.

City, Country, Latitude, Longitude, Population
Tokyo,Japan,35.6895,139.69171,38001000 
Delhi,India,28.66667,77.21667,25703000 
Shanghai,China,31.22,121.46,23741000 
São Paulo,Brazil,-23.55

I uploaded this file to the dbfs filesystem on Databricks, but the location of the file isn't that important, and it could just as easily be somewhere like an AWS S3 bucket.

First, specify our schema.

from pyspark.sql.types import (StructField, StructType,
                               DoubleType, StringType, LongType)

citySchema = StructType([
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("population", LongType(), True),
        ])

Now, read the CSV data into our DataFrame.

myDf = (spark.read.format("csv")
        .option("header", "true")
        .schema(citySchema)
        .option("delimiter", ",")
        .option("mode", "permissive")
        .load("dbfs:/FileStore/shared_uploads/test/cities.txt"))

myDf.show()

+---------+-------+--------+---------+----------+
|     city|country|latitude|longitude|population|
+---------+-------+--------+---------+----------+
|    Tokyo|  Japan| 35.6895|139.69171|  38001000|
|    Delhi|  India|28.66667| 77.21667|  25703000|
| Shanghai|  China|   31.22|   121.46|  23741000|
|São Paulo| Brazil|  -23.55|     null|      null|
+---------+-------+--------+---------+----------+

You may not have seen the mode option before in the spark.read call chain. This option tells Spark how to deal with missing and/or malformed records in CSV data. In this case, permissive mode tells Spark to insert nulls into fields that could not be properly parsed.
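The other commonly used settings are DROPMALFORMED, which silently discards records that don't match the schema, and FAILFAST, which aborts the read as soon as a bad record is found. A quick sketch of both, reusing the same schema and placeholder path as above:

# Drop any rows that can't be parsed against the schema
dropped_df = (spark.read.format("csv")
              .option("header", "true")
              .schema(citySchema)
              .option("mode", "DROPMALFORMED")
              .load("dbfs:/FileStore/shared_uploads/test/cities.txt"))

# Or fail the whole read on the first malformed row
strict_df = (spark.read.format("csv")
             .option("header", "true")
             .schema(citySchema)
             .option("mode", "FAILFAST")
             .load("dbfs:/FileStore/shared_uploads/test/cities.txt"))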

If, instead, the input data was in JSON format like this,

{"City": "Tokyo", "Country": "Japan", "Latitude": 35.6895, "Longitude": 139.69171, "Population": 38001000}
{"City": "Delhi", "Country": "India", "Latitude": 28.66667, "Longitude": 77.21667, "Population": 25703000}
{"City": "Shanghai", "Country": "China", "Latitude": 31.22, "Longitude": 121.46, "Population": 23741000}
{"City": "São Paulo", "Country": "Brazil", "Latitude": -23.55, "Longitude": -46.6333, "Population": 21066245}

The code would be just as simple.

# Path to the JSON file
json_file_path = "dbfs:/FileStore/shared_uploads/test/cities.json"

# Read the JSON file into a DataFrame
df = spark.read.json(json_file_path)
df.show()
df.printSchema()

df:pyspark.sql.dataframe.DataFrame
City:string
Country:string
Latitude:double
Longitude:double
Population:long

+---------+-------+--------+---------+----------+
|     City|Country|Latitude|Longitude|Population|
+---------+-------+--------+---------+----------+
|    Tokyo|  Japan| 35.6895|139.69171|  38001000|
|    Delhi|  India|28.66667| 77.21667|  25703000|
| Shanghai|  China|   31.22|   121.46|  23741000|
|São Paulo| Brazil|  -23.55| -46.6333|  21066245|
+---------+-------+--------+---------+----------+

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Population: long (nullable = true)

And for a Parquet input file, it's this …

...
...
# Path to the Parquet file
parquet_file_path = "dbfs:/FileStore/shared_uploads/test/cities.parquet"

df = spark.read.parquet(parquet_file_path)
...
...

It's worth remembering that if your input data is a text file, and is unstructured and/or has very complex parsing requirements, it may be a better option to read it into an RDD first. You could then process it using RDD-specific functions like map before converting it to a regular DataFrame as we did previously by using toDF().

The syntax to read a text file into an RDD is:

sc = spark.sparkContext

lines = sc.textFile("dbfs:/FileStore/tables/cities.txt")
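Continuing that idea, here is a minimal sketch of how the raw lines could be parsed with map before being converted to a DataFrame. It assumes the simple comma-delimited cities file from earlier and, for brevity, keeps only the first two fields of each record:

# Drop the header record, split each remaining line on commas,
# and keep just the city and country fields
header = lines.first()
parsed = (lines.filter(lambda line: line != header)
               .map(lambda line: line.split(","))
               .map(lambda fields: (fields[0], fields[1])))

citiesDf = parsed.toDF(["city", "country"])
citiesDf.show()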

4. From tables in an external RDBMS

For our final example, I'll show how you can create a PySpark DataFrame from an external relational database table. Assume you have a MySQL database table filled with our cities' data.

I used the FreeSQLdatabase online service for this. It allows you to set up a small MySQL database for free that's completely online and accessible by PySpark, as long as you have the correct database connection credentials and PySpark has access to an appropriate JDBC driver.

If you want to try it out, click the link below.

Free Sql Database

Using this online database, or otherwise, set up a cities table that mimics our city data from the previous example. Here is my table.

Image by Author

Whichever database or connection method is used, you'll need to take note of the host server as well as the database name and user login credentials.

After this set-up, and before using your database with PySpark, you'll need to ensure that PySpark has access to the relevant JDBC driver for your database. How you do this depends on which system you're using. If using Databricks Community Edition, do the following.

Click on your Spark cluster (accessible from the Compute menu on the left-hand side of your screen), then click on the Libraries tab. Your screen should look something like this.

Image from Databricks Community Edition

Now click on the Install new button at the far right-hand side. You'll get a pop-up screen like this.

Image from Databricks Community Edition

Make sure you click on the Maven option, then in the Coordinates text box enter the following and click the Install button.

mysql:mysql-connector-java:8.0.26

If you're not using MySQL, use the Search Packages link to find the appropriate driver you need.

For other Spark environments, how you get hold of a JDBC driver depends on the system you're using. For most, the steps will be similar to the following (using the MySQL JDBC driver as an example):

  • Download the MySQL Connector/J JAR from the MySQL website or Maven Central, for example, mysql-connector-java-8.0.26.jar.
  • Copy the downloaded JAR file to a location accessible by your Spark application. For simplicity, let's assume the JAR file is located at /path/to/mysql-connector-java-8.0.26.jar.
  • Use the --jars option to include the JDBC driver when you submit your Spark application. For example,
spark-submit --jars /path/to/mysql-connector-java-8.0.26.jar your_spark_application.py

Alternatively, add the driver to your Spark Session Configuration using code similar to this.

from pyspark.sql import SparkSession

# Initialize Spark session with JDBC driver JAR
spark = (SparkSession.builder
         .appName("SparkJDBCExample")
         .config("spark.jars", "/path/to/mysql-connector-java-8.0.26.jar")
         .getOrCreate())

Your PySpark environment should now have access to MySQL. As I've already created and populated a cities table in the database, creating a DataFrame from its contents is as simple as this:

jdbc_url = "jdbc:mysql://your_mysql_host:3306/your_mysql_dbname"
jdbc_properties = {
    "user": "your_dbuser",
    "password": "your_dbpassword",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Read the table's data into a DataFrame
df_read = spark.read.jdbc(
    url=jdbc_url,
    table="cities",
    properties=jdbc_properties
)

df_read.display()

df_read:pyspark.sql.dataframe.DataFrame
City:string
Country:string
Latitude:double
Longitude:double
Population:integer

+---------+-------+--------+---------+----------+
|     City|Country|Latitude|Longitude|Population|
+---------+-------+--------+---------+----------+
|São Paulo| Brazil|  -23.55| -46.6333|  21066245|
|    Tokyo|  Japan| 35.6895|139.69171|  38001000|
|    Delhi|  India|28.66667| 77.21667|  25703000|
| Shanghai|  China|   31.22|   121.46|  23741000|
+---------+-------+--------+---------+----------+

Summary

In this article, I have demonstrated four of the most common methods of populating PySpark DataFrames from external data sources.

Loading your data into a DataFrame is an essential first step in performing further processing or analysis on it. You can of course process the data using PySpark's DataFrame API, but if you have a background in SQL you'll find it's much easier to do so using PySpark's powerful SQL processing capabilities.

Once your data is in a DataFrame, you can create a temporary view on it with a single line of PySpark code, giving you access to the full range of PySpark's SQL operations.

Ok, that's all from me for now. Hopefully, you found this article useful. If you did, please check out my profile page at this link. From there, you can see my other published stories and subscribe to get notified when I post new content.

I know times are tough and wallets constrained, but if you got real value from this article, please consider buying me a wee dram.

If you liked this content, I think you'll find these articles interesting too.

PySpark Explained: The explode and collect_list Functions

SQL Explained: Ranking Analytics

Tags: Data Engineering Data Science Programming Pyspark Python
