How-To: Stream downstream agents with Ragbits
A Ragbits Agent can call other agents as tools, creating a chain of reasoning in which downstream agents return structured results to the parent agent.
With the streaming API, you can observe every chunk of output as it is generated, including tool calls, tool results, and final text. This makes it well suited to real-time monitoring and chat interfaces.
Define a simple tool
A tool is just a Python function returning a JSON-serializable result. Here’s an example tool returning the current time for a given location:
import json


def get_time(location: str) -> str:
    """
    Returns the current time for a given location.

    Args:
        location: The location to get the time for.

    Returns:
        The current time for the given location.
    """
    loc = location.lower()
    if "tokyo" in loc:
        return json.dumps({"location": "Tokyo", "time": "10:00 AM"})
    elif "paris" in loc:
        return json.dumps({"location": "Paris", "time": "04:00 PM"})
    elif "san francisco" in loc:
        return json.dumps({"location": "San Francisco", "time": "07:00 PM"})
    else:
        return json.dumps({"location": location, "time": "unknown"})
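Because the tool is an ordinary function, it can be exercised directly before any agent uses it. The sketch below is a condensed, table-driven stand-in for get_time; the names `_TIMES` and `get_time_sketch` are illustrative and not part of Ragbits. It decodes the returned JSON string the way a caller would.

```python
import json

# Illustrative lookup table; the values mirror the hard-coded ones in the how-to.
_TIMES = {
    "tokyo": ("Tokyo", "10:00 AM"),
    "paris": ("Paris", "04:00 PM"),
    "san francisco": ("San Francisco", "07:00 PM"),
}


def get_time_sketch(location: str) -> str:
    """Table-driven stand-in for get_time (hypothetical helper)."""
    loc = location.lower()
    for key, (name, time) in _TIMES.items():
        if key in loc:
            return json.dumps({"location": name, "time": time})
    # Unknown locations are echoed back with a placeholder time.
    return json.dumps({"location": location, "time": "unknown"})


# Call the tool directly and decode its JSON result.
paris = json.loads(get_time_sketch("Paris, France"))
unknown = json.loads(get_time_sketch("Oslo"))
print(paris["time"])
print(unknown["time"])
```

Returning a JSON string keeps the result easy to pass back to the LLM while still letting tests parse it into a dictionary.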
Create a downstream agent
The downstream agent wraps the tool with a prompt, allowing the LLM to use it as a function.
from pydantic import BaseModel

from ragbits.agents import Agent
from ragbits.agents._main import AgentOptions
from ragbits.core.llms import LiteLLM
from ragbits.core.prompt import Prompt


class TimePromptInput(BaseModel):
    """Input schema for the TimePrompt, containing the target location."""

    location: str


class TimePrompt(Prompt[TimePromptInput]):
    """
    Provides instructions for generating the current time in a user-specified
    location.
    """

    system_prompt = """
    You are a helpful assistant that tells the current time in a given city.
    """
    user_prompt = """
    What time is it in {{ location }}?
    """


llm = LiteLLM(model_name="gpt-4o-2024-08-06", use_structured_output=True)

time_agent = Agent(
    name="time_agent",
    description="Returns current time for a given location",
    llm=llm,
    prompt=TimePrompt,
    tools=[get_time],
    default_options=AgentOptions(max_total_tokens=1000, max_turns=5),
)
Create a parent QA agent
The parent agent can call downstream agents as tools. This lets the LLM reason and decide when to invoke the downstream agent.
class QAPromptInput(BaseModel):
    """Input schema for the QA agent, containing a natural-language question."""

    question: str


class QAPrompt(Prompt[QAPromptInput]):
    """
    Guides the agent to respond to user questions.
    """

    system_prompt = """
    You are a helpful assistant that responds to user questions.
    """
    # The question is inserted verbatim, so no trailing punctuation is added here.
    user_prompt = """
    {{ question }}
    """


llm = LiteLLM(model_name="gpt-4o-2024-08-06", use_structured_output=True)

qa_agent = Agent(
    name="qa_agent",
    llm=llm,
    prompt=QAPrompt,
    tools=[time_agent],  # the downstream agent is passed as a tool
    default_options=AgentOptions(max_total_tokens=1000, max_turns=5),
)
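To make the delegation pattern concrete without an LLM, here is a conceptual sketch built from plain asyncio generators. It is not how Ragbits is implemented: `WrappedEvent`, `time_agent_stub`, and `qa_agent_stub` are hypothetical stand-ins, and the streamed strings are canned. It only shows a parent stream forwarding a downstream stream's items, tagged with the name of the agent that produced them.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Union


@dataclass
class WrappedEvent:
    """Hypothetical stand-in for a wrapper around a downstream agent's event."""

    agent_name: str
    item: str


async def time_agent_stub(location: str) -> AsyncIterator[str]:
    # Canned output standing in for a downstream agent's stream.
    yield f"tool call: get_time({location!r})"
    yield f"It is 04:00 PM in {location}."


async def qa_agent_stub(question: str) -> AsyncIterator[Union[str, WrappedEvent]]:
    # The parent announces the delegation, forwards the downstream items
    # wrapped with their origin, then emits its own final answer.
    yield "tool call: time_agent"
    async for item in time_agent_stub("Paris"):
        yield WrappedEvent("time_agent", item)
    yield "The time in Paris is 04:00 PM."


async def collect(question: str) -> list:
    return [event async for event in qa_agent_stub(question)]


events = asyncio.run(collect("What time is it in Paris?"))
for event in events:
    print(event)
```

Wrapping forwarded items rather than yielding them bare lets a consumer tell the parent's output apart from a downstream agent's output with a single isinstance check.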
Streaming output from downstream agents
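Call run_streaming with an AgentRunContext created with stream_downstream_events=True to see output as it happens. Each chunk contains either text, a tool call, or a tool result; chunks produced by a downstream agent arrive wrapped in a DownstreamAgentResult, which carries the ID of the producing agent and the original item. The example tags every chunk with the name of the agent that produced it; you can equally print agent names only when they change.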
import asyncio

from ragbits.agents import AgentRunContext, DownstreamAgentResult


async def main() -> None:
    """
    Run the QA agent with downstream streaming enabled.

    The QA agent processes a sample question ("What time is it in Paris?") and delegates to
    the Time Agent when necessary. Streamed results from both agents are printed in real time,
    tagged by the agent that produced them.
    """
    context = AgentRunContext(stream_downstream_events=True)

    async for chunk in qa_agent.run_streaming(
        QAPromptInput(question="What time is it in Paris?"),
        context=context,
    ):
        if isinstance(chunk, DownstreamAgentResult):
            # Resolve the downstream agent's name from its ID.
            agent_name = context.get_agent(chunk.agent_id).name
            print(f"[{agent_name}] {chunk.item}")
        else:
            print(f"[{qa_agent.name}] {chunk}")


if __name__ == "__main__":
    asyncio.run(main())
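If tagging every line with an agent name is too noisy, one option is to print the name only when the producing agent changes. The helper below is a minimal sketch of that idea; `tag_stream` and the sample events are hypothetical and operate on plain (agent name, text) pairs rather than on Ragbits chunk objects.

```python
from collections.abc import Iterable, Iterator


def tag_stream(events: Iterable[tuple[str, str]]) -> Iterator[str]:
    """Yield printable lines, emitting an agent header only when the producer changes."""
    current = None
    for agent_name, text in events:
        if agent_name != current:
            yield f"--- {agent_name} ---"
            current = agent_name
        yield text


# Canned sample standing in for chunks already reduced to (agent name, text).
sample = [
    ("qa_agent", "calling time_agent"),
    ("time_agent", "calling get_time"),
    ("time_agent", "It is 04:00 PM in Paris."),
    ("qa_agent", "The time in Paris is 04:00 PM."),
]

lines = list(tag_stream(sample))
for line in lines:
    print(line)
```

In the streaming loop, the same logic would apply after mapping each chunk to the name of the agent that produced it.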