Implementing "Modular RAG" with Haystack and Hypster

Intro
Keeping up with the latest in AI can be a challenge, especially in a rapidly evolving field like Retrieval-Augmented Generation (RAG). With so many different solutions and implementations, it's easy to feel lost.
I struggled with this myself for a long time, trying to wrap my head around every new article or "trick" promising to make RAG systems better in one way or another. Every new paper, tutorial or blog post felt like something completely new, and it became increasingly difficult to keep up with all the acronyms for the newest fancy methods – HyDE, RAPTOR, CRAG, FLARE – they started to sound like Pokémon character names to me.
Then I came across the paper by Gao et al. (2024), "Modular RAG: Transforming RAG Systems into LEGO-like Reconfigurable Frameworks".

Modular RAG
The paper provides a structured approach for breaking down RAG systems into a unified framework that can encompass diverse solutions and approaches. The authors propose six main components:
- Indexing: Organize your data for efficient search.
- Pre-Retrieval: Process the user's query before searching.
- Retrieval: Find the most relevant information.
- Post-Retrieval: Refine the retrieved information.
- Generation: Use an LLM to generate a response.
- Orchestration: Control the overall flow of the system.
The key insight from this paper is that a wide range of existing RAG solutions can be described using these components in a LEGO-like manner. This modularity provides a framework for understanding, designing, and navigating the process of building a RAG system with greater flexibility and clarity.
In the paper, the authors showcase how this is possible by taking existing RAG solutions and expressing them using the same building blocks.
I highly recommend reading the paper and the accompanying blog posts by its author, Yunfan Gao: Modular RAG and RAG Flow: Part I, Part II.
Personally, this framework helped me understand how different RAG approaches relate to each other, and now I can easily make sense of new papers and implementations.
Implementing Modular RAG
So, how can we actually implement this "Modular RAG" framework?
Since it's more of a meta-framework, what does that mean in practical terms? Do we need to implement every possible combination of components? Or do we just build the individual components and let developers figure out how to put them together?
I believe that in most real-life situations it's not necessary to cover every possible RAG configuration, but rather to narrow down the space of relevant configurations based on the requirements and constraints of each project.
In this tutorial, I'll show you a concrete example of how to build a configurable system using a small set of options. Hopefully, this will give you the right perspective and tools to create your own version of a Modular RAG that contains the set of relevant configurations for your specific use-case.
Let's explore the two main tools we'll be using:
Haystack – The Main Components Library
Haystack is an open-source framework for building production-ready LLM applications, retrieval-augmented generative pipelines, and state-of-the-art search systems that work intelligently over large document collections.
Pros:
- Great component design
- The pipeline is very flexible and allows for dynamic configurations
- Extremely (!) well documented
- Many existing implementations and integrations with generative AI providers
Cons:
- The pipeline interface can be a bit verbose
- Using components outside of a pipeline is not very ergonomic.
I've played around with a few different Generative AI frameworks, and Haystack was by far the easiest for me to understand, use and customize.
Hypster – Managing Configuration Spaces
Hypster is a lightweight Pythonic configuration system for AI & Machine Learning projects. It offers minimal, intuitive Pythonic syntax, supporting hierarchical and swappable configurations.
Hypster is a new open-source project that I've developed to enable a new kind of programming paradigm for AI & ML workflows – one that moves beyond single solutions towards a "superposition of workflows," or a "hyper-workflow."
Hypster allows you to define a range of possible configurations and easily switch between them for experimentation and optimization. This makes it simple to add and customize your own configuration spaces, instantiate them with different settings, and ultimately select the optimal configuration for your production environment.
Note: Hypster is currently under active development. It is not yet recommended for production environments.
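To make this concrete, here's a toy example (not taken from the Hypster docs or the tutorial repo) of defining a small configuration space and then instantiating one concrete configuration from it:

from hypster import config, HP

@config
def toy_config(hp: HP):
    # each hp.* call defines a parameter and its allowed values
    split_by = hp.select(["sentence", "word", "page"], default="sentence")
    split_length = hp.int(10)
    use_reranker = hp.select([True, False], default=False)

# pick one concrete configuration out of the space;
# unspecified parameters fall back to their defaults
results = toy_config(values={"split_by": "page", "use_reranker": True})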
Codebase
This is an advanced tutorial. It assumes you're already familiar with the main components of RAG.
I'll break down the main parts of the codebase and provide my insights as we go.
The full and updated code is in the following repository. Don't forget to add your ⭐️
LLM
Let's start with our LLM configuration-space definition:
from hypster import config, HP

@config
def llm_config(hp: HP):
    anthropic_models = {"haiku": "claude-3-haiku-20240307",
                        "sonnet": "claude-3-5-sonnet-20240620"}
    openai_models = {"gpt-4o-mini": "gpt-4o-mini",
                     "gpt-4o": "gpt-4o",
                     "gpt-4o-latest": "gpt-4o-2024-08-06"}
    model_options = {**anthropic_models, **openai_models}

    model = hp.select(model_options, default="gpt-4o-mini")
    temperature = hp.number(0.0)

    if model in openai_models.values():
        from haystack.components.generators import OpenAIGenerator
        llm = OpenAIGenerator(model=model,
                              generation_kwargs={"temperature": temperature})
    else:  # anthropic
        from haystack_integrations.components.generators.anthropic import AnthropicGenerator
        llm = AnthropicGenerator(model=model,
                                 generation_kwargs={"temperature": temperature})
This code snippet demonstrates a basic example of Hypster and Haystack. Using the @config decorator, we define a function called llm_config that encapsulates the configuration space for our LLM. This space includes options for selecting different LLM providers (Anthropic or OpenAI) and their corresponding models, as well as a parameter for controlling the temperature.
Within the llm_config function, we use conditional logic to instantiate the appropriate Haystack component based on the selected model. This allows us to seamlessly switch between different LLMs using a selection, without modifying the structure of our code.
For example, to create an Anthropic generator with the "haiku" model and a temperature of 0.5, we can instantiate the configuration as follows:
result = llm_config(final_vars=["llm"],
                    values={"model": "haiku", "temperature": 0.5})
Indexing pipeline
Let's move on to create our indexing pipeline, where we'll define how to process our input files. In our case – PDF files.
@config
def indexing_config(hp: HP):
    from haystack import Pipeline
    from haystack.components.converters import PyPDFToDocument

    pipeline = Pipeline()
    pipeline.add_component("loader", PyPDFToDocument())
Next, we'll add an optional functionality – enriching the document with an LLM summary based on the first 1000 characters of the document.
This is a nice trick where we use the first n characters of a document and then, upon splitting the document into chunks, each chunk "inherits" this enriched information for its embeddings and response generation.
    enrich_doc_w_llm = hp.select([True, False], default=True)
    if enrich_doc_w_llm:
        from textwrap import dedent
        from haystack.components.builders import PromptBuilder
        from src.haystack_utils import AddLLMMetadata

        template = dedent("""
            Summarize the document's main topic in one sentence (15 words max).
            Then list 3-5 keywords or acronyms that best
            represent its content for search purposes.
            Context:
            {{ documents[0].content[:1000] }}
            ============================
            Output format:
            Summary:
            Keywords:
        """)

        llm = hp.nest("configs/llm.py")
        pipeline.add_component("prompt_builder", PromptBuilder(template=template))
        pipeline.add_component("llm", llm["llm"])
        pipeline.add_component("document_enricher", AddLLMMetadata())

        pipeline.connect("loader", "prompt_builder")
        pipeline.connect("prompt_builder", "llm")
        pipeline.connect("llm", "document_enricher")
        pipeline.connect("loader", "document_enricher")

        splitter_source = "document_enricher"
    else:
        splitter_source = "loader"

    from haystack.components.preprocessors import DocumentSplitter

    split_by = hp.select(["sentence", "word", "passage", "page"],
                         default="sentence")
    splitter = DocumentSplitter(split_by=split_by,
                                split_length=hp.int(10),
                                split_overlap=hp.int(2))
    pipeline.add_component("splitter", splitter)
    pipeline.connect(splitter_source, "splitter")
Here we can see Haystack's pipeline in action. If the user selects enrich_doc_w_llm==True, we add the components and connections that enable this enrichment – in our case: PromptBuilder → LLM → AddLLMMetadata.
As you can see, it's very flexible and we can construct the pipeline on-the-fly using conditional logic. This is extremely powerful.
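AddLLMMetadata is a small custom component from the repo's src/haystack_utils.py. A minimal sketch of how such a component could look (the actual implementation in the repository may differ):

from typing import List
from haystack import component, Document

@component
class AddLLMMetadata:
    """Attach the LLM-generated summary to the documents' metadata."""

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document], replies: List[str]):
        summary = replies[0]  # the single reply produced by the enrichment LLM
        for doc in documents:
            doc.meta["llm_summary"] = summary  # hypothetical meta key
        return {"documents": documents}

Since DocumentSplitter copies each document's metadata onto its chunks, a summary stored this way is carried along to every chunk downstream.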
Now we can instantiate the configuration object in a couple of ways. For example:
results = indexing_config(values={"enrich_doc_w_llm": False,
                                  "split_by": "page",
                                  "split_length": 1})
Here we get a simple pipeline with a loader and a splitter, using the selected splitter configuration:

Otherwise, we can select to enrich the document with an LLM summary:
results = indexing_config(values={"enrich_doc_w_llm": True})
Notice that Hypster uses the default values defined for each parameter, so there's no need to specify every parameter selection each time. Here's an illustration of the resulting pipeline:

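To reproduce a diagram like this yourself, you can use Haystack's built-in pipeline drawing (a quick sketch; the default renderer is Mermaid-based and needs network access):

from pathlib import Path

pipeline = results["pipeline"]
pipeline.draw(Path("indexing_pipeline.png"))  # writes the pipeline graph as an image
# in a Jupyter notebook, pipeline.show() renders it inline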
Notice how we casually nested the llm_config inside our indexing pipeline using hp.nest("configs/llm.py"). This nesting ability lets us build configurations hierarchically, and we can set parameter values within the nested llm_config using dot notation. For example:
results = indexing_config(values={"llm.model": "gpt-4o-latest"})
This instantiates an indexing pipeline whose LLM enrichment step uses OpenAI's gpt-4o-2024-08-06 model.
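Because nested configurations compose, several nested parameters can be overridden in a single call. For example (values chosen purely for illustration):

results = indexing_config(values={"enrich_doc_w_llm": True,
                                  "llm.model": "sonnet",
                                  "llm.temperature": 0.3,
                                  "split_by": "passage"})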
So far, we've built a compact configuration space for many potential indexing pipelines.
For the sake of brevity, I will skip over the embedding configuration, where I incorporated fastembed and jina embeddings. If you're curious, please check out the full implementation.
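For reference, a fastembed configuration in the same style could look roughly like this (a sketch only; the model names and the exact contents of configs/fast_embed.py in the repo may differ):

from hypster import config, HP

@config
def fast_embed_config(hp: HP):
    from haystack_integrations.components.embedders.fastembed import (
        FastembedDocumentEmbedder,
        FastembedTextEmbedder,
    )

    model_options = {"small": "BAAI/bge-small-en-v1.5",
                     "base": "BAAI/bge-base-en-v1.5"}
    model = hp.select(model_options, default="small")
    embedding_dim = 384 if "small" in model else 768  # dimension of the chosen model

    doc_embedder = FastembedDocumentEmbedder(model=model)
    text_embedder = FastembedTextEmbedder(model=model)

The main rag_config below expects doc_embedder, text_embedder, and embedding_dim to be available from whichever embedder config gets nested.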
Let's move on to the retrieval pipeline.
Retrieval
Haystack comes with an in-memory document store for fast experimentation. It includes an embedding retriever and a BM25 retriever. In this section we'll build a configuration space that lets us use a BM25 retriever, an embedding retriever, or both.
@config
def in_memory_retrieval(hp: HP):
    from haystack import Pipeline
    from haystack.document_stores.in_memory import InMemoryDocumentStore
    from src.haystack_utils import PassThroughDocuments, PassThroughText

    pipeline = Pipeline()
    # utility components for the first and last parts of the pipeline
    pipeline.add_component("query", PassThroughText())
    pipeline.add_component("retrieved_documents", PassThroughDocuments())

    retrieval_types = hp.multi_select(["bm25", "embeddings"],
                                      default=["bm25", "embeddings"])
    if len(retrieval_types) == 0:
        raise ValueError("At least one retrieval type must be selected.")

    document_store = InMemoryDocumentStore()

    if "embeddings" in retrieval_types:
        from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

        embedding_similarity_function = hp.select(["cosine", "dot_product"], default="cosine")
        document_store.embedding_similarity_function = embedding_similarity_function
        pipeline.add_component("embedding_retriever",
                               InMemoryEmbeddingRetriever(document_store=document_store))

    if "bm25" in retrieval_types:
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever

        bm25_algorithm = hp.select(["BM25Okapi", "BM25L", "BM25Plus"], default="BM25L")
        document_store.bm25_algorithm = bm25_algorithm
        pipeline.add_component("bm25_retriever",
                               InMemoryBM25Retriever(document_store=document_store))
        pipeline.connect("query", "bm25_retriever")

    if len(retrieval_types) == 2:  # both bm25 and embeddings
        from haystack.components.joiners.document_joiner import DocumentJoiner

        bm25_weight = hp.number(0.5)
        join_mode = hp.select(["distribution_based_rank_fusion",
                               "concatenate", "merge",
                               "reciprocal_rank_fusion"],
                              default="distribution_based_rank_fusion")
        joiner = DocumentJoiner(join_mode=join_mode, top_k=hp.int(10),
                                weights=[bm25_weight, 1 - bm25_weight])
        pipeline.add_component("document_joiner", joiner)
        pipeline.connect("bm25_retriever", "document_joiner")
        pipeline.connect("embedding_retriever", "document_joiner")
        pipeline.connect("document_joiner", "retrieved_documents")
    elif "embeddings" in retrieval_types:  # only embeddings retriever
        pipeline.connect("embedding_retriever", "retrieved_documents")
    else:  # only bm25
        pipeline.connect("bm25_retriever", "retrieved_documents")
Here, we're using a couple of "tricks" to make this work. First, we use hp.multi_select, which allows us to select multiple values from a list of options. Second, we add "helper" components at the start and end of the pipeline (PassThroughText, PassThroughDocuments) to make sure that any selection starts with query and ends with retrieved_documents. The rest is relatively straightforward.
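PassThroughText and PassThroughDocuments are tiny custom components from src/haystack_utils.py. Something along these lines would do the job (a sketch; the repo's actual implementation may differ):

from typing import List
from haystack import component, Document

@component
class PassThroughText:
    """Forwards the query text unchanged, giving the pipeline a fixed entry point."""

    @component.output_types(text=str)
    def run(self, text: str):
        return {"text": text}

@component
class PassThroughDocuments:
    """Forwards documents unchanged, giving the pipeline a fixed exit point."""

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        return {"documents": documents}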
A couple of example instantiations would be:
in_memory_retrieval(values={"retrieval_types": ["bm25"],
                            "bm25_algorithm": "BM25Okapi"})

And:
in_memory_retrieval(values={"join_mode": "reciprocal_rank_fusion"})

In the full implementation, I've added a Qdrant vector store, an optional reranking step, and a final generation pipeline. These are all meant as examples of how to add and customize components in these pipelines; you can find them in the full repository.
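Since the main rag_config below expects the response (generation) config to expose a prompt template and an LLM, here's a plausible sketch of configs/response.py (not necessarily identical to the repo's version; in particular, the exact way the nested LLM is named may differ):

from hypster import config, HP

@config
def response_config(hp: HP):
    from textwrap import dedent

    template = dedent("""
        Answer the question using only the provided documents.
        If the answer is not in the documents, say so.

        Documents:
        {% for doc in documents %}
        {{ doc.content }}
        {% endfor %}

        Question: {{ query }}
        Answer:
    """)

    # nested LLM config; intended to be overridable as response.llm.* (assumption)
    llm = hp.nest("configs/llm.py")["llm"]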
Eventually, we have the main config that binds all of these settings together:
@config
def rag_config(hp: HP):
    indexing = hp.nest("configs/indexing.py")
    indexing_pipeline = indexing["pipeline"]

    embedder_type = hp.select(["fastembed", "jina"], default="fastembed")
    match embedder_type:
        case "fastembed":
            embedder = hp.nest("configs/fast_embed.py")
        case "jina":
            embedder = hp.nest("configs/jina_embed.py")

    indexing_pipeline.add_component("doc_embedder", embedder["doc_embedder"])

    document_store_type = hp.select(["in_memory", "qdrant"],
                                    default="in_memory")
    match document_store_type:
        case "in_memory":
            retrieval = hp.nest("configs/in_memory_retrieval.py")
        case "qdrant":
            retrieval = hp.nest("configs/qdrant_retrieval.py",
                                values={"embedding_dim": embedder["embedding_dim"]})

    from haystack.components.writers import DocumentWriter
    from haystack.document_stores.types import DuplicatePolicy

    document_writer = DocumentWriter(retrieval["document_store"],
                                     policy=DuplicatePolicy.OVERWRITE)
    indexing_pipeline.add_component("document_writer", document_writer)
    indexing_pipeline.connect("splitter", "doc_embedder")
    indexing_pipeline.connect("doc_embedder", "document_writer")

    # Retrieval + Generation Pipeline
    pipeline = retrieval["pipeline"]
    pipeline.add_component("text_embedder", embedder["text_embedder"])
    pipeline.connect("query", "text_embedder")
    pipeline.connect("text_embedder", "embedding_retriever.query_embedding")

    from src.haystack_utils import PassThroughDocuments

    pipeline.add_component("docs_for_generation", PassThroughDocuments())
    use_reranker = hp.select([True, False], default=True)
    if use_reranker:
        reranker = hp.nest("configs/reranker.py")
        pipeline.add_component("reranker", reranker["reranker"])
        pipeline.connect("retrieved_documents", "reranker")
        pipeline.connect("reranker", "docs_for_generation")
        pipeline.connect("query", "reranker")
    else:
        pipeline.connect("retrieved_documents", "docs_for_generation")

    response = hp.nest("configs/response.py")
    from haystack.components.builders import PromptBuilder
    pipeline.add_component("prompt_builder", PromptBuilder(template=response["template"]))
    pipeline.add_component("llm", response["llm"])
    pipeline.connect("prompt_builder", "llm")
    pipeline.connect("query.text", "prompt_builder.query")
    pipeline.connect("docs_for_generation", "prompt_builder")
From here we can define pretty much anything we want inside any of the sub-components. For example:
results = rag_config(values={"indexing.enrich_doc_w_llm": True,
                             "indexing.llm.model": "gpt-4o-mini",
                             "document_store_type": "qdrant",
                             "embedder_type": "fastembed",
                             "reranker.model": "tiny-bert-v2",
                             "response.llm.model": "sonnet",
                             "indexing.splitter.split_length": 6,
                             "reranker.top_k": 3})
And we've instantiated a concrete set of working pipelines:

We can now execute them sequentially:
indexing_pipeline = results["indexing_pipeline"]
indexing_pipeline.warm_up()
file_paths = ["data/raw/modular_rag.pdf", "data/raw/enhancing_rag.pdf"]
for file_path in file_paths: # this can be parallelized
indexing_pipeline.run({"loader": {"sources": [file_path]}})
query = "What are the 6 main modules of the modular RAG framework?"
pipeline = results["pipeline"]
pipeline.warm_up()
response = pipeline.run({"query": {"text": query}})
print("Response: ", response["llm"]["replies"][0])
Response: The six main modules of the modular RAG framework are
Indexing, Pre-retrieval, Retrieval, Post-retrieval, Generation,
and Orchestration.
Supporting quote from Document 1: "Based on the current stage of RAG
development, we have established six main modules: Indexing,
Pre-retrieval, Retrieval, Post-retrieval, Generation, and Orchestration."
Great Response!