Skip to content

Execution Strategies#

ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy #

Bases: ABC

Base class for processing execution strategies that define how documents are processed to become elements.

Processing execution strategies are responsible for processing documents using the appropriate processor, which means that they don't usually determine the business logic of the processing itself, but rather how the processing is executed.

from_config classmethod #

from_config(config: dict) -> Self

Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

PARAMETER DESCRIPTION
config

A dictionary containing the configuration for initializing the instance.

TYPE: dict

RETURNS DESCRIPTION
Self

An initialized instance of the ProcessingExecutionStrategy subclass.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@classmethod
def from_config(cls, config: dict) -> Self:
    """
    Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

    Args:
        config: A dictionary containing the configuration for initializing the instance.

    Returns:
        An initialized instance of the ProcessingExecutionStrategy subclass.
    """
    return cls(**config)

to_document_meta async staticmethod #

to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta

Convert a document, document meta or source to a document meta object.

PARAMETER DESCRIPTION
document

The document to convert.

TYPE: DocumentMeta | Document | Source

RETURNS DESCRIPTION
DocumentMeta

The document meta object.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@staticmethod
async def to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta:
    """
    Normalize any supported document representation into a document meta object.

    Args:
        document: A source, a document meta, or a document.

    Returns:
        The document meta created from the source, the input itself when it already is a
        document meta, or the `metadata` attribute of the input otherwise.
    """
    # A source must first be turned into a document meta, which is an awaitable operation.
    if isinstance(document, Source):
        return await DocumentMeta.from_source(document)

    # Already a document meta: hand it back untouched.
    if isinstance(document, DocumentMeta):
        return document

    # Anything else is treated as a document carrying its meta in `metadata`.
    return document.metadata

process_document async #

process_document(document: DocumentMeta | Document | Source, processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process a single document and return the elements.

PARAMETER DESCRIPTION
document

The document to process.

TYPE: DocumentMeta | Document | Source

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
async def process_document(
    self,
    document: DocumentMeta | Document | Source,
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Run a single document through a processor and return the elements it produces.

    Args:
        document: The document to process.
        processor_router: The router asked for a processor when no overwrite is given.
        processor_overwrite: A processor to use instead of the one chosen by the router.

    Returns:
        The list of elements returned by the selected processor.
    """
    meta = await self.to_document_meta(document)

    # A supplied (truthy) overwrite takes precedence; the router is only consulted otherwise.
    if processor_overwrite:
        chosen = processor_overwrite
    else:
        chosen = processor_router.get_provider(meta)

    return await chosen.process(meta)

process_documents abstractmethod async #

process_documents(documents: Sequence[DocumentMeta | Document | Source], processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process documents using the given processor and return the resulting elements.

PARAMETER DESCRIPTION
documents

The documents to process.

TYPE: Sequence[DocumentMeta | Document | Source]

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@abstractmethod
async def process_documents(
    self,
    documents: Sequence[DocumentMeta | Document | Source],
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Process a collection of documents and return the resulting elements.

    This method is abstract and has no implementation here; concrete strategies must provide it.

    Args:
        documents: The documents to process.
        processor_router: The document processor router to use.
        processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router.

    Returns:
        A list of elements.
    """

ragbits.document_search.ingestion.processor_strategies.SequentialProcessing #

Bases: ProcessingExecutionStrategy

A processing execution strategy that processes documents in sequence, one at a time.

from_config classmethod #

from_config(config: dict) -> Self

Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

PARAMETER DESCRIPTION
config

A dictionary containing the configuration for initializing the instance.

TYPE: dict

RETURNS DESCRIPTION
Self

An initialized instance of the ProcessingExecutionStrategy subclass.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@classmethod
def from_config(cls, config: dict) -> Self:
    """
    Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

    Args:
        config: A dictionary containing the configuration for initializing the instance.

    Returns:
        An initialized instance of the ProcessingExecutionStrategy subclass.
    """
    return cls(**config)

to_document_meta async staticmethod #

to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta

Convert a document, document meta or source to a document meta object.

PARAMETER DESCRIPTION
document

The document to convert.

TYPE: DocumentMeta | Document | Source

RETURNS DESCRIPTION
DocumentMeta

The document meta object.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@staticmethod
async def to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta:
    """
    Normalize any supported document representation into a document meta object.

    Args:
        document: A source, a document meta, or a document.

    Returns:
        The document meta created from the source, the input itself when it already is a
        document meta, or the `metadata` attribute of the input otherwise.
    """
    # A source must first be turned into a document meta, which is an awaitable operation.
    if isinstance(document, Source):
        return await DocumentMeta.from_source(document)

    # Already a document meta: hand it back untouched.
    if isinstance(document, DocumentMeta):
        return document

    # Anything else is treated as a document carrying its meta in `metadata`.
    return document.metadata

process_document async #

process_document(document: DocumentMeta | Document | Source, processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process a single document and return the elements.

PARAMETER DESCRIPTION
document

The document to process.

TYPE: DocumentMeta | Document | Source

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
async def process_document(
    self,
    document: DocumentMeta | Document | Source,
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Run a single document through a processor and return the elements it produces.

    Args:
        document: The document to process.
        processor_router: The router asked for a processor when no overwrite is given.
        processor_overwrite: A processor to use instead of the one chosen by the router.

    Returns:
        The list of elements returned by the selected processor.
    """
    meta = await self.to_document_meta(document)

    # A supplied (truthy) overwrite takes precedence; the router is only consulted otherwise.
    if processor_overwrite:
        chosen = processor_overwrite
    else:
        chosen = processor_router.get_provider(meta)

    return await chosen.process(meta)

process_documents async #

process_documents(documents: Sequence[DocumentMeta | Document | Source], processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process documents using the given processor and return the resulting elements.

PARAMETER DESCRIPTION
documents

The documents to process.

TYPE: Sequence[DocumentMeta | Document | Source]

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/sequential.py
async def process_documents(
    self,
    documents: Sequence[DocumentMeta | Document | Source],
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Process the documents one after another and collect all resulting elements.

    Args:
        documents: The documents to process.
        processor_router: The document processor router to use.
        processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router.

    Returns:
        A single list holding the elements of every document, in the order the documents were given.
    """
    collected: list[Element] = []
    for item in documents:
        # Each document is awaited to completion before the next one starts.
        produced = await self.process_document(item, processor_router, processor_overwrite)
        collected.extend(produced)
    return collected

ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing #

BatchedAsyncProcessing(batch_size: int = 10)

Bases: ProcessingExecutionStrategy

A processing execution strategy that processes documents asynchronously in batches.

Initialize the BatchedAsyncProcessing instance.

PARAMETER DESCRIPTION
batch_size

The size of the batch to process documents in.

TYPE: int DEFAULT: 10

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py
def __init__(self, batch_size: int = 10) -> None:
    """
    Initialize the BatchedAsyncProcessing instance.

    Args:
        batch_size: The size of the batch to process documents in. Must be at least 1.

    Raises:
        ValueError: If batch_size is smaller than 1.
    """
    # The batch size is used as the initial value of an asyncio.Semaphore when documents are
    # processed: asyncio rejects a negative value, and a value of 0 would block every acquirer,
    # so invalid sizes are rejected up front instead.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    self.batch_size = batch_size

batch_size instance-attribute #

batch_size = batch_size

from_config classmethod #

from_config(config: dict) -> Self

Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

PARAMETER DESCRIPTION
config

A dictionary containing the configuration for initializing the instance.

TYPE: dict

RETURNS DESCRIPTION
Self

An initialized instance of the ProcessingExecutionStrategy subclass.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@classmethod
def from_config(cls, config: dict) -> Self:
    """
    Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration.

    Args:
        config: A dictionary containing the configuration for initializing the instance.

    Returns:
        An initialized instance of the ProcessingExecutionStrategy subclass.
    """
    return cls(**config)

to_document_meta async staticmethod #

to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta

Convert a document, document meta or source to a document meta object.

PARAMETER DESCRIPTION
document

The document to convert.

TYPE: DocumentMeta | Document | Source

RETURNS DESCRIPTION
DocumentMeta

The document meta object.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
@staticmethod
async def to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta:
    """
    Normalize any supported document representation into a document meta object.

    Args:
        document: A source, a document meta, or a document.

    Returns:
        The document meta created from the source, the input itself when it already is a
        document meta, or the `metadata` attribute of the input otherwise.
    """
    # A source must first be turned into a document meta, which is an awaitable operation.
    if isinstance(document, Source):
        return await DocumentMeta.from_source(document)

    # Already a document meta: hand it back untouched.
    if isinstance(document, DocumentMeta):
        return document

    # Anything else is treated as a document carrying its meta in `metadata`.
    return document.metadata

process_document async #

process_document(document: DocumentMeta | Document | Source, processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process a single document and return the elements.

PARAMETER DESCRIPTION
document

The document to process.

TYPE: DocumentMeta | Document | Source

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py
async def process_document(
    self,
    document: DocumentMeta | Document | Source,
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Run a single document through a processor and return the elements it produces.

    Args:
        document: The document to process.
        processor_router: The router asked for a processor when no overwrite is given.
        processor_overwrite: A processor to use instead of the one chosen by the router.

    Returns:
        The list of elements returned by the selected processor.
    """
    meta = await self.to_document_meta(document)

    # A supplied (truthy) overwrite takes precedence; the router is only consulted otherwise.
    if processor_overwrite:
        chosen = processor_overwrite
    else:
        chosen = processor_router.get_provider(meta)

    return await chosen.process(meta)

process_documents async #

process_documents(documents: Sequence[DocumentMeta | Document | Source], processor_router: DocumentProcessorRouter, processor_overwrite: BaseProvider | None = None) -> list[Element]

Process documents using the given processor and return the resulting elements.

PARAMETER DESCRIPTION
documents

The documents to process.

TYPE: Sequence[DocumentMeta | Document | Source]

processor_router

The document processor router to use.

TYPE: DocumentProcessorRouter

processor_overwrite

Forces the use of a specific processor, instead of the one provided by the router.

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
list[Element]

A list of elements.

Source code in packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py
async def process_documents(
    self,
    documents: Sequence[DocumentMeta | Document | Source],
    processor_router: DocumentProcessorRouter,
    processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
    """
    Process the documents asynchronously and return all resulting elements as one flat list.

    Every document is scheduled at once via `asyncio.gather`; a semaphore initialised with
    `batch_size` is handed to `_process_with_semaphore` (defined elsewhere), which is presumably
    where the number of documents processed at the same time is limited.

    Args:
        documents: The documents to process.
        processor_router: The document processor router to use.
        processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router.

    Returns:
        A list of elements.
    """
    # One semaphore shared by all scheduled documents, sized by the configured batch size.
    limiter = asyncio.Semaphore(self.batch_size)

    pending = [
        self._process_with_semaphore(limiter, item, processor_router, processor_overwrite)
        for item in documents
    ]
    per_document = await asyncio.gather(*pending)

    # gather() yields one result per document, in input order; merge them into a single list.
    flattened = []
    for produced in per_document:
        flattened.extend(produced)
    return flattened