Mastering Data Streaming in Python


In this article, I will address the key challenges data engineers may encounter when designing streaming data pipelines. We'll explore use case scenarios, provide Python code examples, discuss windowed calculations using streaming frameworks, and share best practices related to these topics.

In many applications, having access to real-time, continuously updated data is crucial. Fraud detection, churn prevention, and recommendations are prime candidates for streaming. These pipelines process data from various sources to multiple target destinations in real time, capturing events as they occur and enabling their transformation, enrichment, and analysis.


Streaming data pipeline

In one of my previous articles, I described the most common data pipeline design patterns and when to use them [1].

Data pipeline design patterns

A data pipeline is a sequence of data processing steps, where each stage's output becomes the input for the next, creating a logical flow of data.

A data pipeline exists whenever data is processed between two points, such as from a source to a destination.

The three key components of a data pipeline are the source, the processing step(s), and the destination. For example, data extracted from an external API (source) can be loaded into a data warehouse (destination), illustrating a common scenario where the source and destination are distinct.
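
To make the three components concrete, here is a minimal sketch of that API-to-warehouse scenario; the URL and the in-memory "warehouse" list are placeholders for illustration only.

import requests  # assumed available for the hypothetical API source

def extract(url: str) -> list[dict]:
    """Source: pull raw records from an external API (placeholder URL)."""
    return requests.get(url, timeout=10).json()

def transform(records: list[dict]) -> list[dict]:
    """Processing step: keep only records that carry a user_id."""
    return [r for r in records if r.get("user_id")]

def load(records: list[dict], destination: list[dict]) -> None:
    """Destination: a plain list stands in for a data warehouse here."""
    destination.extend(records)

warehouse: list[dict] = []
load(transform(extract("https://api.example.com/events")), warehouse)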

In streaming, the source is typically a publisher service, while the destination is a consumer of the processed data, such as an application or another endpoint. The data often undergoes transformations using windowed calculations. A good example is a session window defined by a period of inactivity following the last event, as used in Google Analytics 4.

Conceptual data pipeline design. Image by author
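
As a rough illustration of such a session window, the snippet below groups event timestamps into sessions that close after a period of inactivity; the 30-minute timeout mirrors the Google Analytics 4 default and is an assumption here.

from datetime import datetime, timedelta

SESSION_TIMEOUT = timedelta(minutes=30)  # assumed inactivity threshold

def sessionize(timestamps: list[datetime]) -> list[list[datetime]]:
    """Split event timestamps into sessions separated by inactivity gaps."""
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > SESSION_TIMEOUT:
            sessions.append(current)  # gap exceeded -> close the session
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions

events = [datetime(2024, 6, 27, 15, 0), datetime(2024, 6, 27, 15, 10),
          datetime(2024, 6, 27, 16, 5)]
print(len(sessionize(events)))  # 2: the 55-minute gap starts a new session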

Actual application example

Consider the simple stream-processing application below, built with Python, Kafka and Faust. The high-level application logic is as follows:

  1. The API service app/app_main.py lets clients POST valid user engagement events to the Kafka producer topic. These events can be collected from a website or a mobile application, or sent by another service such as a data publisher. An example event:
{
    "event_type": "page_view",
    "user_id": "e659e3e7-22e1-4a6b",
    "action": "alternative_handset",
    "timestamp": "2024-06-27T15:43:43.315342",
    "metadata": {
        "session_id": "4b481fd1-9973-4498-89fb",
        "page": "/search",
        "item_id": "05efee91",
        "user_agent": "Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00"
    }
}

Event validation is handled by Pydantic, which accepts only events with valid types, actions, and so on [2].
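
A minimal sketch of what such a Pydantic model might look like is shown below (assuming Pydantic v2); the field names follow the JSON payload above, while the allowed event types are illustrative assumptions.

from datetime import datetime
from typing import Literal
from pydantic import BaseModel

class Metadata(BaseModel):
    session_id: str
    page: str
    item_id: str
    user_agent: str

class Event(BaseModel):
    # Unknown event types are rejected with a validation error.
    event_type: Literal["page_view", "scroll"]
    user_id: str
    action: str
    timestamp: datetime
    metadata: Metadata

FastAPI automatically returns a 422 response whenever a POSTed payload fails this validation.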

Our application continuously consumes processed events from the consumer topic and pushes them to a WebSocket so that real-time processing can be visualized.

Python for Data Engineers

  2. The data processing service consumes raw events from the producer topic, applies a windowed calculation (a tumbling table), and sends aggregated results per user to the consumer topic every 10 seconds. This is the job of a streaming framework such as kafka-streams or Faust.
  3. The stream of continuously processed data is instantly consumed by the API service and visualized at localhost:8000/monitor.

For example, we can process raw user engagement events every 10 seconds to generate a user leaderboard based on a simple count of events per user.

Streaming leaderboard app example. Image by author.
Python">import os
import json
import logging
import asyncio
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.staticfiles import StaticFiles
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from app.models import Event

from .http import events, users
from dotenv import load_dotenv
load_dotenv()

logging.basicConfig(level=logging.INFO)

# kafka_brokers = os.getenv("REDPANDA_BROKERS")
kafka_brokers = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)
consumer_topic = os.getenv("CONSUMER_TOPIC")
producer_topic = os.getenv("PRODUCER_TOPIC")
error_topic = os.getenv("ERROR_TOPIC")

def kafka_json_deserializer(serialized):
    return json.loads(serialized)

@asynccontextmanager
async def startup(app):
    app.producer = AIOKafkaProducer(
        bootstrap_servers=[kafka_brokers],)
    await app.producer.start()

    app.consumer = AIOKafkaConsumer(
        consumer_topic,
        # "agg-events",
        # group_id="demo-group",
        # loop=loop,
        bootstrap_servers=[kafka_brokers],
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )
    await app.consumer.start()

    yield

    # Gracefully stop the Kafka clients on application shutdown
    await app.consumer.stop()
    await app.producer.stop()

app = FastAPI(lifespan=startup)
app.mount("/static", StaticFiles(directory="static"), name="static")

app.include_router(events.router)
app.include_router(users.router)

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def send_message_to_websocket(msg):
        text = str(msg.value)
        await websocket.send_text(text)

    async def consume_from_topic(topic, callback):
        print(f"Consuming from {topic}")
        async for msg in app.consumer:
            print(f"Received message: {msg.value}")
            await callback(msg)

    # Start consuming
    asyncio.create_task(consume_from_topic(consumer_topic, send_message_to_websocket))

    # Keep the connection open
    while True:
        await asyncio.sleep(3)

@app.post("/track")
async def send_event_to_topic(event: Event):

    try:
        data = event.model_dump_json()
        data = data.encode()

        # # Validate the presence of required fields
        # We could do something like this but Pydantic will do
        # everything for us.
        # if "user_id" not in data or "action" not in data:
        #     raise HTTPException(
        #         status_code=422, detail="Incomplete data provided")
        user_id = event.user_id

        # Send filename to Redpanda
        await app.producer.send(producer_topic, data)
        # Returning a confirmation message
        return {"message": "User data submitted successfully!",
                "user_data": {"user_id": user_id}}

    except HTTPException as e:
        # Re-raise HTTPException to return the specified
        # status code and detail
        print(e)
        raise e
    except Exception as e:
        # Handle other unexpected exceptions and return a
        # 500 Internal Server Error
        print(e)
        raise HTTPException(
            status_code=500, detail=f"An error occurred: {str(e)}")

if __name__ == "__main__":
    uvicorn.run("app.app_main:app", host="0.0.0.0", port=8000)

# Run:
# uvicorn app.app_main:app --reload --host 0.0.0.0 --port 8000
# python -m app.app_main api -l info
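
With the service running, an event can be submitted to the /track endpoint as shown below; the payload mirrors the example event from earlier, and the host and port follow the uvicorn command above.

import requests

event = {
    "event_type": "page_view",
    "user_id": "e659e3e7-22e1-4a6b",
    "action": "alternative_handset",
    "timestamp": "2024-06-27T15:43:43.315342",
    "metadata": {
        "session_id": "4b481fd1-9973-4498-89fb",
        "page": "/search",
        "item_id": "05efee91",
        "user_agent": "Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00",
    },
}

response = requests.post("http://localhost:8000/track", json=event, timeout=5)
print(response.status_code, response.json())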

The code for the stream-processing service is shown further below.

Kafka, Kinesis, RabbitMQ and other stream-processing tools

Let's take a look at popular data streaming platforms and frameworks that have proved most useful over the last couple of years.

  • Apache Spark – a framework for distributed data computing for large-scale analytics and complex data transformations.
  • Apache Kafka – a real-time data pipeline tool with a distributed messaging system for applications. It uses a publish-subscribe model where producers send data to topics, and consumers pull data from those topics. Each topic is split into partitions that are replicated across different servers for better availability and to balance the load. Plus, Kafka has built-in fault tolerance, so you can set the replication factor and how many in-sync replicas (ISRs) you want for each topic. This means your data stays accessible, even if some servers go down.
  • AWS Kinesis – a real-time streaming platform for analytics and applications. I previously wrote about it here [3].

Building a Streaming Data Pipeline with Redshift Serverless and Kinesis

  • Google Cloud Dataflow – Google's streaming platform for real-time event processing and analytics pipelines.
  • Apache Flink – a distributed streaming data platform designed for low-latency data processing.
  • RabbitMQ – an open-source message broker that facilitates communication between applications. It uses a queuing system based on the Advanced Message Queuing Protocol (AMQP), allowing you to send, receive, and route messages efficiently. RabbitMQ helps decouple application components, enabling them to work independently and scale easily.
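
For comparison, a minimal RabbitMQ round trip with the pika client could look like the sketch below; the localhost broker and the queue name are assumptions.

import pika

# Assumed local RabbitMQ broker and an illustrative queue name.
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="user-events")

# Publish to the default exchange; the routing key is the queue name.
channel.basic_publish(
    exchange="",
    routing_key="user-events",
    body=b'{"user_id": "User1", "event_type": "page_view"}',
)

# Pull one message back off the queue.
method, properties, body = channel.basic_get(queue="user-events", auto_ack=True)
print(body)
connection.close()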

Kafka is one of my favourite distributed streaming platforms: it lets you publish and subscribe to data streams, process them in real time, and store them reliably. It was originally built for the JVM, but it is now also accessible to Python developers through clients such as kafka-python.
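
To make that concrete, a minimal kafka-python producer/consumer pair might look like this; the broker address and topic name are assumptions for a local setup.

import json
from kafka import KafkaProducer, KafkaConsumer

BROKER = "localhost:9092"   # assumed local broker
TOPIC = "user-events"       # illustrative topic name

# Producer: publish a JSON-encoded event to the topic.
producer = KafkaProducer(
    bootstrap_servers=[BROKER],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send(TOPIC, {"user_id": "User1", "event_type": "page_view"})
producer.flush()

# Consumer: read events back from the beginning of the topic.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=[BROKER],
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    print(message.value)
    break  # stop after one message for the demo

In the demo application, the same roles are played by aiokafka (asynchronous clients) on the API side and Faust on the processing side.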

One thing I like about the Kafka ecosystem is the built-in windowing support in its stream-processing layer (Kafka Streams, Faust), which simplifies session and other windowed calculations.

For example, this can be achieved with ease using the faust-streaming framework. The code below demonstrates how our application calculates a windowed aggregation for each user:


import os
import random
from datetime import datetime, timedelta

import faust

# Event, UserStats and the allowed-value enums are assumed to live in
# app.models, alongside the models used by the API service above.
from app.models import AllowedActions, AllowedEvents, Event, UserStats

SINK = os.getenv("CONSUMER_TOPIC")
TOPIC = os.getenv("PRODUCER_TOPIC")

BROKER = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)

TABLE = "tumbling-events"
CLEANUP_INTERVAL = 1.0
WINDOW = 10  # 10 seconds window
WINDOW_EXPIRES = 10
PARTITIONS = 1

app = faust.App("event-stream", broker=f"kafka://{BROKER}")

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=Event)
sink = app.topic(SINK, value_type=UserStats)

@app.timer(interval=3.0, on_leader=True)
async def generate_event_data():
    events_topic = app.topic(TOPIC, key_type=str, value_type=Event)
    allowed_events = [e.value for e in AllowedEvents]
    allowed_actions = [e.value for e in AllowedActions]

    # Create a loop to send data to the Redpanda topic
    # Send 20 messages every time the timer is triggered (every 3 seconds)
    for i in range(20):
        # Send the data to the Redpanda topic

        await events_topic.send(
            key=random.choice(["User1", "User2", "User3", "User4", "User5"]),
            value=Event(
                event_type=random.choice(["page_view", "scroll"]),
                user_id=random.choice(["User1", "User2", "User3", "User4", "User5"]),  # noqa: E501
                action=random.choice(["action_1", "action_2"]),
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),

            ),
        )

    print("Producer is sleeping for 3 seconds          

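Continuing the same module, the windowed aggregation itself could look roughly like the sketch below: a tumbling table counts events per user over 10-second windows, and an on_window_close callback forwards each user's total to the sink topic. The UserStats field names, and the assumption that Event and UserStats are faust.Record models defined in app.models, are mine; the original service may differ in detail.

def window_processor(key, value):
    # Called by Faust when a 10-second window closes;
    # key is (user_id, window_range), value is the event count.
    user_id = key[0]
    sink.send_soon(value=UserStats(user_id=user_id, event_count=value))

tumbling_table = (
    app.Table(
        TABLE,
        default=int,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
)

@app.agent(source)
async def count_events(events):
    # Increment the current window's counter for each incoming event.
    async for event in events:
        tumbling_table[event.user_id] += 1

if __name__ == "__main__":
    app.main()

Running the module starts a Faust worker that keeps the table state and emits one aggregated record per user each time a window closes, which is exactly the stream the API service forwards to the WebSocket for the leaderboard view.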