-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Labels
needs-more-info (Waiting for a reply/more info from the author), question (Question about using the SDK)
Description
I'm using Runner.run_streamed(...).stream_events()
to stream responses from an agent. However, after the stream completes, result_stream.context_wrapper.usage
always returns zero. I confirmed that the stream is fully consumed before accessing usage. Is this expected behavior?
# Minimal reproduction: start a streamed run, iterate over every event, then read usage.
result_stream = Runner.run_streamed(agent, prompt, context=context, session=session)
async for event in result_stream.stream_events():
    # streaming logic (per-event handling omitted in this snippet)
usage = result_stream.context_wrapper.usage # reported by the author to always be zero
FULL CODE
async def process_user_query(self, user_query: str):
    """Stream the router agent's answer to ``user_query`` as chunk dicts.

    The user message is stored in ``self.session``, the router agent is run
    in streaming mode, and every text delta is yielded as
    ``{"stream": True, "msg": <delta>, "agent": <name>, "tool": <name or None>}``.
    If a tool call was detected during the first run, a second streamed run
    is performed and its deltas are yielded the same way.

    When no text at all was produced, a single fallback dict with
    ``"stream": False`` is yielded instead and nothing is stored as the
    assistant reply. Otherwise the combined text is appended to the session
    as the assistant message.

    Streaming errors are logged (with traceback) rather than raised, so the
    caller always receives either streamed text or the fallback message.

    Args:
        user_query: The raw question typed by the user.

    Yields:
        dict: Chunk dictionaries with the keys ``stream``, ``msg``,
        ``agent`` and ``tool``.
    """
    import logging

    logger = logging.getLogger(__name__)

    await self.session.add_items([{"role": "user", "content": user_query}])
    last_messages = await self.session.get_items(limit=4, user_query=user_query)
    # The temporary session is filled here but not read again in this method;
    # it is kept because constructing/filling it may have side effects.
    temp_session = MemoryManagerSession("router_temp_" + self.session_id)
    await temp_session.add_items(last_messages)

    query_type = await self.detect_query_type(user_query)
    context = AgentContext(principal_ids=self.principal_ids, query_type=query_type)
    prompt = f"User Query (answer this): {user_query}\n"
    router_agent = self.master_agent.agent

    result_stream = Runner.run_streamed(router_agent, prompt, context=context, session=self.session)
    output_chunks = []
    tool_used = None
    agent_name = "MasterAgent"
    tool_calls_pending = False
    try:
        async for event in result_stream.stream_events():
            if event.type == "agent_updated_stream_event":
                agent_name = getattr(event.new_agent, "name", agent_name)
            elif event.type == "run_item_stream_event":
                # NOTE(review): the SDK streaming docs show item types such as
                # "tool_call_item" / "tool_call_output_item"; confirm that
                # "tool_called" is the value actually seen on event.item.type.
                if getattr(event.item, "type", None) == "tool_called":
                    tool_used = getattr(event.item, "name", None)
                    tool_calls_pending = True
            elif event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                delta = event.data.delta
                if delta and isinstance(delta, str):
                    output_chunks.append(delta)
                    yield {"stream": True, "msg": delta, "agent": agent_name, "tool": tool_used}
    except Exception:
        # Best effort: keep whatever was streamed so far, but record the
        # failure. An aborted stream is NOT fully consumed, which matters
        # when usage is read below.
        logger.exception("Streaming the router agent response failed")

    synth_stream = None
    synth_chunks = []
    if tool_calls_pending:
        synth_stream = Runner.run_streamed(router_agent, prompt, context=context, session=self.session)
        try:
            async for event in synth_stream.stream_events():
                if event.type == "agent_updated_stream_event":
                    agent_name = getattr(event.new_agent, "name", agent_name)
                elif event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                    delta = event.data.delta
                    if delta and isinstance(delta, str):
                        synth_chunks.append(delta)
                        yield {"stream": True, "msg": delta, "agent": agent_name, "tool": tool_used}
        except Exception:
            logger.exception("Streaming the synthesis response failed")

    final_output = ("".join(output_chunks) + "".join(synth_chunks)).strip()
    if final_output:
        await self.session.add_items([{"role": "assistant", "content": final_output}])
    else:
        # Nothing was generated: tell the user, and do not store an empty
        # assistant message in the session history.
        fallback_msg = (
            "⚠️ Sorry, something went wrong and I couldn't generate a response.\n"
            "You can try again with the same query, or refresh the session and try once more."
        )
        yield {"stream": False, "msg": fallback_msg, "agent": agent_name, "tool": tool_used}

    # Both stream loops have finished (or failed and been logged) by now.
    # NOTE(review): whether usage is populated for streamed runs may depend on
    # the model provider / model settings; confirm against the SDK docs.
    usage = getattr(result_stream.context_wrapper, "usage", {})
    trace = getattr(result_stream, "trace", {})
    print("Usage:", usage)
    print("Trace:", trace)
    if synth_stream is not None:
        # The second run is a separate streamed run; report its usage too.
        print("Synth usage:", getattr(synth_stream.context_wrapper, "usage", {}))
Ali-Olliek
Metadata
Metadata
Assignees
Labels
needs-more-info (Waiting for a reply/more info from the author), question (Question about using the SDK)