Streaming
By default, query() yields complete AssistantMessage objects — one per agent turn — after each LLM response finishes. Enable partial streaming with include_partial_messages=True to also receive raw StreamEvent messages as tokens arrive.
Enable streaming
```python
import asyncio

from agentix import (
    AgentixAgentOptions,
    AgentixClient,
    AssistantMessage,
    ResultMessage,
    StreamEvent,
)

options = AgentixAgentOptions(
    provider="anthropic",
    model="claude-sonnet-4-20250514",
    include_partial_messages=True,  # yield StreamEvent tokens in addition to full messages
)

async def main():
    async with AgentixClient(options) as client:
        async for msg in client.query("Write a haiku about Python."):
            if isinstance(msg, StreamEvent):
                # Raw provider event: extract the text delta for display
                event = msg.event
                if event.get("type") == "content_block_delta":
                    delta = event.get("delta", {})
                    if delta.get("type") == "text_delta":
                        print(delta.get("text", ""), end="", flush=True)
            elif isinstance(msg, AssistantMessage):
                print()  # newline after the completed turn
            elif isinstance(msg, ResultMessage):
                break

asyncio.run(main())
```
StreamEvent fields
```python
StreamEvent(
    uuid: str,                       # unique event identifier
    session_id: str,                 # session this event belongs to
    event: dict[str, Any],           # raw provider-specific SSE payload
    parent_tool_use_id: str | None,  # set when the event is part of a tool input stream
)
```
StreamEvent.event is the unprocessed payload from the LLM provider's streaming API. Its structure is provider-specific:
- Anthropic: follows the Anthropic streaming event format
- OpenAI / OpenAI-compatible: follows the OpenAI streaming delta format
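Because `StreamEvent.event` is raw, display code has to branch on the provider's payload shape. A minimal extraction helper might look like the sketch below; the Anthropic and OpenAI event shapes shown in the comments are the standard documented ones, but verify them against your provider's streaming docs:

```python
def extract_text_delta(event: dict) -> str:
    """Pull the incremental text out of a raw streaming event, if any."""
    # Anthropic shape:
    # {"type": "content_block_delta", "delta": {"type": "text_delta", "text": ...}}
    if event.get("type") == "content_block_delta":
        delta = event.get("delta", {})
        if delta.get("type") == "text_delta":
            return delta.get("text", "")
    # OpenAI chat-completion chunk shape:
    # {"choices": [{"delta": {"content": ...}}]}
    for choice in event.get("choices", []):
        content = choice.get("delta", {}).get("content")
        if content:
            return content
    return ""

print(extract_text_delta({"type": "content_block_delta",
                          "delta": {"type": "text_delta", "text": "Py"}}))  # Py
print(extract_text_delta({"choices": [{"delta": {"content": "thon"}}]}))   # thon
```

Events that carry no text (for example `message_start`) fall through to the empty string, so the helper is safe to call on every `StreamEvent`.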
For most use cases — displaying the final response, tracking tool calls, measuring latency — reading the complete AssistantMessage and ResultMessage is simpler and provider-agnostic. Use StreamEvent when you need real-time token-by-token display or fine-grained streaming control.
Message order with streaming enabled
```
StreamEvent       ← partial token (one per delta, many per turn)
StreamEvent
...
AssistantMessage  ← complete turn message (always emitted)
StreamEvent       ← next turn's tokens
...
AssistantMessage
ResultMessage     ← always last
```
Without streaming (include_partial_messages=False, the default):
```
AssistantMessage
AssistantMessage
...
ResultMessage
```
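This ordering suggests a simple consumption pattern: buffer text deltas as they arrive and treat each `AssistantMessage` as the turn boundary. The sketch below uses minimal stand-in classes rather than the real SDK types, purely to show the grouping logic:

```python
from dataclasses import dataclass

# Illustration-only stand-ins for the SDK message types.
@dataclass
class StreamEvent:
    event: dict

@dataclass
class AssistantMessage:
    text: str

@dataclass
class ResultMessage:
    pass

def collect_turns(stream):
    """Group text deltas into turns, using AssistantMessage as the boundary."""
    turns, buf = [], []
    for msg in stream:
        if isinstance(msg, StreamEvent):
            delta = msg.event.get("delta", {})
            if delta.get("type") == "text_delta":
                buf.append(delta.get("text", ""))
        elif isinstance(msg, AssistantMessage):
            turns.append("".join(buf))
            buf = []
        elif isinstance(msg, ResultMessage):
            break
    return turns

stream = [
    StreamEvent({"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hel"}}),
    StreamEvent({"type": "content_block_delta", "delta": {"type": "text_delta", "text": "lo"}}),
    AssistantMessage("Hello"),
    ResultMessage(),
]
print(collect_turns(stream))  # ['Hello']
```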
Interruption mid-stream
Use client.interrupt() from another task to stop a running query at any point:
```python
import asyncio

from agentix import AgentixClient, ResultMessage

async def run_with_timeout(client, prompt, timeout=10.0):
    async def _cancel():
        await asyncio.sleep(timeout)
        client.interrupt()

    task = asyncio.create_task(_cancel())
    try:
        async for msg in client.query(prompt):
            if isinstance(msg, ResultMessage):
                return msg
    finally:
        task.cancel()
```
The run ends with a ResultMessage where stop_reason="interrupt".
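The control flow can be exercised against a stub to see how the pieces interact; `StubClient` and `Result` below are hypothetical stand-ins for illustration, not part of agentix:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Result:
    stop_reason: str

class StubClient:
    """Fake client: streams tokens until interrupt() is called."""
    def __init__(self):
        self._interrupted = False

    def interrupt(self):
        self._interrupted = True

    async def query(self, prompt):
        for _ in range(100):
            await asyncio.sleep(0.01)
            if self._interrupted:
                yield Result(stop_reason="interrupt")
                return
            yield "token"
        yield Result(stop_reason="end_turn")

async def run_with_timeout(client, prompt, timeout=0.05):
    async def _cancel():
        await asyncio.sleep(timeout)
        client.interrupt()

    task = asyncio.create_task(_cancel())
    try:
        async for msg in client.query(prompt):
            if isinstance(msg, Result):
                return msg
    finally:
        task.cancel()

result = asyncio.run(run_with_timeout(StubClient(), "hello"))
print(result.stop_reason)  # interrupt
```

The `finally: task.cancel()` matters in both directions: it stops the timer if the query finishes early, and it runs even when the query is interrupted.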
Backpressure
If your consumer processes messages slower than the agent produces them, apply backpressure with max_queue_size:
```python
options = AgentixAgentOptions(
    max_queue_size=50,  # pause the agent loop once 50 messages are buffered
)
```
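Internally this presumably behaves like a bounded queue: the producing loop suspends on `put()` until the consumer drains a slot. A plain-asyncio sketch of the mechanics (not the SDK's actual implementation):

```python
import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)   # analogous to max_queue_size=2
    produced = []

    async def agent_loop():
        # Stand-in producer: tries to emit 5 messages as fast as it can.
        for i in range(5):
            await queue.put(i)         # suspends while 2 messages are buffered
            produced.append(i)

    task = asyncio.create_task(agent_loop())
    await asyncio.sleep(0.01)
    paused_at = len(produced)          # producer is parked on a full queue

    await queue.get()                  # a slow consumer takes one message
    await asyncio.sleep(0.01)
    resumed_at = len(produced)         # producer advanced by exactly one put
    task.cancel()
    return paused_at, resumed_at

counts = asyncio.run(main())
print(counts)  # (2, 3)
```

With `maxsize=2` the producer gets two puts ahead of the consumer and then stalls; each `get()` releases exactly one further `put()`, which is the backpressure behavior the option describes.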