Apache Beam: Data Processing, Data Pipelines, Dataflow and Flex Templates
In this first article, we explore Apache Beam, from a simple pipeline to a more complicated one, using GCP Dataflow. Let's learn what PTransform, PCollection, GroupByKey and Dataflow Flex Template mean.

An Introduction to Apache Beam
Without any doubt, processing data, creating features, moving data around, and doing all of this in a safe, stable and computationally efficient way is highly relevant for AI tasks nowadays. Back in the day, Google started to develop an open-source project for both batch and streaming data processing, named Beam. Later, the Apache Software Foundation started to contribute to this project, bringing Apache Beam to scale.
The key strength of Apache Beam is its flexibility, which makes it one of the best programming SDKs for building data processing pipelines. I would recognise the following main concepts in Apache Beam that make it an invaluable data tool:
- Unified model for batch/streaming processing: Beam is a unified programming model, meaning that with the same Beam code you can decide whether to process data in batch or streaming mode, and the pipeline can be used as a template for other new processing units. Beam can ingest a continuous stream of data or perform specific operations on a given batch of data.
- Parallel processing: Efficient and scalable data processing starts from parallelising the execution of the pipeline, distributing the workload across multiple "workers" (a worker can be thought of as a node). The key concept for parallel execution is the "ParDo transform", which takes a function that processes individual elements and applies it concurrently across multiple workers. The great thing about this implementation is that you do not have to worry about how to split data or create batch loaders. Apache Beam does that for you.
- Data pipelines: Given the two aspects above, a data pipeline can be created in a few lines of code, from data ingestion to the output results.
Python, Java, and a simple example to start
The Apache Beam SDK is available in several languages; this article considers the two most widely used, Python and Java. The Java SDK has been present since the project started, and it offers the best flexibility, feature richness, and strong community support for writing data pipelines. The Python SDK provides most of the features of its Java counterpart, although some implementations have not been ported yet from the Java side. Nevertheless, Python lets you create efficient data pipelines that exploit a tremendous amount of computational power, in a scalable way, to perform any hard data processing operation.
To get our hands dirty and better understand the capabilities of Apache Beam, let's start with a simple example. This is the most famous example everyone starts with: a word counter, applied to the input text file "King Lear". In this article we'll see the Python SDK only, but if you're interested and if there are enough requests I'll be more than happy to set up an article for the Java part.
Before starting, create your virtual environment and run pip install apache-beam and pip install "apache-beam[gcp]". The latter installs all the GCP packages needed to read the input data from Google Cloud Storage (GCS), which this example uses to download the King Lear text. Then, just copy and paste this code:
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
    """This is the transform that extracts the input words.

    Args:
        element: A worker's element
    Return:
        words in the element text
    """

    def process(self, element):
        """Returns an iterator over the words of this element.

        The element is a line of text.

        Args:
            element: the element being processed
        Returns:
            The words found in the element.
        """
        # \w matches word characters; the escaped quote keeps apostrophes inside words.
        return re.findall(r'[\w\']+', element, re.UNICODE)


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow relies on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the counts into a collection of strings.
        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)

        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Then, you can run this code in your local environment with: python wordcount.py --output "output_from_beam.txt". After some verbose log output, you'll see an output file whose name starts with output_from_beam.txt (Beam appends a shard suffix such as -00000-of-00001) being populated as:
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
BURGUNDY: 8
CORNWALL: 63
ALBANY: 67
EARL: 2
KENT: 156
GLOUCESTER: 141
...
Let's highlight the main Beam concepts from the code above
- There's a context manager that runs the pipeline, and inside it the pipeline's steps are chained with a vertical bar:
with beam.Pipeline(options=pipeline_options) as p:
    # start the pipeline p
    lines = p | 'Read' >> ReadFromText(known_args.input)
    # call a second step (the parentheses let the expression span several lines)
    second_step = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
    )
    ...
- Each step of the pipeline has a name. For example, in p | 'Read', Read is the name of the step, and >> binds that name to the transform to apply.
- Each step in the pipeline returns a specific Beam object, called PCollection. A PCollection is a distributed collection of data, produced by a given transform. For example, in p | 'Read' >> ReadFromText(known_args.input), the step returns the lines of text of the given input file. Each element is a string, so the output of this step is a PCollection of strings.
- The PCollection produced by one step can be read by the next steps in the pipeline. To read these data correctly and efficiently we need to process them in parallel. For custom functions, e.g. WordExtractingDoFn, we use two elements. The first is the wrapper beam.ParDo, or parallel-do. This is a "Beam transform", and it maps each input element of the PCollection to some processing function. The second element is beam.DoFn, or do-function.
- Each custom function that processes data is wrapped in a "transform". Here the function is defined as a Python class that subclasses beam.DoFn, rather than as a plain function. Each class must have a process method, where the processing logic is implemented. The process method receives element, that is the input element, and returns an iterable with the output values.
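To make the roles of ParDo and CombinePerKey more concrete, here is a minimal plain-Python sketch of what the word-count steps compute. It does not use Beam and runs in a single process; par_do and combine_per_key_sum are hypothetical helper names introduced only for illustration, so treat it as a mental model rather than as how Beam is implemented.

```python
import re
from collections import defaultdict


def extract_words(line):
    """Mimics WordExtractingDoFn.process: one line in, many words out."""
    return re.findall(r"[\w']+", line, re.UNICODE)


def par_do(elements, fn):
    """Toy, single-process stand-in for beam.ParDo (hypothetical helper):
    apply fn to every element and flatten the iterables it returns."""
    output = []
    for element in elements:
        output.extend(fn(element))
    return output


def combine_per_key_sum(pairs):
    """Toy stand-in for beam.CombinePerKey(sum) on (key, value) pairs."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


lines = ["KING LEAR", "the king of Britain", "LEAR the king"]
words = par_do(lines, extract_words)      # the 'Split' step
pairs = [(word, 1) for word in words]     # the 'PairWithOne' step
counts = combine_per_key_sum(pairs)       # the 'GroupAndSum' step
print(counts)
```

The real ParDo does the same element-by-element work, but Beam decides how to split the elements across workers, which is why process only ever sees one element at a time.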
A more complicated example
Let's turn our attention to a more complicated example, to appreciate the power of Apache Beam. In particular, we're going to write a script that reads a log file. The log file has 1,000,000 entries. These entries are parsed and analysed, grouped by key, and the average duration of each log event type is computed as output.
To create the log file, you can use this script:
import random
from datetime import datetime, timedelta


def generate_log_entry():
    # Random timestamp within 24 hours of 2022-01-01 12:00
    timestamp = datetime(2022, 1, 1, 12, 0, 0) + timedelta(minutes=random.randint(0, 1440))
    event_type = f"event_type_{random.choice(['A', 'B', 'C'])}"
    duration = round(random.uniform(5.0, 30.0), 2)
    return f"{timestamp.isoformat()},{event_type},{duration}"


if __name__ == '__main__':
    output_file = 'log_entries_large.txt'
    with open(output_file, 'w') as file:
        for _ in range(1000000):
            # One log entry per line
            file.write(generate_log_entry() + '\n')
    print(f"Generated log entries file: {output_file}")
The script saves a file called log_entries_large.txt containing 1M lines. Each line represents a logged event, with a timestamp, event type, and duration of the event:
2022-01-02T00:36:00,event_type_A,18.52
2022-01-01T17:31:00,event_type_B,25.5
2022-01-02T03:22:00,event_type_C,26.79
2022-01-01T23:26:00,event_type_C,17.98
2022-01-02T01:15:00,event_type_C,29.54
2022-01-01T19:43:00,event_type_C,19.68
2022-01-02T01:30:00,event_type_B,5.65
2022-01-01T23:33:00,event_type_C,25.4
2022-01-02T07:20:00,event_type_A,16.59
2022-01-01T14:49:00,event_type_C,23.62
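Each line above has exactly three comma-separated fields. As a small sketch of the parsing step in isolation, the function below turns one line into a dictionary and returns None for malformed lines. The name parse_log_line and the choice to skip bad lines are assumptions made for illustration; the pipeline in this article assumes every line is well formed.

```python
def parse_log_line(line):
    """Parse 'timestamp,event_type,duration' into a dict.

    Returns None when the line does not have exactly three fields or when
    the duration is not a number (an illustrative choice, not Beam behaviour).
    """
    parts = line.strip().split(",")
    if len(parts) != 3:
        return None
    timestamp, event_type, duration = parts
    try:
        return {
            "timestamp": timestamp,
            "event_type": event_type,
            "duration": float(duration),
        }
    except ValueError:
        return None


print(parse_log_line("2022-01-02T00:36:00,event_type_A,18.52"))
print(parse_log_line("not a log line"))
```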
These input data can be ingested easily in a Beam pipeline, to produce the average duration for each log event:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from structlog import get_logger

logger = get_logger()


class ParseLogEntry(beam.DoFn):
    def process(self, element):
        # Assuming log entries are in CSV format: timestamp,event_type,duration
        timestamp, event_type, duration = element.split(',')
        return [{'event_type': event_type, 'duration': float(duration)}]


class CalculateAverageDuration(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, input):
        total_duration, count = accumulator
        for duration in input:
            total_duration += duration
            count += 1
        return total_duration, count

    def merge_accumulators(self, accumulators):
        total_duration, count = zip(*accumulators)
        return sum(total_duration), sum(count)

    def extract_output(self, accumulator):
        total_duration, count = accumulator
        return total_duration / count if count != 0 else 0


def run_pipeline(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-file', dest='input', required=True)
    parser.add_argument('--output-file', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    options = PipelineOptions(['--runner=DirectRunner'])

    with beam.Pipeline(options=options) as pipeline:
        results = (
            pipeline
            | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
            | 'ParseLogEntry' >> beam.ParDo(ParseLogEntry())
            | 'WithKeys' >> beam.Map(lambda element: (element['event_type'], element['duration']))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'CalculateAverageDuration' >> beam.CombinePerKey(CalculateAverageDuration())
            | 'FormatOutput' >> beam.Map(lambda kv: f'Event Type: {kv[0]}, Average Duration: {kv[1]:.2f}')
            | 'WriteToText' >> beam.io.WriteToText(known_args.output)
        )


if __name__ == '__main__':
    run_pipeline()
Let's analyse what's happening in this script:
- The main function is run_pipeline. As usual in Beam, the pipeline receives input arguments and we set save_main_session, so that the main session is shared among all the workers. This saves the state of the global environment (for example, modules imported at module level) in the main session of the code. That state is then made available to all the worker nodes, so that the pipeline's transforms (e.g. ParDo), which are executed by the workers, have access to the same global environment as the main function.
- Then we write the usual set-up of a Beam pipeline, defining the input arguments and the pipeline options:
parser = argparse.ArgumentParser()
parser.add_argument('--input-file', dest='input', required=True)
parser.add_argument('--output-file', dest='output', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
options = PipelineOptions(['--runner=DirectRunner'])
It is worth noting that Beam has different runner options. In particular, there are three main runners:
- DirectRunner: the runner used for local pipeline execution
- DataflowRunner: the runner designed to execute the pipeline on Google Cloud Dataflow, a fully managed, serverless data processing service
- FlinkRunner: the runner that allows Beam pipelines to run on Apache Flink, a stream processing framework. AWS Kinesis Analytics can run Apache Flink, thus we could say this is the AWS counterpart of DataflowRunner
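The runner is usually passed on the command line together with the script's own arguments. The sketch below uses only the standard library to show how parse_known_args separates the two groups: the arguments the script declares go into known_args, and everything else (for example --runner=DirectRunner) is left in pipeline_args, which is the list handed to PipelineOptions. The helper name split_args and the example values are assumptions for illustration.

```python
import argparse


def split_args(argv):
    """Separate the script's own arguments from the ones meant for Beam."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-file', dest='input', required=True)
    parser.add_argument('--output-file', dest='output', required=True)
    # Unrecognised arguments are returned untouched as the second value.
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--input-file', 'log_entries_large.txt',
    '--output-file', 'averages',
    '--runner=DirectRunner',
])
print(known_args.input, known_args.output)
print(pipeline_args)
```

With this split, switching runner is a matter of changing one command-line flag rather than editing the pipeline code.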
- The main pipeline is structured in 7 steps. Once the data are read, we execute a first transform with beam.ParDo(ParseLogEntry()). The ParseLogEntry object performs a simple parsing operation on the PCollection output by the ReadFromText step.
- The WithKeys step is very important, as it creates a PCollection of key-value pairs. The output from the previous step is a dictionary like {'event_type': 'event_type_A', 'duration': 18.52}. What we want is to take the event type as the key and the duration as the value. Thus, the output will be event_type_A: 18.52, i.e. the tuple ('event_type_A', 18.52).
- Once we have mapped the values to keys, we can group them. The GroupByKey step uses beam.GroupByKey(), which gathers all the values that share the same key into one group.
- From here, we compute the average duration for each event type. This can be done through another type of Beam transform, built on beam.CombineFn:
class CalculateAverageDuration(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, input):
        total_duration, count = accumulator
        for duration in input:
            total_duration += duration
            count += 1
        return total_duration, count

    def merge_accumulators(self, accumulators):
        total_duration, count = zip(*accumulators)
        return sum(total_duration), sum(count)

    def extract_output(self, accumulator):
        total_duration, count = accumulator
        return total_duration / count if count != 0 else 0
beam.CombineFn defines a combine-transform. Each combine-transform requires the following functions:
- create_accumulator: creates an empty accumulator. In this case it is a tuple of two values: the first is the running total duration, and the second is the number of elements counted so far.
- add_input: receives the accumulator and an input. Here we add the durations of the events for a specific key to the running total, and increase the number of elements counted so far.
- merge_accumulators: merges several partial accumulators into one. The events of a given key may be accumulated in separate partial accumulators (for example on different workers), so this function sums up their total durations and their counts.
- extract_output: returns the combined value we want, here the arithmetic average computed from the final accumulator.
- The two final steps of the pipeline convert the output to a writable format and write it to an output text file.
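The four CombineFn methods can be exercised without Beam. The sketch below is a plain-Python mirror of the averaging logic, plus a hypothetical driver, combine_in_bundles, that accumulates the values in separate chunks and then merges them, roughly as separate workers might. Note one deliberate difference from the class in this article: here add_input receives a single value, not an iterable of values.

```python
class AverageFn:
    """Plain-Python mirror of an averaging CombineFn (no Beam required)."""

    def create_accumulator(self):
        # (running total, number of values seen)
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else 0.0


def combine_in_bundles(fn, values, bundle_size):
    """Hypothetical driver: accumulate each bundle on its own, then merge."""
    partials = []
    for start in range(0, len(values), bundle_size):
        accumulator = fn.create_accumulator()
        for value in values[start:start + bundle_size]:
            accumulator = fn.add_input(accumulator, value)
        partials.append(accumulator)
    if not partials:
        partials.append(fn.create_accumulator())
    return fn.extract_output(fn.merge_accumulators(partials))


durations = [18.0, 25.5, 5.5, 16.0, 10.0]
average = combine_in_bundles(AverageFn(), durations, bundle_size=2)
print(average)
```

Carrying the pair (total, count) instead of a running average is what makes the merge step correct: partial averages cannot simply be averaged again when the bundles have different sizes.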
Apache Beam, even locally, copes very well with the growing size of the files, as Tab. 1 shows. The timings have been computed across 3 different runs, reporting the average value and its standard error (for reference, these calculations were run on a Mac M2 Pro, 16 GB). It is worth noting that the input file is still small, as it has only three columns, but the results are truly remarkable if we consider that we ran these calculations on a simple laptop.
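For readers who want to repeat this kind of measurement, here is a small sketch of timing a callable several times and summarising the timings. It assumes the standard error is the sample standard deviation divided by the square root of the number of runs, which is one common definition; time_runs and mean_and_stderr are names chosen for this example.

```python
import math
import statistics
import time


def time_runs(fn, repeats=3):
    """Call fn several times and return the wall-clock duration of each call."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return timings


def mean_and_stderr(samples):
    """Return (mean, standard error), assuming stderr = stdev / sqrt(n)."""
    mean = statistics.mean(samples)
    if len(samples) < 2:
        return mean, 0.0
    return mean, statistics.stdev(samples) / math.sqrt(len(samples))


timings = time_runs(lambda: sum(range(100_000)), repeats=3)
mean, stderr = mean_and_stderr(timings)
print(f"{mean:.6f} s +/- {stderr:.6f} s")
```

In practice fn would wrap a call to the pipeline's entry point; the lambda above is only a stand-in workload.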
Scale up your calculations: run Apache Beam on GCP Dataflow
To try out Apache Beam on GCP you can use Google's $300 credit offer. If you have never subscribed to GCP, just head here, and follow the procedure to create a new GCP project. This will grant you a credit of $300. Make sure the credit appears under Billing as it appears in the image below. As a gentle reminder, always keep an eye on your remaining free trial credit, to avoid incurring unwanted charges.

What's Dataflow
Dataflow is a fully managed service, provided by Google Cloud Platform, to perform data processing both in streaming and batch mode. The great strength of Dataflow is its simplicity, allowing users to implement pipelines with a few lines of code. The code can be containerised, and Dataflow controls the scaling and the number of workers, ensuring automatic horizontal and vertical scaling. The need for a tool like Dataflow comes from the increasing demand for scalable and flexible data processors that are easy to control and are unified under a programming model like Apache Beam, giving people the power to process gigabytes and terabytes of data.
Another cool aspect of Dataflow (I know, many of you may say "Well, that's normal") is its nice UI. Once a pipeline has been deployed to Dataflow, you'll be able to closely monitor every single component and retrieve statistics about each part of the pipeline, giving you a powerful tool for model and data monitoring.
How can I push my data pipeline in Dataflow?

In this post, we will not use gcloud from your local command line interface; instead we'll make use of the GCP Cloud Shell, as Fig. 2 shows. If you click on that button, you'll get access to the GCP CLI. It comes with gcloud and Docker already installed, so we can immediately test our product.
To deploy your pipeline to Dataflow we'll use a "flex template". Dataflow gives you the ability to deploy pipelines as "templates". If a pipeline is a template, you can draft a skeleton, and a data scientist or research engineer can later pick it up and use it for their own purposes. A "flex" template allows more flexibility. In this deployment, the pipeline is packaged as a Docker image, so custom dependencies can be imported and used. The image is pushed to the GCP Artifact Registry. At launch time, Dataflow pulls the Docker image and runs the pipeline, setting up all the virtual machines (VMs) under the hood. The flex template provides flexibility in specifying the dependencies and a custom runtime environment; parameterisation, to define the parameters to use during job submission; and full user control over the image, custom libraries, binaries and links to resources beyond Python (e.g. creating a package that runs Rust).
Set up your GCP environment first!
Before packaging up your image with Docker and pushing it to GCP we need to set up Docker for GCP, as well as create a "service account" that can enable the usage of Dataflow and all the GCP tools we'll need.
- From the GCP console, head to the Artifact Registry. Click on "Create Repository".

- In the following menu, create a repository named processing-pipeline, keep the format as Docker, and select the region that best suits your location (for example, I am based in Scotland, so europe-west2 is the best location for me). Keep all the other options and hit Create. A new Artifact Registry repository will be created.
- Activate your Docker environment. Open the GCP CLI and run the following command:
gcloud auth configure-docker {{YOUR_REGION}}-docker.pkg.dev
where {{YOUR_REGION}} is the region you selected above. For example, for me it is europe-west2. This command sets up the Docker configuration for your environment.
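The registry host used in the command above and the full image path used when tagging and pushing follow a fixed pattern: region, then project, then repository, then image name and tag. The helper below is a small sketch of that pattern; the function names and the example project ID my-project are placeholders, not values from this tutorial.

```python
def registry_host(region):
    """Artifact Registry Docker host for a region, e.g. europe-west2-docker.pkg.dev."""
    return f"{region}-docker.pkg.dev"


def image_uri(region, project_id, repository, image, tag="latest"):
    """Full image path: HOST/PROJECT_ID/REPOSITORY/IMAGE:TAG."""
    return f"{registry_host(region)}/{project_id}/{repository}/{image}:{tag}"


# 'my-project' is a placeholder project ID.
print(registry_host("europe-west2"))
print(image_uri("europe-west2", "my-project", "processing-pipeline", "pipeline"))
```

Building the path in one place helps keep the region in gcloud auth configure-docker and the region in the image tag consistent.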
Then, we need to set up a "service account" that gives us the ability to interact with GCP Dataflow, Artifact Registry and Storage, and enable the Dataflow, Logging, Storage, Artifact Registry and Cloud Build API services. For this, you just need to open up a GCP shell and run the following in the terminal:
# set up gcloud with your project
gcloud config set project YOUR_PROJECT_ID
# grant roles to service account
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/dataflow.admin
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/dataflow.worker
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/storage.objectAdmin
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/storage.admin
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/artifactregistry.reader
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/dataflow.admin
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="serviceAccount:[email protected]" --role=roles/dataflow.worker
# enable all the services
gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
# execute an auth login
gcloud auth application-default login
PROJECT_ID and PROJECT_NUMBER can be found on your GCP homepage, as Fig. 4 shows.

The commands first set the default project of your CLI to YOUR_PROJECT_ID and then grant the service account [email protected] the roles needed to interact with the required GCP infrastructure.
Next, we can create the storage buckets we'll need for our Dataflow pipelines, where:
- flex_templates_my_pipeline stores the built Dataflow template file
- input_files_my_pipeline stores the input file we will be dealing with
- output_files_my_pipeline stores the output from the pipeline
- mypipelines-dataflow-temp is a bucket to store temporary files generated by Dataflow during the run
gcloud storage buckets create gs://flex_templates_my_pipeline --location eu --project YOUR_PROJECT_ID
gcloud storage buckets create gs://input_files_my_pipeline --location eu --project YOUR_PROJECT_ID
gcloud storage buckets create gs://output_files_my_pipeline --location eu --project YOUR_PROJECT_ID
gcloud storage buckets create gs://mypipelines-dataflow-temp --location eu --project YOUR_PROJECT_ID
Finally, you have to grant Docker all the permissions needed to push new images to the Artifact Registry. Here is the thorough documentation that GCP has put in place. Any one of the authentication methods is more than enough to fix all the permission issues.
Build the pipeline image and the flex template
It's now time to get our hands on the real building part. You can find all the relevant code in my repo. Clone the repo into a folder in your GCP CLI, and let's go through the image build step. I wrote a simple bash script with all the needed commands, so that you can run bash build_flex_template.sh. Let's analyse what's happening here:
- At first, we're building the image of our Beam pipeline and we're pushing it to the artifact registry
echo "Build Docker image"
docker build --no-cache -t processing_pipeline -f docker/Dockerfile .
echo "Tag Docker image"
docker tag processing_pipeline europe-west2-docker.pkg.dev/long-axle-412512/processing-pipeline/pipeline:latest
echo "Push Docker image"
docker push europe-west2-docker.pkg.dev/long-axle-412512/processing-pipeline/pipeline:latest
The Dockerfile in docker/Dockerfile has specific Beam/Dataflow instructions. For this first tutorial, we'll not see how to deal with external dependencies.
- The first thing is that we're building on top of a gcr base image:
FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
This guarantees that we have all the tools needed for setting up a Dataflow template.
- Between lines 3–20 we're working as usual with Docker: we create a working directory (be aware that it's better to name this directory /template), and then download and install all the requirements.
- The final lines are crucial to give Dataflow the right instructions to find the right setup when launching the template. In particular, we're exporting the requirements file path, the pipeline Python file, and the pipeline's setup file. Finally, the entry point uses /opt/apache/beam/boot. This ensures we are bootstrapping the Apache Beam pipeline execution within the container: it handles the configuration of the execution environment for each worker and passes the right information to Dataflow to launch and run the image.
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/pipeline/processing_logs.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV PIP_NO_DEPS=True
ENTRYPOINT [ "/opt/apache/beam/boot" ]
- The second step is to call the dataflow API to build the Dataflow Template:
echo "Build Flex Template"
gcloud dataflow flex-template build gs://flex_templates_my_pipeline/template.json \
    --image-gcr-path europe-west2-docker.pkg.dev/long-axle-412512/processing-pipeline/pipeline:latest \
    --sdk-language "PYTHON" \
    --metadata-file metadata/metadata.json \
    --project long-axle-412512 \
    --worker-region europe-west2 \
    --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
    --env "FLEX_TEMPLATE_PYTHON_PY_FILE=pipeline/processing_logs.py" \
    --env "FLEX_TEMPLATE_PYTHON_SETUP_FILE=setup.py" \
    --flex-template-base-image "PYTHON3" \
    --py-path "."
The command creates a template file in gs://flex_templates_my_pipeline/template.json. This template is based on the image that we built and pushed previously. The Dataflow pipeline has a manifest, called metadata.json, that specifies the input and output parameters of the pipeline. Now, unlike what we ran locally, we have to add additional information to our pipeline (https://github.com/Steboss/dataflow_teaching/blob/main/pipeline/processing_logs.py):
def run_pipeline(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-file', dest='input', required=True)
    parser.add_argument('--output-file', dest='output', required=True)
    parser.add_argument('--job_name', dest='job_name', required=True)
    parser.add_argument('--project', dest='project', required=True)
    parser.add_argument('--region', dest='region', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(
        pipeline_args,
        streaming=False,
        save_main_session=True,
        job_name=known_args.job_name,
        project=known_args.project,
        region=known_args.region
    )
As you can see, besides passing the input file and output file, we added some extra fields, required by Dataflow:
- job_name: the name of the pipeline on Dataflow
- project: the GCP project ID
- region: the region where we want to run the job. If you're struggling to run the job from europe-X regions, don't worry and try to run it from the us-central1 region (no extra cost if you're in trial mode)
This info is then ported to the metadata.json file:
{
"name": "my-processing-logs-pipeline",
"description": "This is a processing log pipeline",
"parameters": [
{
"name": "input-file",
"label": "Input File",
"helpText": "Path to the input file"
},
{
"name": "output-file",
"label": "Output File",
"helpText": "Path to the output file"
},
{
"name": "project",
"label": "GCP project",
"helpText": "Project to run the dataflow job on"
},
{
"name": "job_name",
"label": "Dataflow job name",
"helpText": "Name of the dataflow job"
},
{
"name": "region",
"label": "Region",
"helpText": "Region to run the dataflow job in"
}
]
}
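Since the parameter names in metadata.json have to line up with the arguments the pipeline parses, a quick consistency check can save a failed launch. The sketch below uses only the standard library; check_metadata is a hypothetical helper, and the three keys it looks for are simply the ones used in the file above, not a statement about the full official schema.

```python
import json

# Keys every parameter has in the metadata file shown in this article.
REQUIRED_KEYS = ("name", "label", "helpText")


def check_metadata(metadata_text, expected_names):
    """Return a list of human-readable problems found in a metadata document."""
    metadata = json.loads(metadata_text)
    problems = []
    declared = []
    for parameter in metadata.get("parameters", []):
        missing = [key for key in REQUIRED_KEYS if key not in parameter]
        if missing:
            problems.append(f"{parameter.get('name', '<unnamed>')}: missing {missing}")
        declared.append(parameter.get("name"))
    for name in expected_names:
        if name not in declared:
            problems.append(f"{name}: not declared in metadata")
    return problems


example = json.dumps({
    "name": "my-processing-logs-pipeline",
    "parameters": [
        {"name": "input-file", "label": "Input File", "helpText": "Path to the input file"},
        {"name": "output-file", "label": "Output File"},
    ],
})
for problem in check_metadata(example, ["input-file", "output-file", "region"]):
    print(problem)
```

Here the example document is deliberately incomplete, so the check reports the missing helpText and the undeclared region parameter.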
Finally, going back to the gcloud dataflow flex-template build command, it's important to notice that we have to export again the environment variables we used in the Dockerfile. In this case, the paths refer to our local setup, so if we're running the script within our repo's directory, we'll have:
--env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"
--env "FLEX_TEMPLATE_PYTHON_PY_FILE=pipeline/processing_logs.py"
--env "FLEX_TEMPLATE_PYTHON_SETUP_FILE=setup.py"
--py-path "."
In particular, --py-path "." specifies the Python module search path, indicating that the pipeline code and any Python modules are located in the directory where the command is executed.
Run the flex template on Dataflow!
Before running the job, let's create a huge input file with the given generate_data script. I modified line 14 in order to generate an input file with 100,000,000 rows. I then moved this file to the GCS bucket we created above with gcloud storage cp INPUT_FILE.txt gs://input_files_my_pipeline/input_file.txt.
Next, you can use this script to run the built flex template. The command is pretty straightforward, with just a few things to notice:
echo "Running Flex Template"
gcloud dataflow flex-template run ${PIPELINE_NAME} \
    --project=${PROJECT} \
    --template-file-gcs-location=${DATAFLOW_GCS_LOCATION} \
    --worker-region=us-central1 \
    --region=us-central1 \
    --worker-machine-type=n1-standard-2 \
    --max-workers=$NUM_MAX_WORKERS \
    --num-workers=1 \
    --temp-location=gs://mypipelines-dataflow-temp/ \
    --staging-location=gs://dataflow-staging-europe-west2-1028464732444/ \
    --parameters job_name=processing-pipeline \
    --parameters project=${PROJECT} \
    --parameters region=us-central1 \
    --parameters input-file=gs://input_files_my_pipeline/input_file.txt \
    --parameters output-file=gs://input_files_my_pipeline/output
--worker-region defines the region where the workers of your job will be based. Due to some GCP issues, I placed everything in us-central1, although the image is in europe-west2. Ideally, you should have everything in a single region, to lower the latency and pay less money for some expensive API calls.
--max-workers sets how far your job is allowed to scale up. In this case, we're processing a 100,000,000-row input file, but it's more than OK to have at most 2 nodes to work with. Always consider carefully how much you want your pipeline to scale up, in order to avoid extra costs, or extra machines for work that requires little computational power. In particular, --worker-machine-type specifies the machine type we're using.
Finally, all the input parameters must be specified with --parameters. Note that each name passed to --parameters must be written exactly as it is declared in metadata.json (for example, input-file with a dash and job_name with an underscore).
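When the list of parameters grows, it can be convenient to assemble the launch command programmatically. The sketch below only builds the argument list and prints it; it does not call gcloud. It uses a subset of the flags shown in the script above, while build_run_command and the example project ID are assumptions introduced for illustration.

```python
import shlex


def build_run_command(job_name, project, template_path, region, parameters, max_workers=2):
    """Assemble a 'gcloud dataflow flex-template run' argument list (not executed)."""
    command = [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project}",
        f"--template-file-gcs-location={template_path}",
        f"--region={region}",
        f"--max-workers={max_workers}",
    ]
    # One --parameters flag per pipeline parameter, as in the script above.
    for name, value in parameters.items():
        command += ["--parameters", f"{name}={value}"]
    return command


command = build_run_command(
    job_name="processing-pipeline",
    project="my-project",  # placeholder project ID
    template_path="gs://flex_templates_my_pipeline/template.json",
    region="us-central1",
    parameters={
        "input-file": "gs://input_files_my_pipeline/input_file.txt",
        "output-file": "gs://input_files_my_pipeline/output",
    },
)
print(shlex.join(command))
```

Keeping the parameters in a dictionary makes it easy to compare them against the names declared in metadata.json before launching.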
At the start, Dataflow will pull the pipeline image and prepare all the setup:

Once the job has been created, it will start running. You can then see the job graph

Another amazing thing in GCP Dataflow is that you can monitor all the steps of your pipeline with tons of additional information on your right menu bar.
For streaming jobs at run time, and for batch jobs once they have run, you can see metrics and performance:

Fig. 7 shows the batch job after the run. You can see that for each step we have detailed information on the processing time, as well as how many stages the step has been divided into. Part of Dataflow's under-the-hood work is to optimise the input pipeline. This is achieved by simplifying and breaking down a given step into smaller stages, to better handle resource allocation, parallelisation and execution order. Additionally, for steps like GroupByKey, Dataflow introduces stages to shuffle the data. Shuffling at the GroupByKey stage is needed so that all the values sharing a key end up on the same worker, which is what lets the grouping run in parallel.
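As a toy illustration of the idea behind a shuffle (and not of how Dataflow implements it), the sketch below routes every (key, value) pair to a worker chosen from a hash of the key, so that all the values of one key meet on the same worker. The function names, the use of CRC32 and the number of workers are assumptions made for this example.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_workers):
    """Deterministic toy partitioner: the same key always maps to the same worker."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def shuffle(pairs, num_workers):
    """Send each (key, value) pair to its worker and group the values by key there."""
    workers = [defaultdict(list) for _ in range(num_workers)]
    for key, value in pairs:
        workers[partition_for(key, num_workers)][key].append(value)
    return [dict(worker) for worker in workers]


pairs = [
    ("event_type_A", 18.52),
    ("event_type_B", 25.5),
    ("event_type_A", 16.59),
    ("event_type_C", 26.79),
]
for index, groups in enumerate(shuffle(pairs, num_workers=2)):
    print(index, groups)
```

Once the values of a key are co-located, each worker can group or combine its own keys independently of the others.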

Further important information can be retrieved under the JOB METRICS dashboard. In this tab, you can find all the information related to the pipeline's throughput, CPU utilization, as well as GPU utilization and memory utilization in general.
Conclusion
This is the end of the first part of the tutorials dedicated to Apache Beam. This time we learned the basic concepts of Apache Beam, in particular:
- Why Apache Beam is so powerful
- How to build a simple pipeline
- What the main concepts in Apache Beam are, PCollection and ParDo
- How to deal with Dataflow
Once you try out this tutorial, you'll appreciate Apache Beam more and more and lower its learning barrier. In the next article, we'll dive into external dependencies, more complicated paradigms for feature generation and how to get the best out of Dataflow with Dataflow Prime. Stay tuned