Python for Data Engineers


In this story I will talk about advanced data engineering techniques in Python. No doubt, Python is the most popular programming language for data. During my almost twelve-year career in data engineering, I encountered various situations where code had issues. This story is a brief summary of how I resolved them and learned to write better code. I will show a few techniques that make our ETL pipelines faster and our code more efficient.


List comprehensions

Imagine you are looping through a list of tables. Typically, we would do this:

data_pipelines = ['p1','p2','p3']
processed_tables = []
for table in data_pipelines:
    processed_tables.append(table)

But instead, we could use list comprehensions. Not only are they often faster, they also make the code more concise:

processed_tables = [table for table in data_pipelines]

For example, looping through a large dataset to transform (ETL) each row has never been easier:

def etl(item):
    # Do some data transformation here
    return json.dumps(item)

data = u"n".join(etl(item) for item in json_data)

List comprehensions are extremely useful for ETL processing of large data files. Imagine a data file we need to transform into a newline delimited format. Try running this example in your Python environment:


import io
import json

def etl(item):
    return json.dumps(item)

# Text file loaded as a blob
blob = """
        [
{"id":"1","first_name":"John"},
{"id":"2","first_name":"Mary"}
]
"""
json_data = json.loads(blob)
data_str = u"n".join(etl(item) for item in json_data)

print(data_str)
data_file = io.BytesIO(data_str.encode())

# This data file is ready for BigQuery as Newline delimited JSON
print(data_file)

The output will be newline delimited JSON. This is a standard format for loading data into a BigQuery data warehouse table:

{"id": "1", "first_name": "John"}
{"id": "2", "first_name": "Mary"}
<_io.BytesIO object at 0x10c732430>
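As a sketch of that final load step into BigQuery (the project, dataset and table names below are placeholders, and the google-cloud-bigquery client is assumed to be installed and authenticated):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # let BigQuery infer the schema for this small example
)

data_file.seek(0)  # rewind the BytesIO object we created above
load_job = client.load_table_from_file(
    data_file, 'my-project.my_dataset.my_table', job_config=job_config)
load_job.result()  # wait for the load job to complete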

Generators

If we are dealing with CSV or DAT files where data is stored line by line, then our file object is already a generator of lines, and we can use a list comprehension to process the data without consuming too much memory:

for line in open('very_big_file.csv'):
    validate_schema(line)

# or the same using list comprehension:
data_errors = [validate_schema(line) for line in open('very_big_file.csv')]

Validating records before we actually insert them into a data warehouse table is useful for batch data processing pipelines: often the whole batch will fail if even one record is malformed.
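For reference, the validate_schema helper used above could be as simple as this sketch (the expected column count is just an illustration):

EXPECTED_COLUMNS = 4

def validate_schema(line):
    """Return an error message for a malformed CSV line, or None if it looks fine."""
    fields = line.rstrip("\n").split(",")
    if len(fields) != EXPECTED_COLUMNS:
        return f"Expected {EXPECTED_COLUMNS} fields, got {len(fields)}: {line!r}"
    return None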

We can use this approach to create close to real-time analytics pipelines. It is also a very cost-effective way to process data compared to the streaming data pipeline design pattern. I previously wrote about it here:

Data pipeline design patterns

Alternatively, we could use yield when dealing with Big Data and when our file is not newline-delimited text. It is always good practice to process the data file in a memory-efficient manner. For example:

# Create a file first: ./very_big_file.csv as:
# transaction_id,user_id,total_cost,dt
# 1,John,10.99,2023-04-15
# 2,Mary, 4.99,2023-04-12

# Example.py
def etl(item):
    # Do some etl here
    return item.replace("John", '****') 

# Create a generator 
def batch_read_file(file_object, batch_size=19):
    """Lazy function (generator) that reads a file in chunks.
    The default chunk size is set to 19 bytes only to make this example visible."""
    while True:
        data = file_object.read(batch_size)
        if not data:
            break
        yield data
# and read in chunks
with open('very_big_file.csv') as f:
    for batch in batch_read_file(f):
        print(etl(batch))

# In command line run
# python example.py

This will read a local file and process it in chunks of 19 bytes. Output will be:

transaction_id,user
_id,total_cost,dt
1
,****,10.99,2023-04
-15
2,Mary, 4.99,20
23-04-12

It's just an example of how we can deal with binary data. In real life, it might be easier to split the file contents into segments using a separator, e.g. a newline '\n' or '}{', depending on how our data is structured.

Imagine that the text data is pulled from some external location, e.g. cloud storage. We can also process it as a stream. We wouldn't want to load the whole data file and run split('\n') to process it line by line, as that would consume a lot of memory. Instead we can use re.finditer, which acts like a generator and reads our data in chunks, so we can run the required ETL without consuming too much memory.

import io
import re
def etl(item):
    print(f'Transforming item: {item}')
    return item.replace("John", '****')

# Helper function to split our text file into chunks
# using separator
def splitStr(string, sep="s+"):
    if sep=='':
        return (c for c in string)
    else:
        return (_.group(1) for _ in re.finditer(f'(?:^|{sep})((?:(?!{sep}).)*)', string))

# Text file loaded as a blob
blob = """transaction_id,user_id,total_cost,dt
1,John,10.99,2023-04-15
2,Mary, 4.99,2023-04-12
"""

# data = blob.split("n") # We wouldn't want to do this on large datasets 
# as it would require to load big data file as a whole in the first place
# consuming lots of memory

# We would want to use our generator helper function
# and process data in chunks
data = splitStr(blob, sep='\n')
data_str = "\n".join(etl(item) for item in data)

print('New file contents:')
print(data_str)
data_file = io.BytesIO(data_str.encode())

print('This data file is ready for BigQuery:')
print(data_file)

Output:

python example.py
Transforming item: transaction_id,user_id,total_cost,dt
Transforming item: 1,John,10.99,2023-04-15
Transforming item: 2,Mary, 4.99,2023-04-12
Transforming item:
New file contents:
transaction_id,user_id,total_cost,dt
1,****,10.99,2023-04-15
2,Mary, 4.99,2023-04-12

This data file is ready for BigQuery:
<_io.BytesIO object at 0x103672980>

Python properties for data validation

We can use Python properties [2] to validate data records. If a field value doesn't meet the criteria we define for the class, an exception must be raised.

We can store our data as objects of the data class.

Let's imagine that we have a streaming data pipeline and we want to validate some fields in our records.

Put simply, they must match the existing table schema.

We can use Python properties for that. Consider this example below.

class ConnectionDataRecord(object):
    def __init__(self, user, ts):
        self.user = user
        self.ts = ts

    @property
    def user(self):
        return self._user

    @user.setter
    def user(self, d):
        if not d: raise Exception("user cannot be empty")
        self._user = d

    @property
    def ts(self):
        return self._ts

    @ts.setter
    def ts(self, v):
        if not (v > 0): raise Exception("ts must be greater than zero")
        self._ts = v

If we choose to break the rules and assign values that don't match our criteria, an exception will be thrown. For example, an exception is raised if we try to call ConnectionDataRecord('', 1).
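For example (a small illustration of the check above):

try:
    record = ConnectionDataRecord('', 1)
except Exception as e:
    print(f'Record rejected: {e}')  # Record rejected: user cannot be empty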

Alternatively, we can use a library called Pydantic. Consider the code below. It will throw an error if we create a record that doesn't meet our requirements.

from pydantic import BaseModel

class ConnectionDataRecord(BaseModel):
    user: str
    ts: int

record = ConnectionDataRecord(user="user1", ts=123456789)
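If a record breaks the declared schema, Pydantic raises a ValidationError; for example (a small illustration):

from pydantic import ValidationError

try:
    bad_record = ConnectionDataRecord(user="user1", ts="not a timestamp")
except ValidationError as e:
    print(e)  # explains that ts is not a valid integer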

Decorators

Decorators were designed to make our code look leaner and to add extra functionality to it. We can simply pass one function as an argument into another function (decorator) and do some data transformation inside this wrapper. Imagine that we have many different ETL functions to process data but we need just one to upload the result into the data lake. This is how we do it:

If some code logic repeats it is a good practice to use a decorator.

It makes the code base easier to maintain and saves a lot of time when we need to change the repetitive logic.

def etl_decorator(func):
    def wrapper():
        result = func()
        return f'Processing {result}' 
    return wrapper

@etl_decorator
def unzip_data():
    return "unzipped data"

print(unzip_data())  # Output: Processing unzipped data

Decorators are used everywhere due to their effectiveness. Consider this Airflow DAG example:

@dag(default_args=default_args, tags=['etl'])
def etl_pipeline():

    @task()
    def extract():
        return json.loads(data_string)    
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        return {"total_count": len(order_data_dict)}    
    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value}")

    extracted = extract()
    transformed = transform(extracted)
    load(transformed["total_count"])

etl_dag = etl_pipeline()

Working with APIs

As a data engineer, you will perform HTTP requests against various API endpoints very often. Consider this example of a GET request below.

import requests

response = requests.get('https://api.nasa.gov/neo/rest/v1/feed?start_date=2015-09-07&end_date=2015-09-08&api_key=your_api_key')
print(response.json())

It pulls data from the free NASA Asteroids API and returns all asteroids close to Earth on that date. Just put your API key in the URL above or use the one I created. The requests library handles everything, but there is a better way of doing it.

We can use session and process data from our API endpoint as a stream.

That would ensure we won't encounter any memory issues and process our GET request in a streaming manner [3]:

import requests
session = requests.Session()

url="https://api.nasa.gov/neo/rest/v1/feed"
apiKey="your_api_key"
requestParams = {
    'api_key': apiKey,
    'start_date': '2023-04-20',
    'end_date': '2023-04-21'
}
response = session.get(url, params = requestParams, stream=True)
print(response.status_code)
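With stream=True the response body is not downloaded until we iterate over it, so we can consume it chunk by chunk; a minimal sketch (the chunk size and the etl() call are illustrative):

for chunk in response.iter_content(chunk_size=1024):
    if chunk:
        etl(chunk)  # hypothetical transformation applied to each chunk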

Understanding how HTTP requests work is crucial in Data Engineering.

I do all sorts of things with API requests daily and I don't have to rely on any other frameworks or libraries.

For example, just a couple of weeks ago I was working on a Dataform migration project and realised that the existing Google library (from google.cloud import dataform_v1beta1) can't create schedules. The workaround was to use the Dataform API [4], and it was as simple as a POST request to that particular endpoint:

Python">from google.cloud import dataform_v1beta1
import requests
import google.auth.transport.requests
from google.oauth2 import service_account
...
# Get Dataform and BigQuery credentials from encrypted file:
print(f'Getting BigQuery credentials from encrypted file...')
credentials = service_account.Credentials.from_service_account_file(
    './../credentials.json'
    , scopes=['https://www.googleapis.com/auth/cloud-platform'])

def create_dataform_workflow_config(credentials, id, workflow_config, repository_id):
    '''
    The function will create a schedule (workflow) in Dataform configs by making a direct API call
    and using request_params with requests module
    , i.e.
    https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowConfigs/create
    https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowConfigs#WorkflowConfig
    If successful will create a workflow:
    {'name': 'projects/my-project-data-staging/locations/us-central1/repositories/dataform-poc/workflowConfigs/test_live_20230831', 'releaseConfig': 'projects/my-project-data-staging/locations/us-central1/repositories/dataform-poc/releaseConfigs/staging', 'invocationConfig': {'includedTags': ['test']}, 'cronSchedule': '40 13 * * *', 'timeZone': 'Africa/Abidjan'}

    If the workflow exists an error will be sent:
    {'error': {'code': 409, 'message': "Resource 'projects/123456789/locations/us-central1/repositories/dataform-poc/workflowConfigs/test_live_20230831' already exists", 'status': 'ALREADY_EXISTS', 'details': [{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/123456789/locations/us-central1/repositories/dataform-poc/workflowConfigs/test_live_20230831'}]}}

    Accepts workflow_config as request_body, i.e.
     request_body = {
             # "name": "projects/123456789/locations/us-central1/repositories/dataform-poc/workflowConfigs/test_live_20230830",
             "releaseConfig": "projects/my-project-data-staging/locations/us-central1/repositories/dataform-poc/releaseConfigs/staging",
             "invocationConfig": {
                 "includedTags": [
                     "test"
                 ]
             },
             "cronSchedule": "40 13 * * *",
             "timeZone": "Africa/Abidjan"
     }
    '''
    request = google.auth.transport.requests.Request()
    credentials.refresh(request)

    print('Creating a workflow...')
    # Make the request
    try:
        session = requests.Session()
        url=f'https://dataform.googleapis.com/v1beta1/projects/123456789/locations/us-central1/repositories/{repository_id}/workflowConfigs/'

        headers = {
            "Authorization": "Bearer " + credentials.token,
            "Content-Type" : "application/json; charset=utf-8"

        }
        query_params = {
            "workflowConfigId": id
        }
        request_body = workflow_config

        page_result = session.post(url, params=query_params, json=request_body, headers=headers)
        print(page_result.json())
    except Exception as e:
        print(e)

The essence of this request is that we send workflow_config as JSON and add workflowConfigId as a query parameter, following the Google documentation [4].

This will create a required schedule to run our data transformation scripts in BigQuery's Dataform.

Similarly to what we did with the GET request, we can stream data into a POST API endpoint using Python generators like so:

import time
import requests

def etl_data_generator():
    yield b"Foo"
    time.sleep(3)
    yield b"Bar"

requests.post("http://some.api.endpoint", data=etl_data_generator())

The idea is clear. We can process and send data in a memory-efficient manner.
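In practice, the generator would more likely read a large local file so that requests sends it as a chunked request body; a hedged sketch (the endpoint URL is a placeholder):

import requests

def stream_file(path):
    # Yield the file line by line so the whole file never sits in memory
    with open(path, 'rb') as f:
        for line in f:
            yield line

requests.post("http://some.api.endpoint", data=stream_file('very_big_file.csv'))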

Handling API rate limits

Most APIs have rate limits, and we want to keep that in mind while fetching data from them. We can use decorators to handle this. A simple decorator can be implemented like this:

from ratelimit import limits
import requests
CALLS = 10
TIME_PERIOD = 900   # time period in seconds

@limits(calls=CALLS, period=TIME_PERIOD)
def call_api():
    response = requests.get('https://api.example.com/data')
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response.json()

With this decorator, the function will raise an exception if it is called more than 10 times within 15 minutes. The simplest way to handle rate limits is to use time.sleep(), but the Python ratelimit library allows us to do it in this more elegant manner.
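If we would rather wait than raise an exception when the limit is reached, the same library also provides a sleep_and_retry decorator; a minimal sketch:

from ratelimit import limits, sleep_and_retry
import requests

CALLS = 10
TIME_PERIOD = 900   # time period in seconds

@sleep_and_retry                          # sleep until the period resets instead of raising
@limits(calls=CALLS, period=TIME_PERIOD)
def call_api():
    response = requests.get('https://api.example.com/data')
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response.json()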

Async and await in Python

Performing ETL asynchronously is another extremely useful technique. We can use the asyncio library to run tasks concurrently. Let's consider this simple synchronous example where we process tables in a for loop:

import requests

def pull_data(url, requestParams):
    return requests.get(url, params = requestParams, stream=True)

for table in api_endpoints_list:
    data = pull_data(table.api_url, table.requestParams)
    etl(data)

Running this code, we would have to wait for each table's pull_data() call to finish, but with async we can process them concurrently.

Consider using this code instead:

import asyncio
import aiohttp

async def pull_data(session, url, requestParams):
    # aiohttp streams the response body by default; no stream=True needed
    async with session.get(url, params=requestParams) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [pull_data(session, table.api_url, table.requestParams)
                 for table in api_endpoints_list]
        tasks_data = await asyncio.gather(*tasks)
        for data in tasks_data:
            etl(data)

asyncio.run(main())

It will pull data from the reporting APIs concurrently, which can significantly improve our ETL performance.

It also makes ETL tasks easier to manage while system resources are used as efficiently as possible.

For example, we can launch several ETL jobs concurrently and then define the order in which we await their results:

async def etl():
    job1 = asyncio.create_task(perform_etl_script1())
    job2 = asyncio.create_task(read_s3_data())
    job3 = asyncio.create_task(upload_s3_data())

    await job2
    await job1
    await job3

Use Map and Filter

map and filter can be even faster than list comprehensions, especially when combined with built-in functions.

We can transform our data line by line by applying the map function to the items in our dataset, processing it as an iterable:

import math 
numbers = [10,20]
factorials = list(map(lambda i: math.factorial(int(math.sqrt(i**3))), numbers))
print(factorials)
# Output:
# [8222838654177922817725562880000000, 16507955160908461081216919262453619309839666236496541854913520707833171034378509739399912570787600662729080382999756800000000000000000000]

We can use filter to extract objects matching certain criteria, e.g.


numbers = [10,21]
even_numbers = list(filter(lambda i: i% 2 == 0, numbers))
print(even_numbers)
# Output:
# [10]
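The same pattern works for cleaning raw records in an ETL step, for example (a sketch with made-up data):

raw_rows = ['1,John,10.99', '', '2,Mary,4.99', 'corrupted']

# Keep only rows that look complete, then split them into fields
valid_rows = filter(lambda r: r.count(',') == 2, raw_rows)
parsed_rows = list(map(lambda r: r.split(','), valid_rows))
print(parsed_rows)
# Output:
# [['1', 'John', '10.99'], ['2', 'Mary', '4.99']]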

Process large datasets using Pandas

Recent versions of the Pandas library provide a handy context manager that can be used like so:

import pandas as pd

batchsize = 10 ** 5
with pd.read_csv(filename, chunksize=batchsize) as reader:
    for batch in reader:
        etl(batch)

It will process data in batch mode assuming we don't need to load the whole dataset into a dataframe at one time.

It has a wide range of applications, from OLAP reporting to machine learning (ML) pipelines. For instance, we might want to create a recommendation model trainer task and prepare the dataset like so:

batch_data=pd.read_table('recommendation_data.csv',chunksize=100000,sep=';',
       names=['group','user','rating','date','id'],index_col='id',
       header=None,parse_dates=['date'])

df=pd.DataFrame()
%time df=pd.concat(batch.groupby(['group','user',batch['date'].map(lambda x: x.year)])['rating'].agg(['sum']) for batch in batch_data)

This way Pandas never has to hold the whole dataset in memory at once.
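The same chunked reader can also feed a chunked writer, so neither the input nor the output has to fit in memory at once; a small sketch (the file names and the dropna() step stand in for real ETL logic):

import pandas as pd

batchsize = 10 ** 5
with pd.read_csv('very_big_file.csv', chunksize=batchsize) as reader:
    for i, batch in enumerate(reader):
        transformed = batch.dropna()               # stand-in for the real etl(batch)
        transformed.to_csv('transformed.csv',
                           mode='a',               # append each processed chunk
                           header=(i == 0),        # write the header only once
                           index=False)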

Use joblib for pipelining and parallel computation

The joblib.dump() and joblib.load() methods allow us to persist large dataset transformations efficiently. joblib can serialise and store arbitrary Python objects, such as large NumPy arrays.

What do you think scikit-learn uses to save and load ML models? The correct answer is – joblib.

First of all, why save a model? Simply because we might need it later further down the pipeline, e.g. to make predictions on new data.

We wouldn't want to retrain our ML model as it is a very time-consuming task.

Another reason is that we might want to save different versions of the same model to see which one performs better. joblib helps to do all that [5]:

import os
import numpy as np
import joblib

savedir = '.'  # any existing directory
filename = os.path.join(savedir, 'test.joblib')
to_persist = [('foo', [1, 2, 3]), ('bar', np.arange(5))]

# Save a model
joblib.dump(to_persist, filename)  
# ['...test.joblib']

# Load a model
joblib.load(filename)
# [('foo', [1, 2, 3]), ('bar', array([0, 1, 2, 3, 4]))]

These functions work with file paths, but instead of file names joblib also accepts open file objects:


# WRITE
with open(filename, 'wb') as fo:
   joblib.dump(model, fo)

# READ
with open(filename, 'rb') as fo:  
   model = joblib.load(fo)

AWS S3 model dump/load example:

import tempfile
import boto3
import joblib

s3_client = boto3.client('s3')
bucket_name = "my-bucket"
key = "model.pkl"

# WRITE
with tempfile.TemporaryFile() as fp:
    joblib.dump(model, fp)
    fp.seek(0)
    s3_client.put_object(Body=fp.read(), Bucket=bucket_name, Key=key)

# READ
with tempfile.TemporaryFile() as fp:
    s3_client.download_fileobj(Fileobj=fp, Bucket=bucket_name, Key=key)
    fp.seek(0)
    model = joblib.load(fp)

# DELETE
s3_client.delete_object(Bucket=bucket_name, Key=key)

Parallel computing with joblib

It is very efficient because it relies on multiprocessing and will execute tasks concurrently using multiple Python workers across all CPU cores (or, with the right backend, even across multiple machines). Consider this example:

import time 
from joblib import Parallel,delayed 
import math 

t1 = time.time() 

# Normal 
results = [math.factorial(int(math.sqrt(i**3))) for i in range(1000,2000)] 

t2 = time.time() 

print('\nComputing time {:.2f} s'
      .format(t2 - t1))

# Using all CPU cores
t1 = time.time()
results = Parallel(n_jobs=-1)(delayed(math.factorial) (int(math.sqrt(i**3))) for i in range(1000,2000)) 

t2 = time.time()
print('\nComputing time {:.2f} s'
      .format(t2 - t1))

We can use all the CPU cores we have to unlock the full potential of our hardware.

Here we tell Parallel to use all cores (n_jobs=-1) and the computation runs about 5 times faster:

# The output:
Computing time 59.67 s

Computing time 12.18 s

Unit test ETL pipelines

One of the most important lessons I've learned throughout my data engineering career is that everything must be unit-tested. That includes not only SQL but also ETL jobs and integrations with other services we use in our data pipelines.

We can use the unittest Python library to test our code. Let's imagine we have a helper module that checks whether a number is prime:

# ./prime.py
import math

def is_prime(num):
    '''Check if num is prime or not.
    '''
    if num < 2:
        return False
    for i in range(2, int(math.sqrt(num)) + 1):
        if num % i == 0:
            return False
    return True

How do we test the logic inside this function?

unittest makes it really simple:

# ./test.py
import unittest
from prime import is_prime

class TestPrime(unittest.TestCase):

    def test_thirteen(self):
        self.assertTrue(is_prime(13))

Now if we run this in our command line we will test the logic:

python -m unittest test.py
# Output:
# .
# ----------------------------------------------------------------------
# Ran 1 test in 0.000s

# OK

This is correct because 13 is a prime number. Let's test it further. We know that 4 is not a prime number and, therefore, we want a unit test that asserts False for it and passes:

# ./test.py
import unittest
from prime import is_prime

class TestPrime(unittest.TestCase):

    def test_thirteen(self):
        self.assertTrue(is_prime(13))
    def test_four(self):
        self.assertFalse(is_prime(4))

python -m unittest test.py
# Output:
# ..
# ----------------------------------------------------------------------
# Ran 2 tests in 0.000s

# OK

Simple. Let's take a look at a more advanced example.

Let's imagine we have an ETL service that pulls data from some API, and that pulling takes a lot of time. Our service then transforms this dataset, and we would like to test that this ETL transformation logic works as expected.

How do we do it?

We can use the mock and patch utilities from the unittest library. Consider this application file, asteroids.py:

# ./asteroids.py
import requests
API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"

def get_data():
    print('Fetching data from NASA Asteroids API...')
    session = requests.Session()
    url=ASTEROIDS_API_URL
    apiKey=API_KEY
    requestParams = {
        'api_key': apiKey,
        'start_date': '2023-04-20',
        'end_date': '2023-04-21'
    }
    response = session.get(url, params = requestParams)
    print(response.status_code)
    near_earth_objects = (response.json())['near_earth_objects']
    return near_earth_objects

def save_data():
    # Do some ETL here
    data = get_data()
    return data

print(save_data())

If we run asteroids.py, the output will contain a list of asteroids that are close to Earth on that particular date:

# python ./asteroids.py

Fetching data from NASA Asteroids API...
200
{'2023-04-20': [{'links': {'self': 'http://api.nasa.gov/neo/rest/v1/neo/2326291?api_key=fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW'}, 'id': '2326291', 'neo_reference_id': '2326291', 'name': '326291 (1998 HM3)', 'nasa_jpl_url': 'http://ssd.jpl.nasa.gov/sbdb.cgi?sstr=2326291', 'absolute_magnitude_h': 19.0, 'estimated_diameter': {'kilometers': {'estimated_diameter_min': 0.4212646106, 'estimated_diameter_max': 0.9419763057}, 'meters': {'estimated_diameter_min': 421.2646105562, 'estimated_diameter_max': 941.9763057186}, 'miles': {'estimated_diameter_min': 0.2617616123, 'estimated_diameter_max': 0.5853167591}, 'feet': {'estimated_diameter_min': 1382.1017848971, 'estimated_diameter_max': 3090.4735428537}}, 'is_potentially_hazardous_asteroid': False, 'close_approach_data':
....

Pulling data from the API might take a lot of time, but we want our unit tests to run fast. We can mock a fake API response for our get_data() function and then use it to test the ETL logic in the save_data() function:

# ./test_etl.py
import unittest
from asteroids import *

import unittest.mock as mock

class TestEtl(unittest.TestCase):

    def test_asteroids_etl(self): 
        with mock.patch('asteroids.get_data') as GetDataMock:
            GetDataMock.return_value = ['asteroid_1', 'asteroid_2']
            self.assertEqual(['1', '2'], save_data())

The output will be:

AssertionError: Lists differ: ['1', '2'] != ['asteroid_1', 'asteroid_2']

First differing element 0:
'1'
'asteroid_1'

- ['1', '2']
+ ['asteroid_1', 'asteroid_2']

----------------------------------------------------------------------
Ran 1 test in 0.001s

FAILED (failures=1)

In our unit test, we replaced (using mock) the values returned by the asteroids.get_data function and expected save_data() to transform them (ETL) into ['1', '2'], which our ETL function failed to do. The unit test failed.

Unit tests are very powerful.

They help us to deal with human errors when deploying new features in ETL pipelines. Some more advanced examples can be found in one of my previous stories. I use this approach very often in CI/CD pipelines [6]:

Test Data Pipelines the Fun and Easy Way

Monitoring memory usage

Often I deploy ETL microservices using serverless. It's a very neat and cost-effective approach. I deploy Lambdas and Cloud Functions and wouldn't want to overprovision them with excessive memory.

I previously wrote about it here:

Infrastructure as Code for Beginners

Indeed, why would we give our Lambda 3 GB of memory and pay more when the data can be processed in 256 MB?

There are various ways to monitor our ETL application's memory usage. One of the most popular is the tracemalloc [7] library. It can trace Python memory blocks and return the results as a (current, peak) tuple in bytes. Consider this example, which extracts data from the Asteroids API in one big chunk and saves it to disk:

# asteroids.py
import requests
import json
import tracemalloc

tracemalloc.start()

API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"

def get_data():
    print('Fetching data from NASA Asteroids API...')
    session = requests.Session()
    url=ASTEROIDS_API_URL
    apiKey=API_KEY
    requestParams = {
        'api_key': apiKey,
        'start_date': '2023-04-20',
        'end_date': '2023-04-27'
    }
    response = requests.get(url, params = requestParams).text
    with open('out.csv', 'w') as fd:
        fd.write(response)

get_data()

print(tracemalloc.get_traced_memory())

tracemalloc.stop()

The output will be:

Fetching data from NASA Asteroids API...
(85629, 477039)

We can see that peak usage is just under 500 KB.

Let's see how it can be optimised simply by using stream=True:

# asteroids_stream.py
import requests
import json
import tracemalloc

tracemalloc.start()

API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"

def get_data():
    print('Fetching data from NASA Asteroids API...')
    session = requests.Session()
    url=ASTEROIDS_API_URL
    apiKey=API_KEY
    requestParams = {
        'api_key': apiKey,
        'start_date': '2023-04-20',
        'end_date': '2023-04-27'
    }
    response = session.get(url, params = requestParams, stream = True)
    print('Saving to disk...')
    with open('out.csv', 'wb') as fd:
        for chunk in response.iter_content(chunk_size=1024):
            fd.write(chunk)

get_data()

print(tracemalloc.get_traced_memory())

tracemalloc.stop()

The output of asteroids_stream.py will be:

Fetching data from NASA Asteroids API...
Saving to disk...
(85456, 215260)

We can see that peak memory usage is less than half of what it was before.

Working with SDKs

As data engineers, we work with cloud service providers very often. In a nutshell, SDKs are collections of service libraries that allow programmatic access to cloud services. We would want to learn and master at least one or two SDKs from market leaders such as Amazon, Azure or Google. One of the services I access programmatically most often is cloud storage. Indeed, in data engineering almost every data pipeline relies on data storage in the cloud, i.e. Google Cloud Storage or AWS S3. The most common data pipeline design is the one created around a data bucket. I described this pattern in one of my previous stories [9].

Data pipeline design patterns

Objects created in cloud storage can trigger other ETL services. This becomes useful when orchestrating data pipelines using these events.

In this scenario we would want to be able to read and write data in cloud storage used as a data lake for our data platform.

Typical data pipeline. Image by author

In this diagram, we can see that we extract and save our data into the data lake bucket first. This then triggers the data warehouse ingestion, which loads the data into our table for OLAP analytics with a Business Intelligence (BI) tool.
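As an illustration, a minimal AWS Lambda handler triggered by an S3 object-created event could kick off that ingestion step; this is a hedged sketch and the actual warehouse load is only hinted at:

import boto3

s3 = boto3.client('s3')

def handler(event, context):
    # The S3 ObjectCreated event tells us which file just landed in the data lake
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        obj = s3.get_object(Bucket=bucket, Key=key)
        payload = obj['Body'].read()
        # ...trigger the data warehouse load for this object here...
        print(f'Ingesting s3://{bucket}/{key} ({len(payload)} bytes)')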

The code snippet below shows how to save data to AWS S3 as a stream using the AWS SDK (boto3).

# nasa.py
import boto3
import requests
import os
S3_DATA = os.environ['S3_DATA_BUCKET'] #"your.datalake.bucket"
API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"

print('Fetching data from NASA Asteroids API...')
session = requests.Session()
url=ASTEROIDS_API_URL
apiKey=API_KEY
requestParams = {
    'api_key': apiKey,
    'start_date': '2023-04-20',
    'end_date': '2023-04-21'
}
response = session.get(url, params = requestParams, stream=True)
print(response.status_code)
# Perform Multi-part upload to AWS S3 datalake:
s3_bucket = S3_DATA # i.e. 'data.staging.aws'
s3_file_path = 'nasa/test_nasa_.csv' # i.e. "path_in_s3"
s3 = boto3.client('s3')
print('Saving to S3. Run to download: aws s3 cp s3://{}/{} ./'.format(s3_bucket,s3_file_path))
with response as part:
    part.raw.decode_content = True
    conf = boto3.s3.transfer.TransferConfig(multipart_threshold=10000, max_concurrency=4)
    s3.upload_fileobj(part.raw, s3_bucket, s3_file_path, Config=conf)

In your command line run this to extract asteroids data from NASA API:

S3_DATA_BUCKET="your.staging.databucket" python nasa.py
# Output:
# Fetching data from NASA Asteroids API...
# 200
# Saving to S3. Run to download: aws s3 cp s3://your.staging.databucket/nasa/test_nasa_.csv ./

Conclusion

This story is a summary of the Python techniques I use in ETL services almost every day. I hope you find them useful too. They help to keep the code clean and execute data pipeline transformations efficiently. The serverless application model is a very cost-effective framework where we can deploy ETL microservices that cost almost nothing. We just need to optimise memory usage and deploy them in an atomic manner so they run fast. It can handle almost any type of data pipeline for our data platform. A good summary of these architectural types and design patterns can be found in one of my previous stories.

Data Platform Architecture Types

Understanding basic HTTP methods is crucial in data engineering and it helps to create robust API interactions for our data pipelines. Pipelining our functions and models using joblib helps to write fast and efficient code. Pulling data from APIs using streams and running ETL tasks in a memory-efficient manner prevents resources from overprovisioning and ensures that our data services will never run out of memory. Unit tests can be run continuously using CI/CD tools. It helps to catch mistakes and human errors early before our code changes reach production environments. I hope you enjoyed reading this.

Recommended read:

[1] https://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python

[2] https://docs.python.org/3/library/functions.html

[3] https://stackoverflow.com/questions/60343944/how-does-requests-stream-true-option-streams-data-one-block-at-a-time

[4] https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowConfigs/create

[5] https://joblib.readthedocs.io/en/stable/persistence.html

[6] https://medium.com/towards-data-science/test-data-pipelines-the-fun-and-easy-way-d0f974a93a59

[7] https://docs.python.org/3/library/tracemalloc.html

[8] https://levelup.gitconnected.com/infrastructure-as-code-for-beginners-a4e36c805316

[9] https://medium.com/towards-data-science/data-pipeline-design-patterns-100afa4b93e3
