Skip to content

Hybrid Vector Store & Fusion Strategies#

ragbits.core.vector_stores.hybrid.HybridSearchVectorStore #

HybridSearchVectorStore(*vector_stores: VectorStore, retrieval_strategy: HybridRetrivalStrategy | None = None)

Bases: VectorStore

A vector store that takes multiple vector store objects and proxies requests to them, returning the union of results.

Constructs a new HybridSearchVectorStore instance.

PARAMETER DESCRIPTION
vector_stores

The vector stores to proxy requests to.

TYPE: VectorStore DEFAULT: ()

retrieval_strategy

The retrieval strategy to use when combining results, uses OrderedHybridRetrivalStrategy by default.

TYPE: HybridRetrivalStrategy | None DEFAULT: None

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid.py
def __init__(self, *vector_stores: VectorStore, retrieval_strategy: HybridRetrivalStrategy | None = None) -> None:
    """
    Builds a hybrid store that forwards requests to several underlying vector stores.

    Args:
        vector_stores: The underlying vector stores, passed positionally. They are kept
            as a tuple in the order given.
        retrieval_strategy: The strategy used to merge the results of the individual stores.
            When omitted (or otherwise falsy), a new OrderedHybridRetrivalStrategy is used.
    """
    self.vector_stores = vector_stores
    if retrieval_strategy:
        self.retrieval_strategy = retrieval_strategy
    else:
        self.retrieval_strategy = OrderedHybridRetrivalStrategy()

default_module class-attribute instance-attribute #

default_module: ClassVar = vector_stores

configuration_key class-attribute instance-attribute #

configuration_key: ClassVar = 'vector_store'

default_options instance-attribute #

default_options: OptionsT = default_options or options_cls()

options_cls class-attribute instance-attribute #

options_cls = VectorStoreOptions

vector_stores instance-attribute #

vector_stores = vector_stores

retrieval_strategy instance-attribute #

subclass_from_config classmethod #

subclass_from_config(config: ObjectConstructionConfig) -> Self

Initializes the class with the provided configuration. May return a subclass of the class, if requested by the configuration.

PARAMETER DESCRIPTION
config

A model containing configuration details for the class.

TYPE: ObjectConstructionConfig

RETURNS DESCRIPTION
Self

An instance of the class initialized with the provided configuration.

RAISES DESCRIPTION
InvalidConfigError

The class can't be found or is not a subclass of the current class.

Source code in packages/ragbits-core/src/ragbits/core/utils/config_handling.py
@classmethod
def subclass_from_config(cls, config: ObjectConstructionConfig) -> Self:
    """
    Builds an instance from a construction config, which may name a subclass of this class.

    The class referenced by ``config.type`` is resolved with ``import_by_path`` (using
    ``cls.default_module``) and instantiated through its own ``from_config`` with ``config.config``.

    Args:
        config: A model containing configuration details for the class.

    Returns:
        An instance of the resolved class, created from ``config.config``.

    Raises:
        InvalidConfigError: If the resolved class is not a subclass of the current class.
            Errors from resolving the path itself come from ``import_by_path``.
    """
    resolved_cls = import_by_path(config.type, cls.default_module)
    if issubclass(resolved_cls, cls):
        return resolved_cls.from_config(config.config)

    raise InvalidConfigError(f"{resolved_cls} is not a subclass of {cls}")

subclass_from_factory classmethod #

subclass_from_factory(factory_path: str) -> Self

Creates the class using the provided factory function. May return a subclass of the class, if requested by the factory.

PARAMETER DESCRIPTION
factory_path

A string representing the path to the factory function in the format of "module.submodule:factory_name".

TYPE: str

RETURNS DESCRIPTION
Self

An instance of the class initialized with the provided factory function.

RAISES DESCRIPTION
InvalidConfigError

The factory can't be found or the object returned is not a subclass of the current class.

Source code in packages/ragbits-core/src/ragbits/core/utils/config_handling.py
@classmethod
def subclass_from_factory(cls, factory_path: str) -> Self:
    """
    Builds an instance by calling a factory function, which may return a subclass instance.

    The factory is resolved with ``import_by_path`` (using ``cls.default_module``) and called
    without arguments.

    Args:
        factory_path: A string representing the path to the factory function
            in the format of "module.submodule:factory_name".

    Returns:
        The object produced by the factory.

    Raises:
        InvalidConfigError: If the object returned by the factory is not an instance of
            the current class. Errors from resolving the path itself come from ``import_by_path``.
    """
    produced = import_by_path(factory_path, cls.default_module)()
    if isinstance(produced, cls):
        return produced

    raise InvalidConfigError(f"The object returned by factory {factory_path} is not an instance of {cls}")

preferred_subclass classmethod #

preferred_subclass(config: CoreConfig, factory_path_override: str | None = None, yaml_path_override: Path | None = None) -> Self

Tries to create an instance by looking at the project's component preferences, either from YAML or from the factory. Takes optional overrides for both, which take higher precedence.

PARAMETER DESCRIPTION
config

The CoreConfig instance containing preferred factory and configuration details.

TYPE: CoreConfig

factory_path_override

A string representing the path to the factory function in the format of "module.submodule:factory_name".

TYPE: str | None DEFAULT: None

yaml_path_override

A string representing the path to the YAML file containing the Ragstack instance configuration.

TYPE: Path | None DEFAULT: None

RAISES DESCRIPTION
InvalidConfigError

If the default factory or configuration can't be found.

Source code in packages/ragbits-core/src/ragbits/core/utils/config_handling.py
@classmethod
def preferred_subclass(
    cls, config: CoreConfig, factory_path_override: str | None = None, yaml_path_override: Path | None = None
) -> Self:
    """
    Creates an instance based on the project's component preferences.

    The sources are consulted in the following order and the first applicable one is used:

    1. ``yaml_path_override`` - only if the YAML file has an entry under ``cls.configuration_key``.
    2. ``factory_path_override``.
    3. The factory registered in ``config.component_preference_factories``.
    4. The configuration registered in ``config.preferred_instances_config``.

    Args:
        config: The CoreConfig instance containing preferred factory and configuration details.
        factory_path_override: A string representing the path to the factory function
            in the format of "module.submodule:factory_name".
        yaml_path_override: The path to the YAML file containing the instance configuration.

    Returns:
        The instance built from the first applicable source.

    Raises:
        NoPreferredConfigError: If none of the sources provides a factory or a configuration.
        InvalidConfigError: If the chosen source yields something that is not a subclass
            (or instance) of the current class.
    """
    key = cls.configuration_key

    if yaml_path_override:
        yaml_entry = get_config_from_yaml(yaml_path_override).get(key)
        # A YAML file without an entry for this component falls through to the other sources.
        if yaml_entry:
            return cls.subclass_from_config(ObjectConstructionConfig.model_validate(yaml_entry))

    if factory_path_override:
        return cls.subclass_from_factory(factory_path_override)

    factory_path = config.component_preference_factories.get(key)
    if factory_path:
        return cls.subclass_from_factory(factory_path)

    instance_config = config.preferred_instances_config.get(key)
    if instance_config:
        return cls.subclass_from_config(ObjectConstructionConfig.model_validate(instance_config))

    raise NoPreferredConfigError(f"Could not find preferred factory or configuration for {cls.configuration_key}")

from_config classmethod #

from_config(config: dict[str, Any]) -> Self

Initializes the class with the provided configuration.

PARAMETER DESCRIPTION
config

A dictionary containing configuration details for the class.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Self

An instance of the class initialized with the provided configuration.

Source code in packages/ragbits-core/src/ragbits/core/utils/config_handling.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> Self:
    """
    Initializes the class with the provided configuration.

    The optional "default_options" entry is converted into an ``options_cls`` instance and passed
    to the constructor as ``default_options``; every other entry is forwarded as a keyword argument.
    The dictionary passed in is not modified, so the same configuration can be reused.

    Args:
        config: A dictionary containing configuration details for the class.

    Returns:
        An instance of the class initialized with the provided configuration.
    """
    # Work on a shallow copy: popping from the caller's dictionary would silently drop
    # "default_options" from a configuration that is used more than once.
    constructor_kwargs = dict(config)
    default_options = constructor_kwargs.pop("default_options", None)
    options = cls.options_cls(**default_options) if default_options else None
    return cls(**constructor_kwargs, default_options=options)

store async #

store(entries: list[VectorStoreEntry]) -> None

Store entries in the vector stores.

Sends entries to all vector stores to be stored, although individual vector stores are free to implement their own logic regarding which entries to store. For example, some vector stores may only store entries with specific type of content (images, text, etc.).

PARAMETER DESCRIPTION
entries

The entries to store.

TYPE: list[VectorStoreEntry]

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid.py
@traceable
async def store(self, entries: list[VectorStoreEntry]) -> None:
    """
    Stores the entries in every underlying vector store.

    The same list of entries is handed to each vector store and the calls are awaited together.
    Individual vector stores are free to implement their own logic regarding which entries
    to store. For example, some vector stores may only store entries with specific type of content
    (images, text, etc.).

    Args:
        entries: The entries to store.
    """
    pending = [backend.store(entries) for backend in self.vector_stores]
    await asyncio.gather(*pending)

retrieve async #

retrieve(text: str, options: VectorStoreOptions | None = None) -> list[VectorStoreResult]

Retrieve entries from the vector stores most similar to the provided text. The results are combined using the retrieval strategy provided in the constructor.

PARAMETER DESCRIPTION
text

The text to query the vector store with.

TYPE: str

options

The options for querying the vector stores.

TYPE: VectorStoreOptions | None DEFAULT: None

RETURNS DESCRIPTION
list[VectorStoreResult]

The entries.

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid.py
@traceable
async def retrieve(
    self,
    text: str,
    options: VectorStoreOptions | None = None,
) -> list[VectorStoreResult]:
    """
    Queries every underlying vector store with the text and merges what they return.

    All stores receive the same text and options and are awaited together. Their result lists
    are then combined by the retrieval strategy provided in the constructor.

    Args:
        text: The text to query the vector store with.
        options: The options for querying the vector stores.

    Returns:
        The entries, combined by the retrieval strategy.
    """
    per_store_results = await asyncio.gather(
        *[backend.retrieve(text, options) for backend in self.vector_stores]
    )
    return self.retrieval_strategy.join(per_store_results)

remove async #

remove(ids: list[UUID]) -> None

Remove entries from all vector stores.

PARAMETER DESCRIPTION
ids

The list of entries' IDs to remove.

TYPE: list[UUID]

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid.py
@traceable
async def remove(self, ids: list[UUID]) -> None:
    """
    Removes the entries with the given IDs from every underlying vector store.

    Each vector store receives the full list of IDs and the calls are awaited together.

    Args:
        ids: The list of entries' IDs to remove.
    """
    pending = [backend.remove(ids) for backend in self.vector_stores]
    await asyncio.gather(*pending)

list async #

list(where: WhereQuery | None = None, limit: int | None = None, offset: int = 0) -> list[VectorStoreEntry]

List entries from the vector stores. The entries can be filtered, limited and offset. Vector stores are queried in the order they were provided in the constructor.

PARAMETER DESCRIPTION
where

The filter dictionary - the keys are the field names and the values are the values to filter by. Not specifying the key means no filtering.

TYPE: WhereQuery | None DEFAULT: None

limit

The maximum number of entries to return.

TYPE: int | None DEFAULT: None

offset

The number of entries to skip.

TYPE: int DEFAULT: 0

RETURNS DESCRIPTION
list[VectorStoreEntry]

The entries.

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid.py
@traceable
async def list(
    self, where: WhereQuery | None = None, limit: int | None = None, offset: int = 0
) -> list[VectorStoreEntry]:
    """
    Lists entries from the vector stores, with optional filtering, limit and offset.

    Vector stores are queried one after another, in the order they were provided in the constructor.
    Each store is asked for all entries matching ``where``; the limit and offset are applied
    to the combined, de-duplicated (by entry ID) collection. Querying stops early once enough
    entries have been collected to satisfy ``offset + limit``.

    Args:
        where: The filter dictionary - the keys are the field names and the values are the values to filter by.
            Not specifying the key means no filtering.
        limit: The maximum number of entries to return.
        offset: The number of entries to skip.

    Returns:
        The entries.
    """
    # Index one past the last entry wanted; None means "no upper bound".
    stop = None if limit is None else offset + limit
    entries_by_id: dict[UUID, VectorStoreEntry] = {}

    for backend in self.vector_stores:
        if stop is not None and len(entries_by_id) >= stop:
            break

        for entry in await backend.list(where):
            # An entry seen again keeps its original position but takes the latest value.
            entries_by_id[entry.id] = entry

    return [*entries_by_id.values()][offset:stop]

ragbits.core.vector_stores.hybrid_strategies.OrderedHybridRetrivalStrategy #

OrderedHybridRetrivalStrategy(sum_scores: bool = False)

Bases: HybridRetrivalStrategy

A class that orders the results by score and deduplicates them by choosing the first occurrence of each entry. This algorithm is also known as "Relative Score Fusion".

Constructs a new OrderedHybridRetrivalStrategy instance.

PARAMETER DESCRIPTION
sum_scores

If True, sums the scores of the same entries; otherwise keeps the best score (i.e., the biggest one). Summing scores boosts the results that are present in results from multiple vector stores.

TYPE: bool DEFAULT: False

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def __init__(self, sum_scores: bool = False) -> None:
    """
    Creates the strategy and records how duplicate entries should be scored.

    Args:
        sum_scores: If True, the scores of the same entry are added together; otherwise
            the best (i.e., the biggest) score is kept. Summing boosts entries that are
            returned by more than one vector store.
    """
    self._sum_scores = sum_scores

join #

join(results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]

Joins the multiple lists of results into a single list.

PARAMETER DESCRIPTION
results

The lists of results to join.

TYPE: list[list[VectorStoreResult]]

RETURNS DESCRIPTION
list[VectorStoreResult]

The joined list of results.

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def join(self, results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]:
    """
    Merges several result lists into one list ordered by descending score.

    All results are ranked together by their score. The first (best-scored) occurrence of an entry
    becomes its representative - a copy whose ``subresults`` collects every original result
    for that entry. Further occurrences either add to the representative's score or leave
    the best score in place, depending on ``sum_scores``.

    Args:
        results: The lists of results to join.

    Returns:
        The joined list of results.
    """
    combine = add if self._sum_scores else max
    ranked = sorted(
        (item for group in results for item in group),
        key=lambda item: item.score,
        reverse=True,
    )

    merged: dict[UUID, VectorStoreResult] = {}
    for item in ranked:
        representative = merged.get(item.entry.id)
        if representative is None:
            merged[item.entry.id] = item.model_copy(update={"subresults": [item]})
            continue

        representative.score = combine(representative.score, item.score)
        representative.subresults.append(item)

    # Summing may change the relative order, so rank the merged entries once more.
    return sorted(merged.values(), key=lambda item: item.score, reverse=True)

ragbits.core.vector_stores.hybrid_strategies.ReciprocalRankFusion #

ReciprocalRankFusion(k_constant: float = 60.0, sum_scores: bool = True)

Bases: HybridRetrivalStrategy

An implementation of Reciprocal Rank Fusion (RRF) for combining search results, based on the paper "Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods": https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf

Constructs a new ReciprocalRankFusion instance.

PARAMETER DESCRIPTION
k_constant

The "k" constant used in the RRF formula, meant to mitigate the impact of high rankings by outlier systems. The value of 60 is recommended by the authors of the RRF paper. Qdrant uses a value of 2.

TYPE: float DEFAULT: 60.0

sum_scores

If True, sums the scores of the same entries; otherwise keeps the best score (i.e., the biggest one). Summing scores boosts the results that are present in results from multiple vector stores. Not summing will result in a very simple behavior: the list will include first results from all vector stores, then second results (excluding the duplicates), and so on. The original version of RRF sums the scores, so the default value is True.

TYPE: bool DEFAULT: True

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def __init__(self, k_constant: float = 60.0, sum_scores: bool = True) -> None:
    """
    Creates the strategy and records the RRF constant and the duplicate-handling mode.

    Args:
        k_constant: The "k" constant used in the RRF formula, meant to mitigate
            the impact of high rankings by outlier systems. The value of 60 is recommended
            by the authors of the RRF paper. Qdrant uses a value of 2.
        sum_scores: If True, the scores of the same entry are added together; otherwise
            the best (i.e., the biggest) score is kept. Summing boosts entries that are
            returned by more than one vector store. Without summing the behavior is very simple:
            the list contains the first results of all vector stores, then the second results
            (excluding the duplicates), and so on. The original version of RRF sums the scores,
            so the default value is True.
    """
    self._k_constant, self._sum_scores = k_constant, sum_scores

join #

join(results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]

Joins the multiple lists of results into a single list using Reciprocal Rank Fusion.

PARAMETER DESCRIPTION
results

The lists of results to join.

TYPE: list[list[VectorStoreResult]]

RETURNS DESCRIPTION
list[VectorStoreResult]

The joined list of results.

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def join(self, results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]:
    """
    Merges several result lists into one list using Reciprocal Rank Fusion.

    Every result is scored as ``1 / (rank + k_constant)``, where ``rank`` is its 1-based position
    within its own list. The first occurrence of an entry becomes its representative - a copy carrying
    the RRF score and a ``subresults`` list with the original results. Further occurrences either
    add to that score or leave the best score in place, depending on ``sum_scores``.

    Args:
        results: The lists of results to join.

    Returns:
        The joined list of results, ordered by descending fused score.
    """
    combine = add if self._sum_scores else max
    fused: dict[UUID, VectorStoreResult] = {}

    for ranking in results:
        for rank, candidate in enumerate(ranking, start=1):
            rrf_score = 1.0 / (rank + self._k_constant)
            representative = fused.get(candidate.entry.id)
            if representative is None:
                fused[candidate.entry.id] = candidate.model_copy(
                    update={"score": rrf_score, "subresults": [candidate]}
                )
                continue

            representative.score = combine(representative.score, rrf_score)
            representative.subresults.append(candidate)

    return sorted(fused.values(), key=lambda item: item.score, reverse=True)

ragbits.core.vector_stores.hybrid_strategies.DistributionBasedScoreFusion #

DistributionBasedScoreFusion(sum_scores: bool = False)

Bases: HybridRetrivalStrategy

An implementation of Distribution-Based Score Fusion (DBSF) for combining search results, based on the "Distribution-Based Score Fusion (DBSF), a new approach to Vector Search Ranking" post: https://medium.com/plain-simple-software/distribution-based-score-fusion-dbsf-a-new-approach-to-vector-search-ranking-f87c37488b18

Constructs a new DistributionBasedScoreFusion instance.

PARAMETER DESCRIPTION
sum_scores

If True, sums the scores of the same entries; otherwise keeps the best score (i.e., the biggest one). Summing scores boosts the results that are present in results from multiple vector stores. The original DBSF article remains neutral on this matter, so the default value is False. Many implementations (like Qdrant) use summing.

TYPE: bool DEFAULT: False

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def __init__(self, sum_scores: bool = False) -> None:
    """
    Creates the strategy and records how duplicate entries should be scored.

    Args:
        sum_scores: If True, the scores of the same entry are added together; otherwise
            the best (i.e., the biggest) score is kept. Summing boosts entries that are
            returned by more than one vector store. The original DBSF article remains neutral
            on this matter, so the default value is False. Many implementations (like Qdrant)
            use summing.
    """
    self._sum_scores = sum_scores

join #

join(results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]

Joins the multiple lists of results into a single list using Distribution-Based Score Fusion.

PARAMETER DESCRIPTION
results

The lists of results to join.

TYPE: list[list[VectorStoreResult]]

RETURNS DESCRIPTION
list[VectorStoreResult]

The joined list of results.

Source code in packages/ragbits-core/src/ragbits/core/vector_stores/hybrid_strategies.py
def join(self, results: list[list[VectorStoreResult]]) -> list[VectorStoreResult]:
    """
    Joins the multiple lists of results into a single list using Distribution-Based Score Fusion.

    The scores of every list are normalized independently by mapping the interval
    ``[mean - 3 * std, mean + 3 * std]`` of that list onto ``[0, 1]``. The first occurrence of an entry
    becomes its representative - a copy carrying the normalized score and a ``subresults`` list with
    the original results. Further occurrences either add to that score or leave the best score in place,
    depending on ``sum_scores``.

    The lists may have different lengths and may be empty. A list whose scores have no spread
    (a single result, or identical scores) gets the midpoint value 0.5 for each of its results.

    Args:
        results: The lists of results to join.

    Returns:
        The joined list of results, ordered by descending normalized score.
    """
    score_operation = add if self._sum_scores else max
    end_results: dict[UUID, VectorStoreResult] = {}

    for result_list in results:
        if not result_list:
            # Nothing to normalize, and the statistics of an empty list are undefined.
            continue

        # Each list is normalized on its own, so lists of different lengths are fine;
        # stacking them into a single 2-D array would require equal lengths.
        scores = np.array([result.score for result in result_list], dtype=float)
        mean = np.mean(scores)
        std = np.std(scores)
        three_std_above = mean + 3 * std
        three_std_below = mean - 3 * std
        score_range = three_std_above - three_std_below

        if scores.max() > scores.min() and score_range > 0:
            normalized_scores = (scores - three_std_below) / score_range
        else:
            # No spread: every score equals the mean, which is the midpoint of the
            # normalization interval. Dividing by the zero-width range would give NaN.
            normalized_scores = np.full(scores.shape, 0.5)

        # tolist() yields plain Python floats rather than NumPy scalars.
        for result, normalized_score in zip(result_list, normalized_scores.tolist()):
            existing = end_results.get(result.entry.id)
            if existing is None:
                end_results[result.entry.id] = result.model_copy(
                    update={"score": normalized_score, "subresults": [result]}
                )
            else:
                existing.score = score_operation(existing.score, normalized_score)
                existing.subresults.append(result)

    ordered = list(end_results.values())
    ordered.sort(key=lambda result: result.score, reverse=True)

    return ordered