Orchestrating a Dynamic Time-series Pipeline in Azure
In the previous story, we explored the potential of PySpark on Databricks for time-series data; I encourage you to catch up there for more detail. Without configuring a standalone Spark instance, we can ingest static and streaming data, perform transformations, extract useful time-related features, and build visualizations using PySpark on Databricks. Its scalability and performance are particularly advantageous when handling complex transformations of enterprise-level data, up to petabytes.
All the feature engineering tasks were successfully performed within a single Databricks notebook. However, this is only part of the data engineering story when building a data-centric system. The core of the data pipeline lies in data orchestration.
Data orchestration generally refers to having centralized control over data flows so that we can automate, manage, and monitor the entire data pipeline.

Azure Data Factory (ADF) with Azure Databricks
To satisfy these needs, one of the most popular solutions in the industry is to run Azure Databricks notebooks from an ADF platform.
ADF is a cloud-based, serverless, and fully managed data integration service. Although Databricks Workflows offers a good alternative that covers some ADF features, there are still several key benefits to choosing ADF. For example, ADF is a mature tool for integrating with diverse data stores through its connectors, including SaaS applications like Salesforce and Big Data sources like Amazon Redshift and Google BigQuery. It therefore works well for ingestion and integration, especially when the current system has complex dependencies on data systems outside of Databricks. In addition, ADF makes it quick to build basic pipelines through its drag-and-drop, low-code interface.
In this hands-on journey, we will dive deeper into the data engineering project and explore how ADF helps build a dynamic, skeleton data pipeline for time-series data. I will demonstrate how to mount cloud storage on Azure Databricks, transform data by embedding a Notebook on Azure Databricks, and dynamically orchestrate data through custom settings in ADF. Let's get started!
The initial setup
We first need to set up several cloud components and services.
#1 Create an Azure resource group
This container is used to hold and group the resources for an Azure solution. We will place the necessary cloud service components in this logical group to make them easier to build and deploy.

#2 Create an Azure Data Lake Gen 2 storage account
You can choose a suitable storage account based on your performance and replication requirements. In the Advanced tab, we enable the Hierarchical Namespace to set up Data Lake Storage Gen2, which allows for storing both structured and unstructured data.

#3 Set up Azure Databricks service
If you have used Databricks before, the Azure Databricks service is largely the same. In addition, it is natively integrated with other Azure services and provides a unified billing platform. There are two tiers: (1) Standard, which is sufficient for our proof-of-concept here; and (2) Premium, which adds Unity Catalog and advanced networking features on top of the Standard tier and may be necessary for a large enterprise with multiple Databricks workspaces.

#4 Register an application
This app registration will help mount Azure storage to Databricks, so make sure you note the application (client) ID and the directory (tenant) ID, and most importantly the app secret value, which cannot be viewed again after you leave the page.
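Rather than pasting the secret value directly into a Notebook, a common practice is to keep it in a Databricks secret scope (optionally backed by Azure Key Vault). The snippet below is a minimal sketch of retrieving the values later in the Notebook; the scope name adls-scope and the key names are hypothetical and assume you stored the values there yourself. These variables can then be plugged into the mount configuration shown later.
# A minimal sketch, assuming a Databricks secret scope named "adls-scope"
# was created and the app registration values were stored under these
# hypothetical key names.
client_id = dbutils.secrets.get(scope="adls-scope", key="app-client-id")
tenant_id = dbutils.secrets.get(scope="adls-scope", key="app-tenant-id")
client_secret = dbutils.secrets.get(scope="adls-scope", key="app-client-secret")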



Afterward, grant the registered app access rights to the storage account. This is achieved by assigning the "Storage Blob Data Contributor" role to the app we just registered.



#5 Create Azure SQL Database
To store the transformed data frame, we search for Azure SQL resources and pick "Single database" as the resource type. There are choices of SQL database servers with different compute hardware, maximum data sizes, and more. You can instantly see the estimated cost summary while adjusting the server specifications.


After all the initial setups, you are ready to explore how these services are linked together.
Prepare the data orchestration pipeline
#1 Ingest data
We first upload the electric power consumption data to Azure Data Lake Gen2. This dataset, obtained from Kaggle (license: Database: Open Database, Contents: Database Contents), is sampled at a one-minute rate from December 2006 to November 2010.

Next, we create a Notebook in the Azure Databricks workspace and mount the storage by defining the configuration with the ID values we noted earlier.
# Define the configuration specifications for OAuth access to the storage account
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "",      # application (client) ID
           "fs.azure.account.oauth2.client.secret": "",  # app secret value
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com//oauth2/token"  # tenant ID goes between the slashes
           }

dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",  # URI of the object storage
    mount_point = "/mnt/adlstsdp/input",  # local path in the /mnt directory
    extra_configs = configs)
To verify file access, we can run the following command:
dbutils.fs.ls("/mnt/adlstsdp/input")
# Output: [FileInfo(path='dbfs:/mnt/adlstsdp/input/household_power_consumption.csv', name='household_power_consumption.csv', size=132960755, modificationTime=1716798010000)]
#2 Embed Notebook on Azure Databricks
Most of the source code in this section builds upon my previous story. The idea is to perform data cleansing, transformation, and feature engineering (creating time-related and moving-average features). The transformed data is ultimately written to an Azure SQL Database table.
You can check the complete code below to see the implementation.
# Define file location, file type, and CSV options
file_location = "/mnt/adlstsdp/input/household_power_consumption.csv"
file_type = "csv"
schema = "Date STRING, Time STRING, Global_active_power DOUBLE, Global_reactive_power DOUBLE, Voltage DOUBLE, Global_intensity DOUBLE, Sub_metering_1 DOUBLE, Sub_metering_2 DOUBLE, Sub_metering_3 DOUBLE"
first_row_is_header = "true"
delimiter = ";"
# Read the CSV file into a Spark DataFrame
org_df = (spark.read.format(file_type)
          .schema(schema)
          .option("header", first_row_is_header)
          .option("delimiter", delimiter)
          .load(file_location))
# Data cleansing and transformation
from pyspark.sql.functions import *

cleaned_df = org_df.na.drop()
cleaned_df = cleaned_df.withColumn("Date", to_date(col("Date"), "d/M/y"))
cleaned_df = cleaned_df.select(concat_ws(" ", col("Date"), col("Time")).alias("DateTime"), "*")
cleaned_df = cleaned_df.withColumn("DateTime", cleaned_df["DateTime"].cast("timestamp"))

# Aggregate to daily total global active power
df = cleaned_df.groupby("Date").agg(
    round(sum("Global_active_power"), 2).alias("Total_global_active_power"),
).sort(["Date"])
# Add time-related features
df = df.withColumn("year", year("Date"))
df = df.withColumn("month", month("Date"))
df = df.withColumn("week_num", weekofyear("Date"))
# Add lagged value features of total global active power
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
windowSpec = Window.orderBy("Date")
df = df.withColumn("power_lag1", round(lag(col("Total_global_active_power"), 1).over(windowSpec), 2))
# Create delta field
df = df.withColumn("power_lag1_delta", round(col("power_lag1") - col("Total_global_active_power"), 2))
# Create window average fields
def add_window_avg_fields(df, window_sizes):
    for idx, window_size in enumerate(window_sizes, start=1):
        window_col_name = f"avg_power_lag_{idx}"
        windowSpec = Window.orderBy("Date").rowsBetween(-window_size, 0)
        df = df.withColumn(window_col_name, round(avg(col("Total_global_active_power")).over(windowSpec), 2))
    return df

window_sizes = [14, 30]
df = add_window_avg_fields(df, window_sizes)
# Create Exponentially Weighted Moving Average (EWMA) fields
import pyspark.pandas as ps
ps.set_option('compute.ops_on_diff_frames', True)

def add_ewma_fields(df, alphas):
    for idx, alpha in enumerate(alphas, start=1):
        ewma_col_name = f"ewma_power_weight_{idx}"
        df[ewma_col_name] = df.Total_global_active_power.ewm(alpha=alpha).mean().round(2)
    return df

alphas = [0.2, 0.8]
df_pd = df.pandas_api()    # switch to the pandas-on-Spark API
df_pd = add_ewma_fields(df_pd, alphas)
df = df_pd.to_spark()      # switch back to a Spark DataFrame
# Write the transformed dataframe to the database table "electric_usage_table"
(df.write.format("jdbc")
    .option("url", "jdbc:sqlserver://sql-db-dp.database.windows.net:1433;databaseName=sql-db-dp")
    .option("dbtable", "dbo.electric_usage_table")
    .option("user", "")      # database username
    .option("password", "")  # database password
    .mode("overwrite")
    .save())
#3 Build a basic pipeline in ADF
In ADF, we add a "Notebook" activity to the pipeline environment, then configure it to reference the desired Notebook in the Databricks folder. Set up the Databricks linked service, then validate and publish the entire activity pipeline in ADF. You can then run the pipeline in "Debug" mode.

The activity status shows "Succeeded", meaning the data should be migrated and inserted into the Azure SQL Database table. We can view the results for verification using the query editor.
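If you prefer to verify from the Databricks side instead of the query editor, you can also read the table back over JDBC. The sketch below reuses the same JDBC URL, table name, and (elided) credentials from the write step above.
# A minimal sketch: read the table back to verify the inserted rows
verify_df = (spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://sql-db-dp.database.windows.net:1433;databaseName=sql-db-dp")
    .option("dbtable", "dbo.electric_usage_table")
    .option("user", "")      # database username
    .option("password", "")  # database password
    .load())

print(verify_df.count())           # number of rows written
verify_df.orderBy("Date").show(5)  # preview the earliest records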

#4 Automate the pipeline
ADF offers functionality that goes far beyond the simple implementation above. For example, we can automate the pipeline by creating a storage-based event trigger. Make sure that Microsoft.EventGrid is registered as one of the resource providers in your subscription, then set up the trigger: whenever a new dataset is uploaded to the storage account, the pipeline will execute automatically.
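To exercise the trigger end to end, you can drop a new file into the monitored container and watch the pipeline start on its own. The snippet below is a minimal sketch using the azure-storage-blob package; the connection string, container name, and file name are hypothetical placeholders.
# A minimal sketch: upload a new dataset to the monitored container,
# which should fire the storage-based event trigger in ADF.
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string("<storage-account-connection-string>")
blob_client = blob_service_client.get_blob_client(
    container="<container-name>", blob="household_power_consumption_new.csv")

with open("household_power_consumption_new.csv", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)  # the new blob event starts the pipeline run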

This type of trigger has various use cases across industries, such as monitoring inventory levels to replenish orders in supply chains, or tracking customer interactions for personalized recommendations in digital marketing.
#5 Parameterize the Notebook variables
To take a further step toward a more dynamic data pipeline, we can parameterize some of the variables. For example, during feature engineering on time-series data, the window sizes of the data features may not be optimal initially; they may need to be adjusted to capture seasonal patterns or based on downstream model fine-tuning. For this scenario, we can amend the pipeline with the settings below.

In the Notebook, add the code below to create a widget that receives the parameter input from the ADF pipeline:
# Additional code: create the widget and access its current value
dbutils.widgets.text("inputWindowSizes", "14,30")  # default value shown here is illustrative
inputWindowSizes = dbutils.widgets.get("inputWindowSizes")
window_sizes = [int(size) for size in inputWindowSizes.split(",")]

# Original function for adding window average features
df = add_window_avg_fields(df, window_sizes)
After adjusting the settings and the Notebook code, we can run the pipeline by providing the window size parameter values, such as 30 and 60.
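As a side note, the same parameterized run can also be kicked off programmatically instead of from the ADF UI. The sketch below uses the azure-identity and azure-mgmt-datafactory packages; the subscription ID, resource group, factory name, and pipeline name are hypothetical placeholders, and it assumes the window sizes are exposed as a pipeline parameter.
# A minimal sketch: trigger the parameterized ADF pipeline from Python
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")

# Start a run, passing the window sizes as a pipeline parameter
run_response = adf_client.pipelines.create_run(
    resource_group_name="<resource-group>",
    factory_name="<data-factory-name>",
    pipeline_name="<pipeline-name>",
    parameters={"inputWindowSizes": "30,60"},
)

# Check the run status afterwards
pipeline_run = adf_client.pipeline_runs.get(
    "<resource-group>", "<data-factory-name>", run_response.run_id)
print(pipeline_run.status)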

Finally, we can monitor the pipeline status again using ADF or Databricks workspace.
Wrapping it up
In our hands-on exploration, we mainly used ADF with Azure Databricks to orchestrate a dynamic time-series data pipeline:
- Set up the cloud resources for compute, analytics, and storage
- Build the skeleton of the data pipeline, from data ingestion to storage
- Bring flexibility to the pipeline by creating triggers and parameterizing variables
At the enterprise level, more complex cloud architectures may be implemented to satisfy evolving needs, such as streaming data, model monitoring, and multi-model pipelines. It thus becomes essential to strive for a delicate balance between performance, reliability, and cost-efficiency through team collaboration on governance policies and spending management.
Before you go
If you enjoy this reading, I invite you to follow my Medium page and LinkedIn page. By doing so, you can stay updated with exciting content related to data science side projects and Machine Learning Operations (MLOps) demonstrations.