PySpark Explained: Delta Table Time Travel Queries
Revisit the past faster than a time lord.
In a previous article, I did a deep dive into Databricks Delta tables: what they are and how you use them.
One of the advantages I mentioned in that article was the ability to do time-travel queries on Delta tables. In other words, you can go back and see what data was in a particular table at any time in the past.
Why might this be useful? I can think of a few advantages and one BIG one in particular.
1. Data Auditing and Compliance
- Historical Data Inspection. Time travel allows you to access historical versions of your data, which is useful for auditing purposes. You can inspect past states of your data to ensure compliance with regulatory requirements.
2. Debugging and Development
- Debugging Data Issues. By querying previous versions of your data, you can identify when and how issues were introduced. This helps debug and resolve data-related problems.
- Reproducible Research. Time travel allows data scientists and analysts to reproduce experiments and reports using the exact data version they used initially, ensuring consistency and reliability.
3. Data Analytics and Reporting
- Historical Analysis. You can perform historical data analysis without maintaining separate historical datasets. This simplifies the process of comparing current data with past data.
- Trend Analysis. By accessing data at different points in time, you can analyse trends and patterns over time, providing valuable insights for decision-making.
4. Simplified Data Management
- Ease of Use. Time travel reduces the need to manually manage snapshots or backups for historical data access, simplifying data management tasks.
- Data Lineage. Delta Lake maintains a full audit trail of changes, which provides complete data lineage. This helps in understanding the origin and transformation of data over time.
And the biggest reason I think time-travel queries are useful…
5. Data Recovery and Versioning
- Accidental Data Deletion Recovery: If data is accidentally deleted or corrupted, you can easily revert to a previous version to recover the lost data.
- Version Rollback: You can roll back to a specific version of the data, which is useful if an error is discovered in a recent update.
In the rest of this article, we'll focus on the data recovery aspect by using time-travel SQL on Delta tables to showcase two classic use cases.
Accessing a FREE PySpark development environment
If you want to follow along with the code in this article, you'll need access to a PySpark development environment with an installation of Delta. Delta is built into the Spark ecosystem on Databricks by default, so that's what I'll use to demonstrate Delta tables.
If you're lucky enough to have access to PySpark and Delta already, whether through your work, via the cloud, or a local install, go ahead and use that. Otherwise, I go into detail in a separate article about how you can access a great FREE online PySpark development environment called the Databricks Community Edition.
Databricks is a cloud-based platform for data engineering, Machine Learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.
For the rest of this article, I'll assume you're running the Databricks Community Edition. I ran all the following code snippets in a Databricks notebook, which provides a Jupyter-like environment.
Initial Delta table setup
For our data, we'll use (made up) population census information from cities across the world. We'll start with 3 different data sets. The first will be our initial data, then we'll append a second and third data set. The second and third data sets will contain updated population and census date information. And, we'll assume all 3 data sets are inserted with different timestamps.
from pyspark.sql import Row
import datetime
# Create a list of sample data for different versions
# of our city data
#
data_version_1 = [
Row(city="Los Angeles", country="USA", population=4000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Mumbai", country="India", population=20000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Tokyo", country="Japan", population=14000000, census_date=datetime.date(2023, 1, 1))
]
data_version_2 = [
Row(city="Los Angeles", country="USA", population=4100000, census_date=datetime.date(2023, 6, 1)),
Row(city="Mumbai", country="India", population=21000000, census_date=datetime.date(2023, 6, 1)),
Row(city="Tokyo", country="Japan", population=13950000, census_date=datetime.date(2023, 6, 1))
]
data_version_3 = [
Row(city="Los Angeles", country="USA", population=4150000, census_date=datetime.date(2024, 1, 1)),
Row(city="Mumbai", country="India", population=21500000, census_date=datetime.date(2024, 1, 1)),
Row(city="Tokyo", country="Japan", population=13900000, census_date=datetime.date(2024, 1, 1))
]
# Convert lists of Rows to DataFrames
df_version_1 = spark.createDataFrame(data_version_1)
df_version_2 = spark.createDataFrame(data_version_2)
df_version_3 = spark.createDataFrame(data_version_3)
# Write DataFrames to a Delta table
delta_table_path = "dbfs:/FileStore/tables/cities"
df_version_1.write.format("delta").mode("overwrite").save(delta_table_path)
df_version_2.write.format("delta").mode("append").save(delta_table_path)
df_version_3.write.format("delta").mode("append").save(delta_table_path)
# Register the Delta table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS cities
USING DELTA
LOCATION '{delta_table_path}'
""")
Now we can see what the initial table looks like.
%sql
select * from cities
+--------------+---------+-----------+------------+
| city | country | population|census_date |
+--------------+---------+-----------+------------+
| Los Angeles | USA | 4000000 | 2023-01-01 |
| Los Angeles | USA | 4100000 | 2023-06-01 |
| Los Angeles | USA | 4150000 | 2024-01-01 |
| Mumbai | India | 20000000 | 2023-01-01 |
| Mumbai | India | 21000000 | 2023-06-01 |
| Mumbai | India | 21500000 | 2024-01-01 |
| Tokyo | Japan | 14000000 | 2023-01-01 |
| Tokyo | Japan | 13950000 | 2023-06-01 |
| Tokyo | Japan | 13900000 | 2024-01-01 |
+--------------+---------+-----------+------------+
Retrieving a Delta table's change history
To retrieve the historical operations performed on a Delta table, use the describe history command. For example,
%sql
DESCRIBE HISTORY cities
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend| operationMetrics | userMetadata | engineInfo |
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
| 2 | 2024-06-26T12:34:53.000Z | 8479462340460002 | ************@gmail.com | WRITE | {"mode":"Append"} | null| {"notebookId":"2764144823421000"} | 0626-122718-je67tvdp | 1 | WriteSerializable | true | {"numFiles":"3","numOutputRows":"3","numOutputBytes":"4057"} | null | Databricks-Runtime/12.2.x-scala2.12 |
| 1 | 2024-06-26T12:34:45.000Z | 8479462340460002 | ************@gmail.com | WRITE | {"mode":"Append"} | null| {"notebookId":"2764144823421000"} | 0626-122718-je67tvdp | 0 | WriteSerializable | true | {"numFiles":"3","numOutputRows":"3","numOutputBytes":"4058"} | null | Databricks-Runtime/12.2.x-scala2.12 |
| 0 | 2024-06-26T12:34:29.000Z | 8479462340460002 | ************@gmail.com | WRITE | {"mode":"Overwrite"} | null| {"notebookId":"2764144823421000"} | 0626-122718-je67tvdp | | WriteSerializable | false | {"numFiles":"3","numOutputRows":"3","numOutputBytes":"4058"} | null | Databricks-Runtime/12.2.x-scala2.12 |
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
You can get just the latest change by using,
%sql
DESCRIBE HISTORY cities LIMIT 1
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend| operationMetrics | userMetadata | engineInfo |
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
| 2 | 2024-06-26T12:34:53.000Z | 8479462340460002 | ************@gmail.com | WRITE | {"mode":"Append"} | null| {"notebookId":"2764144823421000"} | 0626-122718-je67tvdp | 1 | WriteSerializable | true | {"numFiles":"3","numOutputRows":"3","numOutputBytes":"4057"} | null | Databricks-Runtime/12.2.x-scala2.12 |
+---------+---------------------------+-------------------+------------------------+-----------+-------------------------+-----+-----------------------------------+----------------------+-------------+-------------------+--------------+--------------------------------------------------------------+--------------+-------------------------------------+
There is quite a bit of information returned by this statement, but for all intents and purposes, the only two columns you'll need to use are the version and timestamp columns. We'll see why in a second.
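If you'd rather pull just those columns out programmatically, you can run the same command through spark.sql and select them. Here's a minimal sketch, assuming the cities table registered above:
# Grab just the version and timestamp of each change
# (plus the operation, which is handy for context)
history_df = (
    spark.sql("DESCRIBE HISTORY cities")
         .select("version", "timestamp", "operation")
)
history_df.show(truncate=False)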
The most useful time-travel commands
In my opinion, the two most useful ways to use time travel on Delta tables are via the TIMESTAMP AS OF and VERSION AS OF clauses of the SELECT and RESTORE SQL statements. These will cover 99% of anything you need to do with time-travel queries.
That's why I said earlier that the version and timestamp columns from the describe history command were the important ones to note.
Let's look at some examples of both.
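Before we get to RESTORE, here's a minimal sketch of what plain SELECTs with both clauses look like against our cities table. The timestamp is the one from my describe history output above; yours will differ.
# Read the table as it was at a specific version number
v0_df = spark.sql("SELECT * FROM cities VERSION AS OF 0")
v0_df.show()
# Read the table as it was at (or just before) a specific point in time
ts_df = spark.sql("SELECT * FROM cities TIMESTAMP AS OF '2024-06-26T12:34:45.000Z'")
ts_df.show()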
Restore a table to a previous version
In this example, we restore the table to the state it was in just after the first three records were inserted.
%sql
/* Note, versions start at zero */
RESTORE TABLE cities TO VERSION AS OF 0;
select * from cities;
/* And we get back our insert of the
* original first 3 records
*/
+--------------+---------+-----------+------------+
| city | country | population| census_date|
+--------------+---------+-----------+------------+
| Los Angeles | USA | 4000000 | 2023-01-01 |
| Mumbai | India | 20000000 | 2023-01-01 |
| Tokyo | Japan | 14000000 | 2023-01-01 |
+--------------+---------+-----------+------------+
We can also do similar operations using timestamps. This time, the restoration is to a point in time just after the first six records were added.
%sql
/* first get our table back to its original state*/
RESTORE TABLE cities TO VERSION AS OF 2;
/* below is equivalent to restoring back to version 1
* as the timestamp used is the one associated with that particular version
*/
RESTORE TABLE cities TO TIMESTAMP AS OF '2024-06-26T12:34:45.000Z';
+--------------------------+----------------------------+------------------+--------------------+--------------------+---------------------+
| table_size_after_restore | num_of_files_after_restore | num_removed_files| num_restored_files | removed_files_size | restored_files_size |
+--------------------------+----------------------------+------------------+--------------------+--------------------+---------------------+
| 8116 | 6 | 3 | 0 | 4057 | 0 |
+--------------------------+----------------------------+------------------+--------------------+--------------------+---------------------+
select * from cities;
+--------------+---------+-----------+------------+
| city | country | population|census_date |
+--------------+---------+-----------+------------+
| Los Angeles | USA | 4100000 | 2023-06-01 |
| Los Angeles | USA | 4000000 | 2023-01-01 |
| Mumbai | India | 21000000 | 2023-06-01 |
| Mumbai | India | 20000000 | 2023-01-01 |
| Tokyo | Japan | 14000000 | 2023-01-01 |
| Tokyo | Japan | 13950000 | 2023-06-01 |
+--------------+---------+-----------+------------+
Fixing data errors
The most useful thing we can do with time-travel queries is fixing data errors. Let's illustrate with two common scenarios.
In our first example, assume we have fat-fingered a DELETE on a table. If we know the date and time when the deletion happened, we can use time travel to go back to just before it happened and recover the deleted rows.
If you're not sure exactly when a DELETE operation occurred, the describe history command we used earlier can help pinpoint such cases. Look for the text DELETE under the operation column of its output.
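If you'd rather not eyeball the full output, you can narrow the history down to just the DELETE operations. A minimal sketch, again assuming the table is registered as cities:
# Find when any DELETE was run against the table, and by whom
spark.sql("DESCRIBE HISTORY cities") \
    .filter("operation = 'DELETE'") \
    .select("version", "timestamp", "userName", "operation") \
    .show(truncate=False)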
Python">from pyspark.sql import Row
import datetime
# Create initial population data
data_initial = [
Row(city="Los Angeles", country="USA", population=4000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Mumbai", country="India", population=20000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Tokyo", country="Japan", population=14000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Los Angeles", country="USA", population=4100000, census_date=datetime.date(2023, 6, 1)),
Row(city="Mumbai", country="India", population=21000000, census_date=datetime.date(2023, 6, 1)),
Row(city="Tokyo", country="Japan", population=13950000, census_date=datetime.date(2023, 6, 1))
]
# Convert list of Rows to DataFrame
df_initial = spark.createDataFrame(data_initial)
# Define the path for the Delta table
delta_table_path = "dbfs:/FileStore/tables/cities"
# Write the initial data to the Delta table
df_initial.write.format("delta").mode("overwrite").save(delta_table_path)
# Register the Delta table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS cities
USING DELTA
LOCATION '{delta_table_path}'
""")
spark.sql("select city,country,population,census_date from cities").show()
+-----------+-------+----------+---------------+
| city|country|population|census_date |
+-----------+-------+----------+---------------+
|Los Angeles| USA| 4000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Mumbai| India| 20000000| 2023-01-01|
| Mumbai| India| 21000000| 2023-06-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| Tokyo| Japan| 13950000| 2023-06-01|
+-----------+-------+----------+---------------+
Now, our accidental "delete".
# Perform deletion operation
spark.sql("""
DELETE FROM cities WHERE city = 'Mumbai'
""")
# Verify the deletion
spark.sql("""
SELECT * FROM cities
""").show()
+-----------+-------+----------+---------------+
| city|country|population|census_date |
+-----------+-------+----------+---------------+
|Los Angeles| USA| 4000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| Tokyo| Japan| 13950000| 2023-06-01|
+-----------+-------+----------+---------------+
In the meantime, before we noticed our accidental DELETE, we may have made other changes to the table, such as INSERTing new records, but this doesn't affect our data recovery. For example,
# Create new city population data
# after our accidental delete
#
new_cities_data = [
Row(city="London", country="UK", population=8900000, census_date=datetime.date(2023, 1, 1)),
Row(city="New York", country="USA", population=8400000, census_date=datetime.date(2023, 1, 1)),
Row(city="Edinburgh", country="UK", population=500000, census_date=datetime.date(2023, 1, 1))
]
# Convert list of Rows to DataFrame
df_new_cities = spark.createDataFrame(new_cities_data)
# Write the new data to the Delta table
df_new_cities.write.format("delta").mode("append").save(delta_table_path)
# Verify the insertion
spark.sql("""
SELECT * FROM cities
""").show()
+-----------+-------+----------+---------------+
| city|country|population|census_date |
+-----------+-------+----------+---------------+
|Los Angeles| USA| 4000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| New York| USA| 8400000| 2023-01-01|
| Edinburgh| UK| 500000| 2023-01-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| Tokyo| Japan| 13950000| 2023-06-01|
| London| UK| 8900000| 2023-01-01|
+-----------+-------+----------+---------------+
The idea is that we can create a temporary view containing our data as it was before the DELETE occurred. This will contain all the deleted records, along with the other valid records that were in the table at the time.
# Query the table state before the deletion
# enter your required timestamp
#
pre_deletion_df = spark.sql("""
SELECT *
FROM cities
TIMESTAMP AS OF '2024-06-26T12:34:53.000Z' -- a timestamp just before the deletion
""")
pre_deletion_df.show()
+-----------+-------+----------+---------------+
| city|country|population|census_date |
+-----------+-------+----------+---------------+
|Los Angeles| USA| 4000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Mumbai| India| 20000000| 2023-01-01|
| Mumbai| India| 21000000| 2023-06-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| Tokyo| Japan| 13950000| 2023-06-01|
+-----------+-------+----------+---------------+
Now we can re-insert whichever deleted records we want, back into the cities table.
# Insert the recovered rows back into
# the original table
#
pre_deletion_df.createOrReplaceTempView("pre_deletion_view")
spark.sql("""
INSERT INTO cities
SELECT * FROM pre_deletion_view WHERE city = 'Mumbai'
""")
# Verify the recovery
spark.sql("""
SELECT * FROM cities
""").show()
+-----------+-------+----------+-----------+
|       city|country|population|census_date|
+-----------+-------+----------+-----------+
|     Mumbai|  India|  20000000| 2023-01-01|
|     Mumbai|  India|  21000000| 2023-06-01|
|Los Angeles|    USA|   4000000| 2023-01-01|
|Los Angeles|    USA|   4100000| 2023-06-01|
|   New York|    USA|   8400000| 2023-01-01|
|  Edinburgh|     UK|    500000| 2023-01-01|
|      Tokyo|  Japan|  14000000| 2023-01-01|
|      Tokyo|  Japan|  13950000| 2023-06-01|
|     London|     UK|   8900000| 2023-01-01|
+-----------+-------+----------+-----------+
In our second example, assume we have a table where batches of records are being continually inserted daily. It's determined that on a particular date and time in the past (which we know), all the records that were inserted were incorrect and need to be deleted.
So, we have two options available to fix this.
If we can identify all the valid records inserted after the incorrect ones, we can (see the sketch after this list):
- create a temporary view containing all these records
- restore the original table to the point just before the invalid records were inserted
- re-insert the valid records from the temporary view into the restored table
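Here's a rough sketch of that first approach. The timestamp and the predicate used to pick out the later, valid batch are purely illustrative; you'd substitute values from your own describe history output and your own knowledge of the data.
# 1. Capture the valid records inserted AFTER the bad batch (identified
#    here by census_date, purely for illustration) and pin them on the
#    driver so the restore can't pull them out from under us
#    (fine for a small batch of rows)
valid_later_rows = spark.sql(
    "SELECT * FROM cities WHERE census_date = DATE '2024-06-19'"
).collect()
valid_later_df = spark.createDataFrame(valid_later_rows)
valid_later_df.createOrReplaceTempView("valid_later_records")
# 2. Restore the table to the point just before the invalid insert
spark.sql("RESTORE TABLE cities TO TIMESTAMP AS OF '2024-01-17T10:15:00.000Z'")
# 3. Re-insert the valid later records
spark.sql("INSERT INTO cities SELECT * FROM valid_later_records")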
Alternatively, we can use the following method if we can uniquely identify all the invalid records.
- copy the invalid records to a temporary view
- use the merge command with the temp view to delete the invalid records
Here's an example using this second method.
# create our data set
#
from pyspark.sql import Row
import datetime
# Create sample DataFrame for the cities table
valid_data_1 = [
Row(city="Los Angeles", country="USA", population=4000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Mumbai", country="India", population=20000000, census_date=datetime.date(2023, 1, 1)),
Row(city="Tokyo", country="Japan", population=14000000, census_date=datetime.date(2023, 1, 1)),
Row(city="London", country="UK", population=12000000, census_date=datetime.date(2023, 1, 1))
]
valid_data_2 = [
Row(city="Los Angeles", country="USA", population=4100000, census_date=datetime.date(2023, 6, 1)),
Row(city="Mumbai", country="India", population=21000000, census_date=datetime.date(2023, 6, 1)),
Row(city="Tokyo", country="Japan", population=13950000, census_date=datetime.date(2023, 6, 1)),
Row(city="London", country="UK", population=12050000, census_date=datetime.date(2023, 6, 1))
]
invalid_data = [
Row(city="Los Angeles", country="USA", population=3000000, census_date=datetime.date(2024, 1, 1)), # Invalid
Row(city="Mumbai", country="India", population=15000000, census_date=datetime.date(2024, 1, 1)), # Invalid
Row(city="Tokyo", country="Japan", population=7000000, census_date=datetime.date(2024, 1, 1)) # Invalid
]
valid_data_3 = [
Row(city="Los Angeles", country="USA", population=4200000, census_date=datetime.date(2024, 6, 19)),
Row(city="Mumbai", country="India", population=22000000, census_date=datetime.date(2024, 6, 19)),
Row(city="Tokyo", country="Japan", population=13850000, census_date=datetime.date(2024, 6, 19)),
Row(city="London", country="UK", population=13150000, census_date=datetime.date(2024, 1, 1))
]
# Convert lists of Rows to DataFrames
df_valid_1 = spark.createDataFrame(valid_data_1)
df_valid_2 = spark.createDataFrame(valid_data_2)
df_invalid = spark.createDataFrame(invalid_data)
df_valid_3 = spark.createDataFrame(valid_data_3)
# Define the path for the Delta table
delta_table_path = "dbfs:/FileStore/tables/cities"
# Write the valid data to the Delta table
df_valid_1.write.format("delta").mode("overwrite").save(delta_table_path)
df_valid_2.write.format("delta").mode("append").save(delta_table_path)
df_invalid.write.format("delta").mode("append").save(delta_table_path)
df_valid_3.write.format("delta").mode("append").save(delta_table_path)
# Register the Delta table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS cities
USING DELTA
LOCATION '{delta_table_path}'
""")
Here is the data set, with the invalid records annotated.
+--------------+---------+-----------+----------------+
| city | country | population|census_date |
+--------------+---------+-----------+----------------+
| Los Angeles | USA | 4000000 | 2023-01-01 |
| Mumbai | India | 20000000 | 2023-01-01 |
| Tokyo | Japan | 14000000 | 2023-01-01 |
| London | UK | 12000000 | 2023-01-01 |
| Los Angeles | USA | 4100000 | 2023-06-01 |
| Mumbai | India | 21000000 | 2023-06-01 |
| Tokyo | Japan | 13950000 | 2023-06-01 |
| London | UK | 12050000 | 2023-06-01 |
| Los Angeles | USA | 3000000 | 2024-01-01 | <-- Invalid record
| Mumbai | India | 15000000 | 2024-01-01 | <-- Invalid record
| Tokyo | Japan | 7000000 | 2024-01-01 | <-- Invalid record
| London | UK | 13150000 | 2024-01-01 |
| Los Angeles | USA | 4200000 | 2024-06-19 |
| Mumbai | India | 22000000 | 2024-06-19 |
| Tokyo | Japan | 13850000 | 2024-06-19 |
+--------------+---------+-----------+----------------+
Let's assume that the invalid records (and only those) were inserted on 2024-01-17 between 10:15:00 AM and 10:15:59 AM.
1/ Using Timestamp As Of, populate a temporary table with the contents of our table at the point just after the invalid records were inserted.
# Query the state of the table as of 2024-01-17 10:16:00
# This will be ALL our invalid records and some valid records
#
after_invalid = spark.sql(f"""
SELECT *
FROM cities
TIMESTAMP AS OF '2024-01-17T10:16:00.000Z'
""")
after_invalid.createOrReplaceTempView("after_invalid")
spark.sql('select * from after_invalid').show()
+-----------+-------+----------+-----------+
| city|country|population|census_date|
+-----------+-------+----------+-----------+
|Los Angeles| USA| 4000000| 2023-01-01|
| Mumbai| India| 20000000| 2023-01-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| London| UK| 12000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Mumbai| India| 21000000| 2023-06-01|
| Tokyo| Japan| 13950000| 2023-06-01|
| London| UK| 12050000| 2023-06-01|
|Los Angeles| USA| 3000000| 2024-01-01|<-- Invalid record
| Mumbai| India| 15000000| 2024-01-01|<-- Invalid record
| Tokyo| Japan| 7000000| 2024-01-01|<-- Invalid record
+-----------+-------+----------+-----------+
2/ Using Timestamp As Of, populate a second temporary table with the contents of our original table at the point just before the invalid records were inserted.
# Query the state of the table as of 2024-01-17 10:15
# This will contain only valid records
#
before_invalid = spark.sql(f"""
SELECT *
FROM cities
TIMESTAMP AS OF '2024-01-17T10:15:00.000Z'
""")
before_invalid.createOrReplaceTempView("before_invalid")
spark.sql('select * from before_invalid').show()
+-----------+-------+----------+-----------+
| city|country|population|census_date|
+-----------+-------+----------+-----------+
|Los Angeles| USA| 4000000| 2023-01-01|
| Mumbai| India| 20000000| 2023-01-01|
| Tokyo| Japan| 14000000| 2023-01-01|
| London| UK| 12000000| 2023-01-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Mumbai| India| 21000000| 2023-06-01|
| Tokyo| Japan| 13950000| 2023-06-01|
| London| UK| 12050000| 2023-06-01|
+-----------+-------+----------+-----------+
The first temp table will contain all the invalid records (and some valid records). The second temp table will contain only valid records.
Running an EXCEPT on these two temporary tables will produce invalid records only.
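Before running the merge, it's worth previewing exactly which rows the EXCEPT picks out, so you can confirm that only the invalid records will be deleted:
# Sanity check: these are the rows the merge below will match and delete
spark.sql("""
SELECT * FROM after_invalid
EXCEPT
SELECT * FROM before_invalid
""").show()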
Finally, we can use a Merge statement to delete the invalid records.
# Perform a merge to delete only the invalid records
#
merge_query = """
WITH records_to_delete AS (
SELECT * FROM after_invalid
EXCEPT
SELECT * FROM before_invalid
)
MERGE INTO cities AS target
USING records_to_delete AS source
ON target.city = source.city
AND target.country = source.country
AND target.population = source.population
AND target.census_date = source.census_date
WHEN MATCHED THEN
DELETE
"""
# Execute the MERGE query
spark.sql(merge_query)
# Check the result
spark.sql('select * from cities order by census_date').show()
+-----------+-------+----------+-----------+
| city|country|population|census_date|
+-----------+-------+----------+-----------+
| Tokyo| Japan| 14000000| 2023-01-01|
| Mumbai| India| 20000000| 2023-01-01|
|Los Angeles| USA| 4000000| 2023-01-01|
| London| UK| 12000000| 2023-01-01|
| Mumbai| India| 21000000| 2023-06-01|
| London| UK| 12050000| 2023-06-01|
|Los Angeles| USA| 4100000| 2023-06-01|
| Tokyo| Japan| 13950000| 2023-06-01|
| London| UK| 13150000| 2024-01-01|
| Mumbai| India| 22000000| 2024-06-19|
|Los Angeles| USA| 4200000| 2024-06-19|
| Tokyo| Japan| 13850000| 2024-06-19|
+-----------+-------+----------+-----------+
Note: time-travel queries do have certain limitations. For instance, the amount of "history" available for a table defaults to 30 days, but this is configurable by setting the table property delta.logRetentionDuration.
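For example, if you wanted a longer window of history on the cities table, you could raise that property. A sketch, where 90 days is an arbitrary choice (longer retention means the transaction log and old data files hang around longer):
# Keep 90 days of table history instead of the default 30
spark.sql("""
ALTER TABLE cities
SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 90 days')
""")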
Also, you cannot restore a table to an older version if the underlying data files for that version have been deleted, either manually or by the VACUUM command. Partial restoration may still be possible, though, if spark.sql.files.ignoreMissingFiles is set to true.
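If you do end up in that situation and a partial restore is acceptable, the flag can be set on the current session before running the RESTORE. A minimal sketch:
# Allow the restore to skip over data files that no longer exist
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
spark.sql("RESTORE TABLE cities TO VERSION AS OF 0")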
Summary
In this article, we've explored in detail the features of Delta table time travel, which allow users to query previous versions of their data.
Many potential uses for time-travel queries were identified, including trend and historical analysis, recording data lineage and fixing data errors.
We showed examples where time-travel queries were used to recover tables to previous versions, to recover accidentally deleted records or to identify and delete incorrectly inserted data. This powerful capability ensures that data recovery and auditing can be performed seamlessly, safeguarding against accidental data loss or corruption.