Data Pipelines with Polars: Step-by-Step Guide

Introduction
The aim of this post is to explain and show you how to build data pipelines with Polars. It puts together and uses all the knowledge you've got from the previous two parts of this series, so if you haven't gone through them yet, I highly recommend you start there and come back here later.
EDA with Polars: Step-by-Step Guide for Pandas Users (Part 1)
EDA with Polars: Step-by-Step Guide to Aggregate and Analytic Functions (Part 2)
Setup
You can find all the code in this repository, so don't forget to clone/pull and star it. In particular, we'll be exploring this file which means that we'll finally move away from notebooks into the real world!
The data used in this project can be downloaded from Kaggle (CC0: Public Domain). It's the same YouTube trending dataset that was used in the previous two parts. I assume that you already have Polars installed, so just make sure to update it to the latest version using pip install -U polars.
Data Pipelines
Put simply, a data pipeline is an automated sequence of steps that pulls the data from one or multiple locations, applies processing steps and saves the processed data elsewhere making it available for further use.
Pipelines in Polars
The Polars way of working with data lends itself quite nicely to building scalable data pipelines. First of all, the fact that we can chain the methods so easily allows for some fairly complicated pipelines to be written quite elegantly.
For example, let's say we want to find out which trending videos had the most views in each month of 2018. Below you can see a full pipeline to calculate this metric and to save it as a parquet file.
import polars as pl

csv_path = "./youtube/GBvideos.csv"

(
    pl.read_csv(csv_path)
    .with_columns(
        # Original date is in string format like 17.01.01
        pl.col("trending_date").str.to_date(format="%y.%d.%m")
    )
    .filter(pl.col("trending_date").dt.year() == 2018)
    .with_columns(
        pl.col("views")
        .rank(descending=True)
        # Rank is calculated over a month
        .over(pl.col("trending_date").dt.month())
        .alias("monthly_rank")
    )
    .filter(pl.col("monthly_rank") == 1)
    .select(
        pl.col("trending_date"), pl.col("title"), pl.col("channel_title"), pl.col("views")
    )
    .write_parquet("top_monthly_videos.parquet")
)
Quite neat, right? If you know SQL, this is very easy to read and understand. But can we make it even better? Of course, with the Polars .pipe() method. This method gives us a structured way to apply sequential functions to the DataFrame. For this to work, let's refactor the code above into functions.
def process_date(df, date_column, format):
    result = df.with_columns(pl.col(date_column).str.to_date(format))
    return result


def filter_year(df, date_column, year):
    result = df.filter(pl.col(date_column).dt.year() == year)
    return result


def get_first_by_month(df, date_column, metric):
    result = df.with_columns(
        pl.col(metric)
        .rank(method="ordinal", descending=True)
        .over(pl.col(date_column).dt.month())
        .alias("rank")
    ).filter(pl.col("rank") == 1)
    return result


def select_data_to_write(df, columns):
    result = df.select([pl.col(c) for c in columns])
    return result
Notice that these functions take a Polars DataFrame as input (together with some other arguments) and return an altered Polars DataFrame. Chaining these functions together is a piece of cake with .pipe().
(
    pl.read_csv(csv_path)
    .pipe(process_date, date_column="trending_date", format="%y.%d.%m")
    .pipe(filter_year, date_column="trending_date", year=2018)
    .pipe(get_first_by_month, date_column="trending_date", metric="views")
    .pipe(
        select_data_to_write,
        columns=["trending_date", "title", "channel_title", "views"],
    )
).write_parquet("top_monthly_videos.parquet")
First of all, the reformatted code is much easier to understand. Second, separation of concerns is in general a good programming principle to follow as it allows easier debugging and cleaner code. For this simple example, it might be overkill to modularise the pipeline, but you'll see how useful it is in the larger example in the next section. Now, let's make the whole thing faster using lazy mode.
Lazy mode allows us to write the queries and pipelines, put them all together, and let the backend engine do its optimisation magic. For example, the code written above is definitely not optimal. I've put the column selection as the last step, which means that the size of the processed data is unnecessarily large. Luckily for us, Polars is smart enough to realise this, so it will optimise the code. Also, we only need to change two small things in the code to get the speed-up, which is incredible. First of all, we change pl.read_csv to pl.scan_csv to read the data in lazy mode. Then, at the end of the query, we put .collect() to tell Polars that we want to execute the optimised query.
(
    pl.scan_csv(csv_path)
    .pipe(process_date, date_column="trending_date", format="%y.%d.%m")
    .pipe(filter_year, date_column="trending_date", year=2018)
    .pipe(get_first_by_month, date_column="trending_date", metric="views")
    .pipe(
        select_data_to_write,
        columns=["trending_date", "title", "channel_title", "views"],
    )
).collect().write_parquet("top_monthly_videos.parquet")
On my machine, I got a ~3x speed-up, which is impressive given that we've made two very simple edits. Now that you've got the concept of piping and lazy evaluation, let's move on to a more complicated example.
Data Pipeline for Machine Learning Features
Warning: There's a lot of text and a lot of code! Sections should be followed sequentially since they build up the pipeline.
Given the dataset that we have at hand (YouTube Trending Videos), let's build features that can be used to predict how long a video will stay in trending. Even though this might sound simple, the pipeline to create them is going to be quite complex. The final dataset should have one row per video ID, features that are available as soon as the video gets into trending, and the actual number of days that the video stayed in trending (the target).

The features that could be useful in our prediction task are:
- Video characteristics (e.g. category)
- Number of views, likes, comments, etc. at the moment of entry into Trending
- Past performance of the channel in Trending (e.g. number of trending videos in the last 7 days)
- General Trending characteristics (e.g. average time in trending for all the videos in the last 30 days)
Below you can see a diagram representation of the required pipeline to create this dataset (make sure to zoom in).

I know it's a lot to take in, so let's eat this elephant a bite at a time. Below you can find the descriptions and code for every pipeline step. In addition, this pipeline will be parametrised using a YAML configuration file, so you'll find the configuration parameters for every step as well. Here's how you'd read a YAML file named pipe_config.yaml, which you can find in the repo.
import yaml

# Read config
with open("pipe_config.yaml", "r") as file:
    pipe_config = yaml.safe_load(file)
So, for every step of the pipeline you'll find:
- Description of the step
- Relevant functions
- Relevant configuration parameters
- Code to run the pipeline up to this step
This way, we'll slowly build up to the full pipeline and you'll have an in-depth understanding of what's going on and how to create something similar for your own data.
Read Data
The aim of this step is self-explanatory – to read in the dataset for further processing. We have two inputs – a CSV file with the main data and a JSON file with the category mappings. The parameters for this step are as follows:
data_path: "./youtube/GBvideos.csv"
category_map_path: "./youtube/GB_category_id.json"
There's no need to write a function to read the CSV data (since it already exists in Polars), so the only function we write is for reading in the category mappings.
import json
from typing import Dict, List


def read_category_mappings(path: str) -> Dict[int, str]:
    # Map each numeric category ID to its human-readable title
    with open(path, "r") as f:
        categories = json.load(f)
    id_to_category = {}
    for c in categories["items"]:
        id_to_category[int(c["id"])] = c["snippet"]["title"]
    return id_to_category
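To make the expected file structure concrete, here is a small sketch that writes a heavily trimmed, hand-written stand-in for the category JSON to a temporary file and reads it back. The two sample entries are assumptions for illustration only; the real file from Kaggle carries more fields per item.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


def read_category_mappings(path: str) -> Dict[int, str]:
    with open(path, "r") as f:
        categories = json.load(f)
    return {int(c["id"]): c["snippet"]["title"] for c in categories["items"]}


# Hypothetical, trimmed-down stand-in for GB_category_id.json
sample = {
    "items": [
        {"id": "1", "snippet": {"title": "Film & Animation"}},
        {"id": "10", "snippet": {"title": "Music"}},
    ]
}

with tempfile.TemporaryDirectory() as tmp:
    sample_path = Path(tmp) / "category_id.json"
    sample_path.write_text(json.dumps(sample))
    sample_mapping = read_category_mappings(str(sample_path))

print(sample_mapping)  # {1: 'Film & Animation', 10: 'Music'}
```

Note that the IDs are stored as strings in the JSON, which is why the function casts them to int before using them as keys.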
With this function, the code to read in the required files is quite simple.
# Create mapping
id_to_category = read_category_mappings(pipe_config["category_map_path"])
col_mappings = {"category_id": id_to_category}
# Pipeline
output_data = pl.scan_csv(pipe_config["data_path"]).collect()
Now, let's move on to a very unexciting yet crucial step – data cleaning.
Data Cleaning
This dataset is already quite clean but we need to do some additional pre-processing to the dates and the category columns.
- trending_date and publish_time need to be parsed from strings into date types
- category_id needs to be mapped from ID to the actual category name
Polars needs to know in which format the dates will be provided, so it's best to store the date formats together with the corresponding date columns in the pipe_config.yaml file to make them clear and easy to change.
# Pre-processing config
date_column_format:
  trending_date: "%y.%d.%m"
  publish_time: "%Y-%m-%dT%H:%M:%S%.fZ"
Since we want to use Polars pipelines to modularise the code, we need to create two functions, parse_dates and map_dict_columns, which will perform the two required transformations. However, here's the catch – splitting these operations into two steps makes the code slower because Polars can't use parallelisation efficiently. You can test this yourself by timing the execution of these two Polars expressions.
slow = df.with_columns(
    # Process dates
    pl.col("trending_date").str.to_date("%y.%d.%m"),
    pl.col("publish_time").str.to_date("%Y-%m-%dT%H:%M:%S%.fZ"),
).with_columns(
    # Then process category
    pl.col("category_id").map_dict(id_to_category)
)

fast = df.with_columns(
    # Process all together
    pl.col("trending_date").str.to_date("%y.%d.%m"),
    pl.col("publish_time").str.to_date("%Y-%m-%dT%H:%M:%S%.fZ"),
    pl.col("category_id").map_dict(id_to_category),
)
For me, the first expression was ~2x slower, which is very significant. So what do we do then? Well, here's the secret:
We should build up the expressions before passing them through the .with_columns method.
Hence, the functions parse_dates and map_dict_columns should return lists of expressions instead of the transformed DataFrames. These expressions can be combined and applied in the final cleaning function that we'll call clean_data.
def parse_dates(date_cols: Dict[str, str]) -> List[pl.Expr]:
    expressions = []
    for date_col, fmt in date_cols.items():
        expressions.append(pl.col(date_col).str.to_date(format=fmt))
    return expressions


def map_dict_columns(
    mapping_cols: Dict[str, Dict[str | int, str | int]]
) -> List[pl.Expr]:
    expressions = []
    for col, mapping in mapping_cols.items():
        expressions.append(pl.col(col).map_dict(mapping))
    return expressions


def clean_data(
    df: pl.DataFrame,
    date_cols_config: Dict[str, str],
    mapping_cols_config: Dict[str, Dict[str | int, str | int]],
) -> pl.DataFrame:
    parse_dates_expressions = parse_dates(date_cols=date_cols_config)
    mapping_expressions = map_dict_columns(mapping_cols_config)
    # A single with_columns call lets Polars evaluate all expressions in parallel
    df = df.with_columns(parse_dates_expressions + mapping_expressions)
    return df
As you can see, we now have only one .with_columns operation, which makes the code more efficient. Note that all the arguments to the functions are provided as dictionaries. This is because YAML gets read in as a dictionary. Now, let's add the cleaning step to the pipeline.
# Create mapping
id_to_category = read_category_mappings(pipe_config["category_map_path"])
col_mappings = {"category_id": id_to_category}
# Read in configs
date_column_format = pipe_config["date_column_format"]
# Pipeline
output_data = (
    pl.scan_csv(pipe_config["data_path"])
    .pipe(clean_data, date_column_format, col_mappings)
    .collect()
)
Clean, modular, fast – what not to like? Let's move on to the next step.
Basic Feature Engineering
This step does some basic feature engineering on top of the clean data, namely:
- Calculates ratio features – likes to dislikes, likes to views and comments to views
- Calculates difference in days between publishing and trending
- Extracts weekdays from the trending_date column
Let's parametrise the calculation of these features in the configuration file. We want to specify the name of a feature we want to create and the corresponding columns in the dataset that should be used for calculation.
# Feature engineering config
ratio_features:
  # feature name
  likes_to_dislikes:
    # features used in calculation
    - likes
    - dislikes
  likes_to_views:
    - likes
    - views
  comments_to_views:
    - comment_count
    - views
difference_features:
  days_to_trending:
    - trending_date
    - publish_time
date_features:
  trending_date:
    - weekday
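As a rough illustration of what this configuration looks like once loaded, the sketch below parses a shortened copy of it from a string with yaml.safe_load (PyYAML, the same library used to read pipe_config.yaml earlier). The inline config_text is an abbreviated assumption, not the full file from the repo.

```python
import yaml

# Abbreviated, hypothetical copy of the feature engineering config
config_text = """
ratio_features:
  likes_to_dislikes:
    - likes
    - dislikes
  likes_to_views:
    - likes
    - views
date_features:
  trending_date:
    - weekday
"""

sample_config = yaml.safe_load(config_text)

# Nested mappings become dicts, and the dash-prefixed items become lists
sample_ratios = sample_config["ratio_features"]
for name, cols in sample_ratios.items():
    numerator, denominator = cols
    print(f"{name} = {numerator} / {denominator}")
```

Each feature name therefore maps to a list of column names, which is the Dict[str, List[str]] shape that the feature functions expect.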
The logic for the functions is still the same – build up expressions and pass them to the .with_columns method. Hence, the functions ratio_features, diff_features and date_features are all called within the main function named basic_feature_engineering.
def ratio_features(features_config: Dict[str, List[str]]) -> List[pl.Expr]:
    expressions = []
    for name, cols in features_config.items():
        expressions.append((pl.col(cols[0]) / pl.col(cols[1])).alias(name))
    return expressions


def diff_features(features_config: Dict[str, List[str]]) -> List[pl.Expr]:
    expressions = []
    for name, cols in features_config.items():
        expressions.append((pl.col(cols[0]) - pl.col(cols[1])).alias(name))
    return expressions


def date_features(features_config: Dict[str, List[str]]) -> List[pl.Expr]:
    expressions = []
    for col, features in features_config.items():
        if "weekday" in features:
            expressions.append(pl.col(col).dt.weekday().alias(f"{col}_weekday"))
        if "month" in features:
            expressions.append(pl.col(col).dt.month().alias(f"{col}_month"))
        if "year" in features:
            expressions.append(pl.col(col).dt.year().alias(f"{col}_year"))
    return expressions


def basic_feature_engineering(
    data: pl.DataFrame,
    ratios_config: Dict[str, List[str]],
    diffs_config: Dict[str, List[str]],
    dates_config: Dict[str, List[str]],
) -> pl.DataFrame:
    ratio_expressions = ratio_features(ratios_config)
    date_diff_expressions = diff_features(diffs_config)
    date_expressions = date_features(dates_config)
    data = data.with_columns(
        ratio_expressions + date_diff_expressions + date_expressions
    )
    return data
Similar to the previous step, we just pass the main function to the pipe and provide all the required configurations as arguments.
# Create mapping
id_to_category = read_category_mappings(pipe_config["category_map_path"])
col_mappings = {"category_id": id_to_category}
# Read in configs
date_column_format = pipe_config["date_column_format"]
ratios_config = pipe_config["ratio_features"]
diffs_config = pipe_config["difference_features"]
dates_config = pipe_config["date_features"]
# Pipeline
output_data = (
    pl.scan_csv(pipe_config["data_path"])
    .pipe(clean_data, date_column_format, col_mappings)
    .pipe(basic_feature_engineering, ratios_config, diffs_config, dates_config)
).collect()
Nice, we're half way through the pipeline! Now, let's transform the dataset into the right format and finally calculate our target – days in trending.
Data Transformation
Just a reminder that the original dataset has multiple entries per video since it details information for every day in trending. If a video stayed five days in trending, this video would appear five times in this dataset. Our goal is to end up with the dataset that has just one entry per video (refer to the image below).

We can achieve this using a combination of the .groupby and .agg methods. The only configuration parameter to specify here is for filtering out the videos that took too long to get into trending, since these videos are the outliers identified in the previous part of this series. After we get the table with video_ids and the corresponding target (days in trending), we must also remember to join the features from the original dataset since they won't be carried over during the groupby operation. Hence, we'll also need to specify which features to join and which columns should be the joining keys.
# Filter videos
max_time_to_trending: 60
# Features to join to the transformed data
base_columns:
- views
- likes
- dislikes
- comment_count
- comments_disabled
- ratings_disabled
- video_error_or_removed
- likes_to_dislikes
- likes_to_views
- comments_to_views
- trending_date_weekday
- channel_title
- tags
- description
- category_id
# Use these columns to join transformed data with original
join_columns:
- video_id
- trending_date
To perform the required step, we'll design two functions – join_original_features and create_target_df.
def join_original_features(
    main: pl.DataFrame,
    original: pl.DataFrame,
    main_join_cols: List[str],
    original_join_cols: List[str],
    other_cols: List[str],
) -> pl.DataFrame:
    original_features = original.select(original_join_cols + other_cols).unique(
        original_join_cols
    )  # unique ensures one row per video + date
    main = main.join(
        original_features,
        left_on=main_join_cols,
        right_on=original_join_cols,
        how="left",
    )
    return main


def create_target_df(
    df: pl.DataFrame,
    time_to_trending_thr: int,
    original_join_cols: List[str],
    other_cols: List[str],
) -> pl.DataFrame:
    # Create a DF with one row per video ID, with the corresponding
    # days to trending and days in trending (target)
    target = (
        df.groupby(["video_id"])
        .agg(
            pl.col("days_to_trending").min().dt.days(),
            pl.col("trending_date").min().dt.date().alias("first_day_in_trending"),
            pl.col("trending_date").max().dt.date().alias("last_day_in_trending"),
            # our TARGET
            (pl.col("trending_date").max() - pl.col("trending_date").min())
            .dt.days()
            .alias("days_in_trending"),
        )
        .filter(pl.col("days_to_trending") <= time_to_trending_thr)
    )
    # Join features to the aggregates
    target = join_original_features(
        main=target,
        original=df,
        main_join_cols=["video_id", "first_day_in_trending"],
        original_join_cols=original_join_cols,
        other_cols=other_cols,
    )
    return target
Notice that the groupby operation to create the target and the join_original_features function are both run inside the create_target_df function since they both use the original dataset as input. This means that even though we have an intermediate output (the target variable), we can still run this function in a pipe method with no issues.
# Create mapping
id_to_category = read_category_mappings(pipe_config["category_map_path"])
col_mappings = {"category_id": id_to_category}
# Read in configs
date_column_format = pipe_config["date_column_format"]
ratios_config = pipe_config["ratio_features"]
diffs_config = pipe_config["difference_features"]
dates_config = pipe_config["date_features"]
# Pipeline
output_data = (
    pl.scan_csv(pipe_config["data_path"])
    .pipe(clean_data, date_column_format, col_mappings)
    .pipe(basic_feature_engineering, ratios_config, diffs_config, dates_config)
    .pipe(
        create_target_df,
        time_to_trending_thr=pipe_config["max_time_to_trending"],
        original_join_cols=pipe_config["join_columns"],
        other_cols=pipe_config["base_columns"],
    )
).collect()
For the final step, let's generate more advanced features using dynamic and rolling aggregates (covered in-depth in the last post).
Advanced Aggregates
This step is responsible for generating time-based aggregates. The only configuration that we need to provide is the list of windows (in days) for the aggregates.
aggregate_windows:
- 7
- 30
- 180
Rolling Aggregates
Let's start with rolling features. Below you can see an example with two lagged rolling features for a channel abc for a window of two days.

Rolling features are extremely easy in Polars; all you need is the .groupby_rolling() method and some aggregates within the .agg() call. The aggregates that can be useful are:
- Number of previous videos in trending
- The average number of days in trending for the previously trending videos
- The maximum number of days in trending for the previously trending videos
With this in mind, let's build a function named build_channel_rolling that takes the required period as input and outputs these aggregates. This way we'll be able to easily create rolling features for any period that we want. The by argument should be channel_title because we want to create aggregates by channel, and the index column should be first_day_in_trending since this is our main date column. These two columns will also be used to join these rolling aggregates to the original DataFrame.
def build_channel_rolling(df: pl.DataFrame, date_col: str, period: int) -> pl.DataFrame:
    channel_aggs = (
        df.sort(date_col)
        .groupby_rolling(
            index_column=date_col,
            period=f"{period}d",
            by="channel_title",
            closed="left",  # only left to not include the actual day
        )
        .agg(
            pl.col("video_id").n_unique().alias(f"channel_num_trending_videos_last_{period}_days"),
            pl.col("days_in_trending").max().alias(f"channel_max_days_in_trending_{period}_days"),
            pl.col("days_in_trending").mean().alias(f"channel_avg_days_in_trending_{period}_days"),
        )
        .fill_null(0)
    )
    return channel_aggs


def add_rolling_features(
    df: pl.DataFrame, date_col: str, periods: List[int]
) -> pl.DataFrame:
    for period in periods:
        rolling_features = build_channel_rolling(df, date_col, period)
        df = df.join(rolling_features, on=["channel_title", "first_day_in_trending"])
    return df
The add_rolling_features function is a wrapper that can be passed to our pipeline. It generates and joins the aggregates for the periods specified in the config. Now, let's move on to the final feature generation step.
Period Aggregates
These aggregates are similar to the rolling ones, but they aim to measure general behaviour in the Trending tab.

If rolling aggregates aimed to capture the past behaviour of a channel, these aggregates will capture general trends. This might be useful since the algorithm that's used to determine who gets into Trending and for how long changes constantly. Hence, the aggregates that we want to create are:
- Number of videos in trending in the last period
- Average days in trending in the last period
- Maximum days in trending in the last period
The logic for the functions is the same – we'll create a function that builds these aggregates and a wrapper function that will build and join the aggregates for all the periods. Notice that we don't specify the by parameter because we want to calculate these features for all the videos per day. Also notice that we need to use shift on the aggregates since we want to use the features for the last period and not the current one.
def build_period_features(df: pl.DataFrame, date_col: str, period: int) -> pl.DataFrame:
    general_aggs = (
        df.sort(date_col)
        .groupby_dynamic(
            index_column=date_col,
            every="1d",
            period=f"{period}d",
            closed="left",
        )
        .agg(
            pl.col("video_id").n_unique().alias(f"general_num_trending_videos_last_{period}_days"),
            pl.col("days_in_trending").max().alias(f"general_max_days_in_trending_{period}_days"),
            pl.col("days_in_trending").mean().alias(f"general_avg_days_in_trending_{period}_days"),
        )
        .with_columns(
            # shift to match values with the previous period
            pl.col(f"general_num_trending_videos_last_{period}_days").shift(period),
            pl.col(f"general_max_days_in_trending_{period}_days").shift(period),
            pl.col(f"general_avg_days_in_trending_{period}_days").shift(period),
        )
        .fill_null(0)
    )
    return general_aggs


def add_period_features(
    df: pl.DataFrame, date_col: str, periods: List[int]
) -> pl.DataFrame:
    for period in periods:
        rolling_features = build_period_features(df, date_col, period)
        df = df.join(rolling_features, on=["first_day_in_trending"])
    return df
Finally, let's put this all together into our pipeline!
# Create mapping
id_to_category = read_category_mappings(pipe_config["category_map_path"])
col_mappings = {"category_id": id_to_category}
# Read in configs
date_column_format = pipe_config["date_column_format"]
ratios_config = pipe_config["ratio_features"]
diffs_config = pipe_config["difference_features"]
dates_config = pipe_config["date_features"]
output_data = (
    pl.scan_csv(pipe_config["data_path"])
    .pipe(clean_data, date_column_format, col_mappings)
    .pipe(basic_feature_engineering, ratios_config, diffs_config, dates_config)
    .pipe(
        create_target_df,
        time_to_trending_thr=pipe_config["max_time_to_trending"],
        original_join_cols=pipe_config["join_columns"],
        other_cols=pipe_config["base_columns"],
    )
    .pipe(
        add_rolling_features,
        "first_day_in_trending",
        pipe_config["aggregate_windows"],
    )
    .pipe(
        add_period_features,
        "first_day_in_trending",
        pipe_config["aggregate_windows"],
    )
).collect()
I hope you're as excited as I am because we're almost there! The final step – writing out data.
Write Data
Saving the transformed data is a piece of cake since we can call .write_parquet() right after the collect() operation. Below you can see the full code that the file data_preparation_pipeline.py contains.
import time

import polars as pl
import yaml

# The helper functions used below (read_category_mappings, clean_data, etc.)
# are the ones defined in the previous sections


def pipeline():
    """Pipeline that reads, cleans, and transforms data into
    the format we need for modelling
    """
    # Read and unwrap the config
    with open("pipe_config.yaml", "r") as file:
        pipe_config = yaml.safe_load(file)

    date_column_format = pipe_config["date_column_format"]
    ratios_config = pipe_config["ratio_features"]
    diffs_config = pipe_config["difference_features"]
    dates_config = pipe_config["date_features"]

    id_to_category = read_category_mappings(pipe_config["category_map_path"])
    col_mappings = {"category_id": id_to_category}

    output_data = (
        pl.scan_csv(pipe_config["data_path"])
        .pipe(clean_data, date_column_format, col_mappings)
        .pipe(basic_feature_engineering, ratios_config, diffs_config, dates_config)
        .pipe(
            create_target_df,
            time_to_trending_thr=pipe_config["max_time_to_trending"],
            original_join_cols=pipe_config["join_columns"],
            other_cols=pipe_config["base_columns"],
        )
        .pipe(
            add_rolling_features,
            "first_day_in_trending",
            pipe_config["aggregate_windows"],
        )
        .pipe(
            add_period_features,
            "first_day_in_trending",
            pipe_config["aggregate_windows"],
        )
    ).collect()
    return output_data


if __name__ == "__main__":
    t0 = time.time()
    output = pipeline()
    t1 = time.time()
    print("Pipeline took", t1 - t0, "seconds")
    print("Output shape", output.shape)
    print("Output columns:", output.columns)
    output.write_parquet("./data/modelling_data.parquet")
We can run this pipeline like any other Python file.
python data_preparation_pipeline.py
Pipeline took 0.3374309539794922 seconds
Output shape (3196, 38)
Output columns: [
'video_id', 'days_to_trending', 'first_day_in_trending',
'last_day_in_trending', 'days_in_trending', 'views', 'likes', 'dislikes',
'comment_count', 'comments_disabled', 'ratings_disabled',
'video_error_or_removed', 'likes_to_dislikes', 'likes_to_views',
'comments_to_views', 'trending_date_weekday', 'channel_title',
'tags', 'description', 'category_id', 'channel_num_trending_videos_last_7_days',
'channel_max_days_in_trending_7_days', 'channel_avg_days_in_trending_7_days',
'channel_num_trending_videos_last_30_days', 'channel_max_days_in_trending_30_days',
'channel_avg_days_in_trending_30_days', 'channel_num_trending_videos_last_180_days',
'channel_max_days_in_trending_180_days', 'channel_avg_days_in_trending_180_days',
'general_num_trending_videos_last_7_days', 'general_max_days_in_trending_7_days',
'general_avg_days_in_trending_7_days', 'general_num_trending_videos_last_30_days',
'general_max_days_in_trending_30_days', 'general_avg_days_in_trending_30_days',
'general_num_trending_videos_last_180_days', 'general_max_days_in_trending_180_days',
'general_avg_days_in_trending_180_days'
]
On my laptop, these steps take less than half a second, which is impressive given how many operations we've chained together and how many features are generated. Most importantly, the pipeline looks clean, is very easy to debug, and can be extended/changed/cut in no time. Great job us!
Conclusion
If you've followed through the steps and got here – amazing job! Here's a brief summary of what you should've learned in this post:
- How to chain multiple operations together into a pipeline
- How to make this pipeline efficient
- How to structure your pipeline project and parametrise it using a YAML file
Make sure to apply these learnings to your own data. I recommend starting small (2–3 steps) and then expanding the pipeline as your needs grow. Make sure to keep it modular and lazy, and group as many operations into .with_columns() as possible to ensure proper parallelisation.