Message Types

query() yields a stream of Message objects. Import them all from agentix.

Message union

Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage | StreamEvent

AssistantMessage

The agent's complete response for a single turn.

AssistantMessage(
    content: list[TextBlock | ThinkingBlock | ToolUseBlock | ToolResultBlock],
    model: str,                          # model that produced this response
    parent_tool_use_id: str | None,      # set when this is a sub-agent response
    error: AssistantMessageError | None, # provider-level error if the LLM call failed
)

AssistantMessageError is a literal: "authentication_failed", "billing_error", "rate_limit", "invalid_request", "server_error", "unknown".

Content blocks

Type             Fields                                            Description
TextBlock        text: str                                         Plain text response
ThinkingBlock    thinking: str, signature: str                     Extended reasoning output (Anthropic)
ToolUseBlock     id: str, name: str, input: dict                   Tool invocation
ToolResultBlock  tool_use_id: str, content, is_error: bool | None  Tool execution result

from agentix import AssistantMessage, TextBlock, ToolUseBlock

async for msg in client.query("What files are in /tmp?"):
    if isinstance(msg, AssistantMessage):
        for block in msg.content:
            if isinstance(block, TextBlock):
                print(block.text)
            elif isinstance(block, ToolUseBlock):
                print(f"Using tool: {block.name}({block.input})")

ResultMessage

The final message at the end of every query. Always present regardless of include_partial_messages.

ResultMessage(
    is_error: bool,
    result: str | None,      # always safe to access (never raises)
    subtype: str | None,
    stop_reason: str | None,
    structured_output: Any,  # parsed JSON if output_format was set
    session_id: str,
    usage: dict | None,
)

Stop subtypes

subtype                  Meaning
None                     Successful completion
"error_max_turns"        Exceeded max_iterations
"error_max_budget_usd"   Exceeded max_tokens_budget
"error_timeout"          Agent timed out
"error_unknown"          Unexpected error

Stop reasons

stop_reason        Meaning
"end_turn"         Normal completion
"max_tokens"       LLM max token limit hit
"interrupt"        client.interrupt() was called
"max_iterations"   Iteration limit reached

Usage metrics

async for msg in client.query("..."):
    if isinstance(msg, ResultMessage) and msg.usage:
        # Per-turn aggregate timings
        for turn in msg.usage.get("turn_timings", []):
            # {"turn": 1, "llm_ms": 450.0, "tool_ms": 120.0, "tokens": 512}
            print(f"turn {turn['turn']}: {turn['llm_ms']}ms LLM, {turn['tool_ms']}ms tools")

        # Per-tool-call granular metrics
        for call in msg.usage.get("tool_metrics", []):
            # {"tool_name": "Bash", "tool_use_id": "tu-1",
            #  "duration_ms": 150.3, "is_error": False,
            #  "result_length": 42, "turn": 1}
            print(f"{call['tool_name']}: {call['duration_ms']}ms")
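For reporting, the per-call metrics are often reduced to a single total. A minimal sketch over the `usage` dict shape shown above (the `total_tool_time_ms` helper is illustrative, not part of agentix):

```python
def total_tool_time_ms(usage: dict) -> float:
    """Sum duration_ms across all entries in usage["tool_metrics"].

    Missing keys are treated as zero, so the helper is safe to call
    on a usage dict that has no tool_metrics at all.
    """
    return sum(call.get("duration_ms", 0.0) for call in usage.get("tool_metrics", []))
```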

StreamEvent

Raw provider streaming events when include_partial_messages=True.

StreamEvent(
    uuid: str,                       # unique event identifier
    session_id: str,                 # session this event belongs to
    event: dict[str, Any],           # raw provider-specific SSE payload
    parent_tool_use_id: str | None,  # set when event is part of a tool input stream
)

StreamEvent.event is the unprocessed payload from the LLM provider's streaming API — its structure is provider-specific. See Streaming for usage patterns.

UserMessage

Emitted once at the start of each query() call.

UserMessage(
    content: str | list[ContentBlock],
    uuid: str | None,
    parent_tool_use_id: str | None,
    tool_use_result: dict[str, Any] | None,
)

SystemMessage

Framework metadata events. Safe to ignore if you only care about agent output.

SystemMessage(
    subtype: str,            # e.g. "session_start", "context_summarised"
    data: dict[str, Any],    # arbitrary metadata for the event
)
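If you do want to react to system events, a small dispatcher keyed on subtype keeps the handling in one place. A minimal sketch using only the example subtypes named above (the handler registry and the `session_id` key in the data dict are assumptions for illustration):

```python
def handle_system_event(subtype: str, data: dict) -> str:
    """Dispatch a SystemMessage by subtype, ignoring anything unrecognised."""
    handlers = {
        "session_start": lambda d: f"session started: {d.get('session_id', '?')}",
        "context_summarised": lambda d: "context was summarised",
    }
    handler = handlers.get(subtype)
    return handler(data) if handler else f"ignored system event: {subtype}"
```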

Deprecated exports

The following are exported from agentix but are deprecated stubs kept for backward compatibility only. Do not use them in new code — they will be removed in a future version:

  • UserInputRequestedEvent
  • UserInputResolvedEvent
  • UserInputTimedOutEvent
  • subscribe_user_input_events

Use the can_use_tool callback and AskUserQuestion tool instead.