How to Create a Custom Execution Strategy
Note
To learn how to use a built-in asynchronous execution strategy, see How to Ingest Documents Asynchronously.
In Ragbits, document processing during ingestion is controlled by a component known as a "processing execution strategy". It does not process the documents itself; rather, it orchestrates how the processing is executed.
Ragbits provides several built-in execution strategies that can be easily interchanged. You can also create your own custom execution strategy to fulfill your specific needs. This guide will show you how to develop a custom execution strategy using a somewhat impractical example of a strategy that processes documents one by one, but with a delay between each document.
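To make the split between orchestration and processing concrete, here is a minimal standard-library sketch. It does not use Ragbits at all: process_one, run_sequentially, and run_concurrently are hypothetical names invented for this illustration. The same per-document function is driven in two different ways, and only the way it is executed changes, not the result.

```python
import asyncio


async def process_one(text: str) -> list[str]:
    """Hypothetical stand-in for per-document processing: one 'element' per word."""
    await asyncio.sleep(0)  # yield control, as I/O-bound processing would
    return text.split()


async def run_sequentially(texts: list[str]) -> list[str]:
    """Orchestration choice 1: process one document at a time."""
    elements: list[str] = []
    for text in texts:
        elements.extend(await process_one(text))
    return elements


async def run_concurrently(texts: list[str]) -> list[str]:
    """Orchestration choice 2: start all documents at once and gather the results."""
    per_document = await asyncio.gather(*(process_one(text) for text in texts))
    # gather returns results in the order the awaitables were passed in
    return [element for elements in per_document for element in elements]


texts = ["alpha beta", "gamma"]
sequential_result = asyncio.run(run_sequentially(texts))
concurrent_result = asyncio.run(run_concurrently(texts))
```

Both functions play the role an execution strategy plays during ingestion: they decide when and in what order the processing function is called, while the processing logic itself stays untouched.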
Implementing a Custom Execution Strategy
To create a custom execution strategy, create a new class that inherits from ProcessingExecutionStrategy and implements the abstract method process_documents. This method takes a sequence of documents and processes them asynchronously.
While implementing the process_documents method, you can use the built-in process_document method, which takes a single document along with the same processor_router and processor_overwrite arguments and performs the actual processing of that document.
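import asyncio
from collections.abc import Sequence

# The module paths of the type imports below are assumed from the Ragbits
# package layout; adjust them to match your installed version.
from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.element import Element
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy
from ragbits.document_search.ingestion.providers.base import BaseProvider


class DelayedExecutionStrategy(ProcessingExecutionStrategy):
    async def process_documents(
        self,
        documents: Sequence[DocumentMeta | Document | Source],
        processor_router: DocumentProcessorRouter,
        processor_overwrite: BaseProvider | None = None,
    ) -> list[Element]:
        elements: list[Element] = []
        for document in documents:
            # Artificial delay before each document
            await asyncio.sleep(1)
            # process_document yields the elements of one document, so extend keeps the result flat
            elements.extend(await self.process_document(document, processor_router, processor_overwrite))
        return elements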
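The same pattern can be tried out without installing Ragbits. The sketch below is a toy analogue, not the Ragbits API: ToyStrategy and ToyDelayedStrategy are hypothetical classes, documents are plain strings, and the delay is made configurable (an assumption added here so the example runs quickly).

```python
import asyncio
import time
from abc import ABC, abstractmethod
from collections.abc import Sequence


class ToyStrategy(ABC):
    """Hypothetical stand-in for a strategy base class (not the Ragbits one)."""

    async def process_document(self, document: str) -> list[str]:
        # Pretend processing: one "element" per word.
        return document.split()

    @abstractmethod
    async def process_documents(self, documents: Sequence[str]) -> list[str]:
        """Decide how process_document is invoked for each document."""


class ToyDelayedStrategy(ToyStrategy):
    def __init__(self, delay: float) -> None:
        self.delay = delay

    async def process_documents(self, documents: Sequence[str]) -> list[str]:
        elements: list[str] = []
        for document in documents:
            await asyncio.sleep(self.delay)  # pause before each document
            elements.extend(await self.process_document(document))
        return elements


start = time.monotonic()
result = asyncio.run(ToyDelayedStrategy(delay=0.05).process_documents(["a b", "c"]))
elapsed = time.monotonic() - start
```

Because the documents are handled strictly one after another, the total run time grows with the number of documents multiplied by the delay, which is what makes this strategy impractical outside of a demonstration.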
Implementing an Advanced Custom Execution Strategy
Alternatively, instead of using the process_document method, you can process documents directly using the processor_router and processor_overwrite parameters. This gives you more control over how each document is processed.
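import asyncio
from collections.abc import Sequence

# The module paths of the type imports below are assumed from the Ragbits
# package layout; adjust them to match your installed version.
from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.element import Element
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy
from ragbits.document_search.ingestion.providers.base import BaseProvider


class DelayedExecutionStrategy(ProcessingExecutionStrategy):
    async def process_documents(
        self,
        documents: Sequence[DocumentMeta | Document | Source],
        processor_router: DocumentProcessorRouter,
        processor_overwrite: BaseProvider | None = None,
    ) -> list[Element]:
        elements: list[Element] = []
        for document in documents:
            # Convert the document to DocumentMeta
            document_meta = await self.to_document_meta(document)
            # Get the processor for the document; an explicit overwrite takes priority over routing
            processor = processor_overwrite or processor_router.get_processor(document_meta)
            await asyncio.sleep(1)
            # The processor is expected to return a list of elements for the document
            elements.extend(await processor.process(document_meta))
        return elements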
Using the Custom Execution Strategy
To use your custom execution strategy, pass it when creating the DocumentSearch instance:
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta

# DelayedExecutionStrategy is the class defined earlier in this guide.

documents = [
    DocumentMeta.create_text_document_from_literal("Example document 1"),
    DocumentMeta.create_text_document_from_literal("Example document 2"),
]

embedder = LiteLLMEmbeddings(
    model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
processing_strategy = DelayedExecutionStrategy()

document_search = DocumentSearch(
    embedder=embedder,
    vector_store=vector_store,
    processing_strategy=processing_strategy,
)