Message Types
query() yields a stream of Message objects. Import them all from agentix.
Message union
Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage | StreamEvent
AssistantMessage
The agent's complete response for a single turn.
AssistantMessage(
    content: list[TextBlock | ThinkingBlock | ToolUseBlock | ToolResultBlock],
    model: str,  # model that produced this response
    parent_tool_use_id: str | None,  # set when this is a sub-agent response
    error: AssistantMessageError | None,  # provider-level error if the LLM call failed
)
AssistantMessageError is a string literal type. Its possible values are "authentication_failed", "billing_error", "rate_limit", "invalid_request", "server_error", and "unknown".
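As a rough illustration of reacting to the error field, the sketch below sorts the documented literal values into retryable and non-retryable groups. The AssistantMessage class here is a local stand-in so the snippet runs on its own, and RETRYABLE and should_retry are hypothetical names, not part of agentix. Which errors are worth retrying is an assumption, not documented behaviour.

```python
from dataclasses import dataclass, field
from typing import Literal, Optional

AssistantMessageError = Literal[
    "authentication_failed", "billing_error", "rate_limit",
    "invalid_request", "server_error", "unknown",
]


@dataclass
class AssistantMessage:
    """Local stand-in mirroring the documented AssistantMessage fields."""
    content: list = field(default_factory=list)
    model: str = ""
    parent_tool_use_id: Optional[str] = None
    error: Optional[AssistantMessageError] = None


# Assumption: only transient provider problems are worth retrying.
RETRYABLE = {"rate_limit", "server_error"}


def should_retry(msg: AssistantMessage) -> bool:
    """Return True when the message carries an error treated as transient."""
    return msg.error is not None and msg.error in RETRYABLE
```

A caller might check should_retry(msg) before re-issuing the query with a backoff, and surface every other error to the user.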
Content blocks
| Type | Fields | Description |
|---|---|---|
| TextBlock | text: str | Plain text response |
| ThinkingBlock | thinking: str, signature: str | Extended reasoning output (Anthropic) |
| ToolUseBlock | id: str, name: str, input: dict | Tool invocation |
| ToolResultBlock | tool_use_id: str, content, is_error: bool \| None | Tool execution result |
from agentix import AssistantMessage, TextBlock, ToolUseBlock

# `client` is assumed to be an already-created agentix client;
# run this inside an async function.
async for msg in client.query("What files are in /tmp?"):
    if isinstance(msg, AssistantMessage):
        for block in msg.content:
            if isinstance(block, TextBlock):
                print(block.text)
            elif isinstance(block, ToolUseBlock):
                print(f"Using tool: {block.name}({block.input})")
ResultMessage
The final message at the end of every query. Always present regardless of include_partial_messages.
ResultMessage(
    is_error: bool,
    result: str | None,  # always safe to access (never raises)
    subtype: str | None,
    stop_reason: str | None,
    structured_output: Any,  # parsed JSON if output_format was set
    session_id: str,
    usage: dict | None,
)
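The is_error, subtype, and stop_reason fields are typically read together. The following is a minimal sketch of turning them into a one-line summary. The ResultMessage class is a local stand-in so the snippet runs without agentix, and SUBTYPE_MEANINGS and describe are hypothetical helpers. The subtype strings are the ones this page documents.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ResultMessage:
    """Local stand-in mirroring the documented ResultMessage fields."""
    is_error: bool
    result: Optional[str] = None
    subtype: Optional[str] = None
    stop_reason: Optional[str] = None
    structured_output: Any = None
    session_id: str = ""
    usage: Optional[dict] = None


# Meanings copied from the documented subtype values.
SUBTYPE_MEANINGS = {
    "error_max_turns": "exceeded max_iterations",
    "error_max_budget_usd": "exceeded max_tokens_budget",
    "error_timeout": "agent timed out",
    "error_unknown": "unexpected error",
}


def describe(result: ResultMessage) -> str:
    """Summarise how a query ended in one line."""
    if result.subtype is None:
        if not result.is_error:
            return f"ok (stop_reason={result.stop_reason})"
        return "failed: no subtype reported"
    meaning = SUBTYPE_MEANINGS.get(result.subtype, "unrecognised subtype")
    return f"failed: {meaning} (stop_reason={result.stop_reason})"
```

Falling back to "unrecognised subtype" keeps the helper from raising if a subtype appears that is not in the mapping.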
Stop subtypes
| subtype | Meaning |
|---|---|
| None | Successful completion |
| "error_max_turns" | Exceeded max_iterations |
| "error_max_budget_usd" | Exceeded max_tokens_budget |
| "error_timeout" | Agent timed out |
| "error_unknown" | Unexpected error |
Stop reasons
| stop_reason | Meaning |
|---|---|
| "end_turn" | Normal completion |
| "max_tokens" | LLM max token limit hit |
| "interrupt" | client.interrupt() was called |
| "max_iterations" | Iteration limit reached |
Usage metrics
from agentix import ResultMessage

async for msg in client.query("..."):
    if isinstance(msg, ResultMessage) and msg.usage:
        # Per-turn aggregate timings
        for turn in msg.usage.get("turn_timings", []):
            # {"turn": 1, "llm_ms": 450.0, "tool_ms": 120.0, "tokens": 512}
            print(turn)
        # Per-tool-call granular metrics
        for call in msg.usage.get("tool_metrics", []):
            # {"tool_name": "Bash", "tool_use_id": "tu-1",
            #  "duration_ms": 150.3, "is_error": False,
            #  "result_length": 42, "turn": 1}
            print(call)
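The usage dictionary is plain data, so it can be aggregated without any agentix imports. The sketch below totals the per-turn timings and groups tool metrics by tool name. summarise_usage is a hypothetical helper, and the second turn in the sample is made-up data that follows the documented key names.

```python
from collections import defaultdict


def summarise_usage(usage: dict) -> dict:
    """Total per-turn timings and group tool-call metrics by tool name."""
    turns = usage.get("turn_timings", [])
    calls = usage.get("tool_metrics", [])

    per_tool = defaultdict(lambda: {"calls": 0, "errors": 0, "duration_ms": 0.0})
    for call in calls:
        stats = per_tool[call["tool_name"]]
        stats["calls"] += 1
        stats["errors"] += int(bool(call.get("is_error")))
        stats["duration_ms"] += call.get("duration_ms", 0.0)

    return {
        "llm_ms": sum(t.get("llm_ms", 0.0) for t in turns),
        "tool_ms": sum(t.get("tool_ms", 0.0) for t in turns),
        "tokens": sum(t.get("tokens", 0) for t in turns),
        "per_tool": dict(per_tool),
    }


# Sample shaped like the documented payloads; values are illustrative.
sample = {
    "turn_timings": [
        {"turn": 1, "llm_ms": 450.0, "tool_ms": 120.0, "tokens": 512},
        {"turn": 2, "llm_ms": 300.0, "tool_ms": 0.0, "tokens": 256},
    ],
    "tool_metrics": [
        {"tool_name": "Bash", "tool_use_id": "tu-1", "duration_ms": 150.3,
         "is_error": False, "result_length": 42, "turn": 1},
    ],
}
summary = summarise_usage(sample)
```

Because usage may be None on a ResultMessage, a caller would guard with `if msg.usage:` before passing it in.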
StreamEvent
Raw provider streaming events, emitted when include_partial_messages=True.
StreamEvent(
    uuid: str,  # unique event identifier
    session_id: str,  # session this event belongs to
    event: dict[str, Any],  # raw provider-specific SSE payload
    parent_tool_use_id: str | None,  # set when event is part of a tool input stream
)
StreamEvent.event is the unprocessed payload from the LLM provider's streaming API — its structure is provider-specific. See Streaming for usage patterns.
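Since the payload shape varies by provider, generic code can rely only on the top-level fields. As a small sketch under that constraint, the snippet below separates top-level events from tool-input streams using parent_tool_use_id. The StreamEvent class is a local stand-in mirroring the documented fields, and group_by_parent is a hypothetical helper. The event payloads are left empty on purpose.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StreamEvent:
    """Local stand-in mirroring the documented StreamEvent fields."""
    uuid: str
    session_id: str
    event: dict[str, Any]
    parent_tool_use_id: Optional[str] = None


def group_by_parent(events: list[StreamEvent]) -> dict[Optional[str], list[StreamEvent]]:
    """Group events by parent_tool_use_id; the None key holds top-level events."""
    groups: dict[Optional[str], list[StreamEvent]] = defaultdict(list)
    for ev in events:
        groups[ev.parent_tool_use_id].append(ev)
    return dict(groups)


events = [
    StreamEvent("e1", "s1", {}),
    StreamEvent("e2", "s1", {}, parent_tool_use_id="tu-1"),
    StreamEvent("e3", "s1", {}, parent_tool_use_id="tu-1"),
]
groups = group_by_parent(events)
```

Events keep their arrival order within each group, so a tool's input stream can be replayed in sequence.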
UserMessage
Emitted once at the start of each query() call.
UserMessage(
    content: str | list[ContentBlock],
    uuid: str | None,
    parent_tool_use_id: str | None,
    tool_use_result: dict[str, Any] | None,
)
SystemMessage
Framework metadata events. Safe to ignore if you only care about agent output.
SystemMessage(
    subtype: str,  # e.g. "session_start", "context_summarised"
    data: dict[str, Any],  # arbitrary metadata for the event
)
Deprecated exports
The following are exported from agentix but are deprecated stubs kept for backward compatibility only. Do not use them in new code — they will be removed in a future version:
- UserInputRequestedEvent
- UserInputResolvedEvent
- UserInputTimedOutEvent
- subscribe_user_input_events
Use the can_use_tool callback and AskUserQuestion tool instead.