Orchestrating a Dynamic Time-series Pipeline in Azure

In the previous story, we explored the potential of PySpark on Databricks for time-series data, and I encourage you to catch up on it here. Without configuring a standalone Spark instance, we can ingest static and streaming data, perform transformations, extract useful time-related features, and build visualizations using PySpark on Databricks. Its scalability and performance are particularly advantageous when handling complex transformations of enterprise-level data, up to petabytes.

All the feature engineering tasks were successfully performed within a single Databricks notebook. However, this is only a part of the data engineering story when building a data-centric system. The core part of the data pipeline lies in data orchestration.

Data orchestration generally refers to having centralized control over data flows, so that we can automate, manage, and monitor the entire data pipeline.

Photo by Julio Rionaldo on Unsplash

Azure Data Factory (ADF) with Azure Databricks

To satisfy these needs, one of the most popular solutions in the industry is to run Azure Databricks notebooks from an ADF platform.

ADF is a cloud-based, serverless, and fully managed data integration service. Although Databricks Workflows offers a good alternative that covers some ADF features, there are still several key benefits to choosing ADF. For example, ADF is a mature tool for integrating with diverse data stores through connectors, including SaaS applications like Salesforce and Big Data sources like Amazon Redshift and Google BigQuery. Therefore, it works well for ingestion and integration, especially if the current system has complex dependencies on data systems outside of Databricks. Besides, ADF's drag-and-drop, low-code interface makes it quick to build basic pipelines.


In this hands-on journey, we will dive deeper into the data engineering project and explore how ADF helps build a dynamic, skeletal data pipeline for time-series data. I will demonstrate how to mount cloud storage on Azure Databricks, transform data by embedding a Notebook on Azure Databricks, and dynamically orchestrate data through custom settings in ADF. Let's get started!

The initial setup

We first need to set up several cloud components and services.

#1 Create an Azure resource group

This container is used to hold and group the resources for an Azure solution. We will place our necessary cloud service components in this logical group for easier building or deployment.

Azure resource group (Image by author)
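
Rather than clicking through the portal, you can also script this step. Below is a minimal sketch using the Azure SDK for Python (azure-identity and azure-mgmt-resource); the subscription ID, resource group name, and region are placeholders for illustration only.

from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient

# Authenticate with whichever credential is available (Azure CLI login, managed identity, etc.)
credential = DefaultAzureCredential()
resource_client = ResourceManagementClient(credential, "<your-subscription-id>")

# Create (or update) the resource group that will hold all of the pipeline resources
resource_client.resource_groups.create_or_update(
    "rg-ts-pipeline",           # hypothetical resource group name
    {"location": "eastus"}      # choose the region closest to your data
)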

#2 Create an Azure Data Lake Gen 2 storage account

You can choose a suitable storage account type based on your performance and replication requirements. In the Advanced tab, we enable the Hierarchical Namespace to set up Data Lake Storage Gen2, which allows for storing both structured and unstructured data.

Storage account (Image by author)
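
If you prefer scripting this step as well, a hedged sketch with the azure-mgmt-storage package could look like the following; the key point is is_hns_enabled=True, which corresponds to the Hierarchical Namespace option. All names are placeholders.

from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import StorageAccountCreateParameters, Sku

storage_client = StorageManagementClient(DefaultAzureCredential(), "<your-subscription-id>")

# begin_create returns a poller; result() blocks until the account is provisioned
poller = storage_client.storage_accounts.begin_create(
    "rg-ts-pipeline",                  # hypothetical resource group from the previous step
    "adlstsdp",                        # storage account name (must be globally unique)
    StorageAccountCreateParameters(
        location="eastus",
        kind="StorageV2",
        sku=Sku(name="Standard_LRS"),
        is_hns_enabled=True,           # Hierarchical Namespace enables Data Lake Storage Gen2
    ),
)
account = poller.result()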

#3 Set up Azure Databricks service

If you have used Databricks before, the Azure Databricks service is largely the same. Besides, it is natively integrated with other Azure services and provides a unified billing platform. There are two tiers: (1) Standard – sufficient for our proof-of-concept here; and (2) Premium – everything in the Standard tier, plus Unity Catalog and advanced networking features that may be necessary for a large enterprise with multiple Databricks workspaces.

Azure Databricks workspace (Image by author)

#4 Register an application

This app registration will help mount Azure storage to Databricks, so make sure you note the application (client) ID and the directory (tenant) ID, and most importantly the client secret value, which cannot be viewed again once you leave the page.

App registration – Setting (Image by author)
App registration – Info (Image by author)
App registration – Client secret (Image by author)

Afterward, grant the registered app access rights to the storage account. This is achieved by assigning the "Storage Blob Data Contributor" role to the app we just registered.

Storage account – Grant access right (1/3) (Image by author)
Storage account – Grant access right (2/3) (Image by author)
Storage account – Grant access right (3/3) (Image by author)

#5 Create Azure SQL Database

To store the transformed data frame, we search for Azure SQL resources and pick "Single database" as the resource type. There are choices of SQL database servers with different computing hardware, maximum data sizes, and more. You can instantly get an estimated cost summary while adjusting the server specifications.

Create SQL Database (1/2) (Image by author)
Create SQL Database (2/2) (Image by author)

After all the initial setups, you are ready to explore how these services are linked together.

Prepare for data orchestrating pipeline

#1 Ingest data

We first upload the electric power consumption data to Azure Data Lake Gen2. This dataset, obtained from Kaggle (license – Database: Open Database, Contents: Database Contents), is sampled at a one-minute rate from December 2006 to November 2010.

Upload input data (Image by author)

Next, we create a Notebook in the Azure Databricks workspace and mount the storage by defining the configuration with the ID and secret values noted earlier.

# Define the configuration specifications
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "",      # application (client) ID of the registered app
    "fs.azure.account.oauth2.client.secret": "",  # client secret value noted earlier
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com//oauth2/token"  # insert the tenant ID between the slashes
}

dbutils.fs.mount(
  source = "abfss://[email protected]/", # URI of the object storage
  mount_point = "/mnt/adlstsdp/input",  # local path in the /mnt directory
  extra_configs = configs)
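
Hard-coding the client secret in a Notebook is acceptable for a quick proof-of-concept, but a safer pattern is to store the app registration values in a Databricks secret scope and read them with dbutils.secrets. A minimal sketch, assuming a secret scope named adls-scope with the keys below has already been created:

# Read the app registration values from a Databricks secret scope instead of pasting them inline
client_id     = dbutils.secrets.get(scope="adls-scope", key="client-id")
client_secret = dbutils.secrets.get(scope="adls-scope", key="client-secret")
tenant_id     = dbutils.secrets.get(scope="adls-scope", key="tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}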

To verify file access, we can run the following command:

dbutils.fs.ls("/mnt/adlstsdp/input")

# Output: [FileInfo(path='dbfs:/mnt/adlstsdp/input/household_power_consumption.csv', name='household_power_consumption.csv', size=132960755, modificationTime=1716798010000)]
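
Note that mounting the same path twice raises an error, for example when the Notebook is rerun. If that happens, you can list the existing mount points and unmount before mounting again:

# List all current mount points in the workspace
display(dbutils.fs.mounts())

# Unmount first if you need to remount with new credentials
dbutils.fs.unmount("/mnt/adlstsdp/input")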

#2 Embed Notebook on Azure Databricks

Most of the source code in this section builds upon my previous story. The idea is to perform data cleansing, transformation, and feature engineering (creating time-related and moving average features). The transformed data is ultimately written to the Azure SQL Database table.

You can check the complete code below to see the implementation.

# Define file location, file type, and CSV options
file_location = "/mnt/adlstsdp/input/household_power_consumption.csv"
file_type = "csv"
schema = "Date STRING, Time STRING, Global_active_power DOUBLE, Global_reactive_power DOUBLE, Voltage DOUBLE, Global_intensity DOUBLE, Sub_metering_1 DOUBLE, Sub_metering_2 DOUBLE, Sub_metering_3 DOUBLE"
first_row_is_header = "true"
delimiter = ";"

# Read the CSV file
org_df = spark.read.format(file_type) \
  .schema(schema) \
  .option("header", first_row_is_header) \
  .option("delimiter", delimiter) \
  .load(file_location)

# Data cleansing and transformation
from pyspark.sql.functions import *
cleaned_df = org_df.na.drop()
cleaned_df = cleaned_df.withColumn("Date", to_date(col("Date"),"d/M/y"))
cleaned_df = cleaned_df.withColumn("Date", cleaned_df["Date"].cast("date"))
cleaned_df = cleaned_df.select(concat_ws(" ", to_date(col("Date"),"d/M/y"), col("Time")).alias("DateTime"), "*")
cleaned_df = cleaned_df.withColumn("DateTime", cleaned_df["DateTime"].cast("timestamp"))
df = cleaned_df.groupby("Date").agg(
    round(sum("Global_active_power"), 2).alias("Total_global_active_power"),
    ).sort(["Date"])

# Add time-related features
df = df.withColumn("year", year("Date"))
df = df.withColumn("month", month("Date"))
df = df.withColumn("week_num", weekofyear("Date"))

# Add lagged value features of total global active power
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = Window.orderBy("Date")
df = df.withColumn("power_lag1", round(lag(col("Total_global_active_power"), 1).over(windowSpec), 2))

# Create delta field
df = df.withColumn("power_lag1_delta", round(col("power_lag1") - col("Total_global_active_power"), 2))

# Create window average fields
def add_window_avg_fields(df, window_sizes):
    for idx, window_size in enumerate(window_sizes, start=1):
        window_col_name = f"avg_power_lag_{idx}"
        windowSpec = Window.orderBy("Date").rowsBetween(-window_size, 0)
        df = df.withColumn(window_col_name, round(avg(col("Total_global_active_power")).over(windowSpec), 2))
    return df

window_sizes = [14, 30]
df = add_window_avg_fields(df, window_sizes)

# Create Exponentially Weighted Moving Average (EWMA) fields
import pyspark.pandas as ps
ps.set_option('compute.ops_on_diff_frames', True)

def add_ewma_fields(df, alphas):
    for idx, alpha in enumerate(alphas, start=1):
        ewma_col_name = f"ewma_power_weight_{idx}"
        df[ewma_col_name] = df.Total_global_active_power.ewm(alpha=alpha).mean().round(2)
    return df

alphas = [0.2, 0.8]
df_pd = df.pandas_api()
df_pd = add_ewma_fields(df_pd, alphas)
df = df_pd.to_spark()

# Write transformed dataframe to the database table "electric_usage_table"
df.write.format("jdbc") 
    .option("url", "jdbc:sqlserver://sql-db-dp.database.windows.net:1433;databaseName=sql-db-dp") 
    .option("dbtable", "dbo.electric_usage_table") 
    .option("user", "") 
    .option("password", "") 
    .mode("overwrite")  
    .save()

#3 Build a basic pipeline in ADF

In ADF, we add a "Notebook" activity to the pipeline environment, then configure it to reference the desired Notebook in the Databricks folder. Set up the Databricks linked service, then validate and publish the entire activity pipeline in ADF. You can then run the pipeline in "Debug" mode.

The success status of pipeline run (Image by author)

The activity status shows "Succeeded", meaning the data has been transformed and inserted into the Azure SQL Database table. We can view the results for verification using the query editor.

Query result of the Azure SQL database (Image by author)
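
Besides the query editor in the Azure portal, we can also read the table back from the Databricks Notebook over the same JDBC connection as a quick sanity check (fill in the same SQL credentials used for the write):

# Read the table back from Azure SQL Database to verify the load
verify_df = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://sql-db-dp.database.windows.net:1433;databaseName=sql-db-dp") \
    .option("dbtable", "dbo.electric_usage_table") \
    .option("user", "") \
    .option("password", "") \
    .load()

print(verify_df.count())          # row count should match the transformed dataframe
verify_df.orderBy("Date").show(5) # inspect the first few daily records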

#4 Automate the pipeline

ADF offers functionalities that are far beyond the above simple implementation. For example, we can automate the pipeline by creating a storage-based event trigger. Make sure that Microsoft.EventGrid is registered as one of the resource providers in your account subscription, then set up the trigger: Whenever a new dataset is uploaded to the storage account, the pipeline will automatically execute.

Set a new trigger in ADF (Image by author)
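
The resource provider registration can be checked in the portal under Subscriptions > Resource providers. As a hedged alternative, the same check can be scripted with the Azure SDK for Python (the subscription ID is a placeholder):

from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient

resource_client = ResourceManagementClient(DefaultAzureCredential(), "<your-subscription-id>")

# Check the current registration state of the Event Grid resource provider
provider = resource_client.providers.get("Microsoft.EventGrid")
print(provider.registration_state)

# Register it if needed (registration can take a few minutes to complete)
if provider.registration_state != "Registered":
    resource_client.providers.register("Microsoft.EventGrid")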

This type of trigger has various use cases across industries, such as monitoring inventory levels to replenish orders in supply chains, or tracking customer interactions for personalized recommendations in digital marketing.

#5 Parameterize the Notebook variables

To take a further step toward a more dynamic data pipeline, we can parameterize the variables. For example, during feature engineering on time-series data, the window sizes of the moving average features may not be optimal initially; they may need to be adjusted to capture seasonal patterns or to support downstream model fine-tuning. For this scenario, we can amend the pipeline with the settings below.

Set up parameters for pipeline run (Image by author)

In the Notebook, add the code below to create a widget that receives the parameter input from the ADF pipeline:

# Additional code: Create the widget and access its current value
dbutils.widgets.text("inputWindowSizes", "14,30")
inputWindowSizes = dbutils.widgets.get("inputWindowSizes")
window_sizes = [int(size) for size in inputWindowSizes.split(",")]  # convert to integers for rowsBetween

# Original function for adding window average features
df = add_window_avg_fields(df, window_sizes)

After adjusting the settings and the Notebook codes, we can run the pipeline by providing the window size parameter values, such as 30 and 60.

Input window size value for pipeline run (Image by author)
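
The same parameterized run can also be triggered programmatically. Below is a minimal sketch using the azure-mgmt-datafactory package, where the resource group, data factory, and pipeline names are assumptions for illustration:

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<your-subscription-id>")

# Trigger the pipeline and pass the window sizes as a pipeline parameter
run_response = adf_client.pipelines.create_run(
    resource_group_name="rg-ts-pipeline",   # hypothetical names for illustration
    factory_name="adf-tsdp",
    pipeline_name="pl-electric-usage",
    parameters={"inputWindowSizes": "30,60"},
)
print(run_response.run_id)                  # use this ID to monitor the run status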

Finally, we can monitor the pipeline status again using ADF or Databricks workspace.

Wrapping it up

In our hands-on exploration, we mainly used ADF with Azure Databricks to orchestrate a dynamic time-series data pipeline:

  • Set up the cloud resources for compute, analytics, and storage
  • Build the skeleton of the data pipeline, from data ingestion to storage
  • Bring flexibility to the pipeline by creating triggers and parameterizing variables

At the enterprise level, more complex cloud architectures may be implemented to satisfy evolving needs, such as streaming data, model monitoring, and multi-model pipelines. It thus becomes essential to strive for a delicate balance between performance, reliability, and cost-efficiency through team collaboration on governance policies and spending management.


Before you go

If you enjoyed this reading, I invite you to follow my Medium page and LinkedIn page. By doing so, you can stay updated with exciting content related to data science side projects and Machine Learning Operations (MLOps) methodologies.

Performing Customer Analytics with LangChain and LLMs

Managing the Technical Debts of Machine Learning Systems

Tags: Azure Big Data Data Engineering Databricks Spark
