Skip to content

Server#

ragbits.agents.mcp.server.MCPServer #

Bases: ABC

Base class for Model Context Protocol servers.

name abstractmethod property #

name: str

A readable name for the server.

connect abstractmethod async #

connect() -> None

Connect to the server. For example, this might mean spawning a subprocess or opening a network connection. The server is expected to remain connected until cleanup() is called.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
@abstractmethod
async def connect(self) -> None:
    """
    Connect to the server. For example, this might mean spawning a subprocess or
    opening a network connection. The server is expected to remain connected until `cleanup()` is called.
    """

cleanup abstractmethod async #

cleanup() -> None

Cleanup the server. For example, this might mean closing a subprocess or closing a network connection.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
@abstractmethod
async def cleanup(self) -> None:
    """
    Cleanup the server. For example, this might mean closing a subprocess or closing a network connection.
    """

list_tools abstractmethod async #

list_tools() -> list[Tool]

List the tools available on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
@abstractmethod
async def list_tools(self) -> list["MCPTool"]:
    """
    List the tools available on the server.
    """

call_tool abstractmethod async #

call_tool(tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult

Invoke a tool on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
@abstractmethod
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> "CallToolResult":
    """
    Invoke a tool on the server.
    """

ragbits.agents.mcp.server.MCPServerStdioParams #

Bases: TypedDict

Mirrors mcp.client.stdio.StdioServerParameters, but lets you pass params without another import.

command instance-attribute #

command: str

The executable to run to start the server. For example, python or node.

args instance-attribute #

args: NotRequired[list[str]]

Command line args to pass to the command executable. For example, ['foo.py'] or ['server.js', '--port', '8080'].

env instance-attribute #

env: NotRequired[dict[str, str]]

The environment variables to set for the server. .

cwd instance-attribute #

cwd: NotRequired[str | Path]

The working directory to use when spawning the process.

encoding instance-attribute #

encoding: NotRequired[str]

The text encoding used when sending/receiving messages to the server. Defaults to utf-8.

encoding_error_handler instance-attribute #

encoding_error_handler: NotRequired[Literal['strict', 'ignore', 'replace']]

The text encoding error handler. Defaults to strict.

See https://docs.python.org/3/library/codecs.html#codec-base-classes for explanations of possible values.

ragbits.agents.mcp.server.MCPServerStdio #

MCPServerStdio(params: MCPServerStdioParams, cache_tools_list: bool = False, name: str | None = None, client_session_timeout_seconds: float | None = 5)

Bases: _MCPServerWithClientSession

MCP server implementation that uses the stdio transport. See the spec for details.

Create a new MCP server based on the stdio transport.

PARAMETER DESCRIPTION
params

The params that configure the server. This includes the command to run to start the server, the args to pass to the command, the environment variables to set for the server, the working directory to use when spawning the process, and the text encoding used when sending/receiving messages to the server.

TYPE: MCPServerStdioParams

cache_tools_list

Whether to cache the tools list. If True, the tools list will be cached and only fetched from the server once. If False, the tools list will be fetched from the server on each call to list_tools(). The cache can be invalidated by calling invalidate_tools_cache(). You should set this to True if you know the server will not change its tools list, because it can drastically improve latency (by avoiding a round-trip to the server every time).

TYPE: bool DEFAULT: False

name

A readable name for the server. If not provided, we'll create one from the command.

TYPE: str | None DEFAULT: None

client_session_timeout_seconds

the read timeout passed to the MCP ClientSession.

TYPE: float | None DEFAULT: 5

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
) -> None:
    """
    Create a new MCP server based on the stdio transport.

    Args:
        params: The params that configure the server. This includes the command to run to
            start the server, the args to pass to the command, the environment variables to
            set for the server, the working directory to use when spawning the process, and
            the text encoding used when sending/receiving messages to the server.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            command.
        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = StdioServerParameters(
        command=params["command"],
        args=params.get("args", []),
        env=params.get("env"),
        cwd=params.get("cwd"),
        encoding=params.get("encoding", "utf-8"),
        encoding_error_handler=params.get("encoding_error_handler", "strict"),
    )

    self._name = name or f"stdio: {self.params.command}"

session instance-attribute #

session: ClientSession | None = None

exit_stack instance-attribute #

exit_stack = AsyncExitStack()

cache_tools_list instance-attribute #

cache_tools_list = cache_tools_list

server_initialize_result instance-attribute #

server_initialize_result: InitializeResult | None = None

client_session_timeout_seconds instance-attribute #

client_session_timeout_seconds = client_session_timeout_seconds

params instance-attribute #

params = StdioServerParameters(command=params['command'], args=get('args', []), env=get('env'), cwd=get('cwd'), encoding=get('encoding', 'utf-8'), encoding_error_handler=get('encoding_error_handler', 'strict'))

name property #

name: str

A readable name for the server.

connect async #

connect() -> None

Connect to the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def connect(self) -> None:
    """
    Connect to the server.
    """
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async #

cleanup() -> None

Cleanup the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def cleanup(self) -> None:
    """
    Cleanup the server.
    """
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async #

list_tools() -> list[Tool]

List the tools available on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def list_tools(self) -> list["MCPTool"]:
    """
    List the tools available on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async #

call_tool(tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult

Invoke a tool on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> "CallToolResult":
    """
    Invoke a tool on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache #

invalidate_tools_cache() -> None

Invalidate the tools cache.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def invalidate_tools_cache(self) -> None:
    """
    Invalidate the tools cache.
    """
    self._cache_dirty = True

create_streams #

create_streams() -> AbstractAsyncContextManager[tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage], GetSessionIdCallback | None]]

Create the streams for the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        "MemoryObjectReceiveStream[SessionMessage | Exception]",
        "MemoryObjectSendStream[SessionMessage]",
        "GetSessionIdCallback | None",
    ]
]:
    """
    Create the streams for the server.
    """
    return stdio_client(self.params)

ragbits.agents.mcp.server.MCPServerSseParams #

Bases: TypedDict

Mirrors the params inmcp.client.sse.sse_client.

url instance-attribute #

url: str

The URL of the server.

headers instance-attribute #

headers: NotRequired[dict[str, str]]

The headers to send to the server.

timeout instance-attribute #

timeout: NotRequired[float]

The timeout for the HTTP request. Defaults to 5 seconds.

sse_read_timeout instance-attribute #

sse_read_timeout: NotRequired[float]

The timeout for the SSE connection, in seconds. Defaults to 5 minutes.

ragbits.agents.mcp.server.MCPServerSse #

MCPServerSse(params: MCPServerSseParams, cache_tools_list: bool = False, name: str | None = None, client_session_timeout_seconds: float | None = 5)

Bases: _MCPServerWithClientSession

MCP server implementation that uses the HTTP with SSE transport. See the spec for details.

Create a new MCP server based on the HTTP with SSE transport.

PARAMETER DESCRIPTION
params

The params that configure the server. This includes the URL of the server, the headers to send to the server, the timeout for the HTTP request, and the timeout for the SSE connection.

TYPE: MCPServerSseParams

cache_tools_list

Whether to cache the tools list. If True, the tools list will be cached and only fetched from the server once. If False, the tools list will be fetched from the server on each call to list_tools(). The cache can be invalidated by calling invalidate_tools_cache(). You should set this to True if you know the server will not change its tools list, because it can drastically improve latency (by avoiding a round-trip to the server every time).

TYPE: bool DEFAULT: False

name

A readable name for the server. If not provided, we'll create one from the URL.

TYPE: str | None DEFAULT: None

client_session_timeout_seconds

the read timeout passed to the MCP ClientSession.

TYPE: float | None DEFAULT: 5

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
) -> None:
    """
    Create a new MCP server based on the HTTP with SSE transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, and the
            timeout for the SSE connection.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            URL.
        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = params
    self._name = name or f"sse: {self.params['url']}"

session instance-attribute #

session: ClientSession | None = None

exit_stack instance-attribute #

exit_stack = AsyncExitStack()

cache_tools_list instance-attribute #

cache_tools_list = cache_tools_list

server_initialize_result instance-attribute #

server_initialize_result: InitializeResult | None = None

client_session_timeout_seconds instance-attribute #

client_session_timeout_seconds = client_session_timeout_seconds

params instance-attribute #

params = params

name property #

name: str

A readable name for the server.

connect async #

connect() -> None

Connect to the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def connect(self) -> None:
    """
    Connect to the server.
    """
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async #

cleanup() -> None

Cleanup the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def cleanup(self) -> None:
    """
    Cleanup the server.
    """
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async #

list_tools() -> list[Tool]

List the tools available on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def list_tools(self) -> list["MCPTool"]:
    """
    List the tools available on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async #

call_tool(tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult

Invoke a tool on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> "CallToolResult":
    """
    Invoke a tool on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache #

invalidate_tools_cache() -> None

Invalidate the tools cache.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def invalidate_tools_cache(self) -> None:
    """
    Invalidate the tools cache.
    """
    self._cache_dirty = True

create_streams #

create_streams() -> AbstractAsyncContextManager[tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage], GetSessionIdCallback | None]]

Create the streams for the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        "MemoryObjectReceiveStream[SessionMessage | Exception]",
        "MemoryObjectSendStream[SessionMessage]",
        "GetSessionIdCallback | None",
    ]
]:
    """
    Create the streams for the server.
    """
    return sse_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
    )

ragbits.agents.mcp.server.MCPServerStreamableHttpParams #

Bases: TypedDict

Mirrors the params inmcp.client.streamable_http.streamablehttp_client.

url instance-attribute #

url: str

The URL of the server.

headers instance-attribute #

headers: NotRequired[dict[str, str]]

The headers to send to the server.

timeout instance-attribute #

timeout: NotRequired[timedelta | float]

The timeout for the HTTP request. Defaults to 5 seconds.

sse_read_timeout instance-attribute #

sse_read_timeout: NotRequired[timedelta | float]

The timeout for the SSE connection, in seconds. Defaults to 5 minutes.

terminate_on_close instance-attribute #

terminate_on_close: NotRequired[bool]

Terminate on close

ragbits.agents.mcp.server.MCPServerStreamableHttp #

MCPServerStreamableHttp(params: MCPServerStreamableHttpParams, cache_tools_list: bool = False, name: str | None = None, client_session_timeout_seconds: float | None = 5)

Bases: _MCPServerWithClientSession

MCP server implementation that uses the Streamable HTTP transport. See the spec for details.

Create a new MCP server based on the Streamable HTTP transport.

PARAMETER DESCRIPTION
params

The params that configure the server. This includes the URL of the server, the headers to send to the server, the timeout for the HTTP request, and the timeout for the Streamable HTTP connection and whether we need to terminate on close.

TYPE: MCPServerStreamableHttpParams

cache_tools_list

Whether to cache the tools list. If True, the tools list will be cached and only fetched from the server once. If False, the tools list will be fetched from the server on each call to list_tools(). The cache can be invalidated by calling invalidate_tools_cache(). You should set this to True if you know the server will not change its tools list, because it can drastically improve latency (by avoiding a round-trip to the server every time).

TYPE: bool DEFAULT: False

name

A readable name for the server. If not provided, we'll create one from the URL.

TYPE: str | None DEFAULT: None

client_session_timeout_seconds

the read timeout passed to the MCP ClientSession.

TYPE: float | None DEFAULT: 5

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStreamableHttpParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
) -> None:
    """
    Create a new MCP server based on the Streamable HTTP transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, and the
            timeout for the Streamable HTTP connection and whether we need to
            terminate on close.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            URL.
        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = params
    self._name = name or f"streamable_http: {self.params['url']}"

session instance-attribute #

session: ClientSession | None = None

exit_stack instance-attribute #

exit_stack = AsyncExitStack()

cache_tools_list instance-attribute #

cache_tools_list = cache_tools_list

server_initialize_result instance-attribute #

server_initialize_result: InitializeResult | None = None

client_session_timeout_seconds instance-attribute #

client_session_timeout_seconds = client_session_timeout_seconds

params instance-attribute #

params = params

name property #

name: str

A readable name for the server.

connect async #

connect() -> None

Connect to the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def connect(self) -> None:
    """
    Connect to the server.
    """
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async #

cleanup() -> None

Cleanup the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def cleanup(self) -> None:
    """
    Cleanup the server.
    """
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async #

list_tools() -> list[Tool]

List the tools available on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def list_tools(self) -> list["MCPTool"]:
    """
    List the tools available on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async #

call_tool(tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult

Invoke a tool on the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> "CallToolResult":
    """
    Invoke a tool on the server.
    """
    if not self.session:
        raise RuntimeError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache #

invalidate_tools_cache() -> None

Invalidate the tools cache.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def invalidate_tools_cache(self) -> None:
    """
    Invalidate the tools cache.
    """
    self._cache_dirty = True

create_streams #

create_streams() -> AbstractAsyncContextManager[tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage], GetSessionIdCallback | None]]

Create the streams for the server.

Source code in packages/ragbits-agents/src/ragbits/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        "MemoryObjectReceiveStream[SessionMessage | Exception]",
        "MemoryObjectSendStream[SessionMessage]",
        "GetSessionIdCallback | None",
    ]
]:
    """
    Create the streams for the server.
    """
    return streamablehttp_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
        terminate_on_close=self.params.get("terminate_on_close", True),
    )