# Observability

Agentix provides structured JSON logging and OpenTelemetry tracing out of the box.
## Structured logging

```python
from agentix.log import setup_logging

# JSON structured logging (recommended for production)
setup_logging(level="INFO", json=True)

# Plain text (development)
setup_logging(level="DEBUG", json=False)
```

This configures the `agentix` logger namespace in isolation (`propagate=False`). All Agentix internal logs flow through this namespace.
### JSONFormatter

Wire it into your own logging setup:

```python
import logging

from agentix.log import JSONFormatter

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger("agentix").addHandler(handler)
logging.getLogger("agentix").setLevel(logging.INFO)
```
Each JSON log line includes:

| Field | Description |
|---|---|
| `timestamp` | ISO 8601 with milliseconds |
| `level` | Log level name (`DEBUG`, `INFO`, `WARNING`, etc.) |
| `logger` | Logger name |
| `message` | Log message |
| `module` | Source module name |
| `function` | Source function name |
| `line` | Source line number |
| `exception` | Full traceback (only when `exc_info` is present) |
| `data` | Structured key-value dict (when emitted via `log_event()`) |
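To see how these fields map onto standard `logging.LogRecord` attributes, here is a minimal formatter sketch that emits the same shape. This is illustrative only, not the actual `JSONFormatter` implementation; the `data` attribute is assumed to be attached via `extra={"data": ...}`:

```python
import json
import logging
from datetime import datetime, timezone


class SketchJSONFormatter(logging.Formatter):
    """Illustrative formatter emitting the fields listed in the table above."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(timespec="milliseconds"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:  # full traceback only when exc_info is present
            entry["exception"] = self.formatException(record.exc_info)
        data = getattr(record, "data", None)  # attached via extra={"data": ...}
        if data is not None:
            entry["data"] = data
        return json.dumps(entry)
```

Attach it to any handler exactly as shown for `JSONFormatter` above.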
### Structured log events

```python
import logging

from agentix.log import log_event, get_logger

log = get_logger("mymodule")  # returns logging.getLogger("agentix.mymodule")

log_event(log, logging.INFO, "tool_executed",
          tool="Bash", duration_ms=42, exit_code=0)
# → {"timestamp": "...", "level": "INFO", "message": "tool_executed",
#    "data": {"tool": "Bash", "duration_ms": 42, "exit_code": 0}}
```
## OpenTelemetry tracing

Agentix instruments LLM calls, tool executions, sub-agent dispatches, and gateway dispatches as OpenTelemetry spans. Tracing is a no-op with zero overhead when `opentelemetry-api` is not installed.
### Setup

```python
from agentix.tracing import setup_tracing

# Auto-configure with ConsoleSpanExporter (quick local debugging)
setup_tracing()

# Use your own TracerProvider (production)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)
setup_tracing(provider=provider)
```

Install the SDK and exporter separately (they are not bundled with Agentix):

```bash
pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
```
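The zero-overhead behavior when `opentelemetry-api` is absent is typically achieved with an optional-import guard. A minimal sketch of that pattern (illustrative only, not the actual `agentix.tracing` internals — `start_span` is a hypothetical helper):

```python
from contextlib import nullcontext

# Optional-dependency guard: tracing degrades to a no-op if the
# opentelemetry-api package is not importable.
try:
    from opentelemetry import trace
    _HAVE_OTEL = True
except ImportError:
    trace = None
    _HAVE_OTEL = False


def start_span(name: str, **attributes):
    """Return a real span context manager when OTel is available, else a no-op."""
    if not _HAVE_OTEL:
        return nullcontext()  # zero overhead: no span objects created
    tracer = trace.get_tracer("agentix")
    return tracer.start_as_current_span(name, attributes=attributes)


# Call sites are identical either way:
with start_span("agentix.tool_call", **{"agentix.tool_name": "Bash"}):
    pass  # do the work being traced
```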
### Span hierarchy

| Span name | Trigger | Key attributes |
|---|---|---|
| `agentix.run` | Each `query()` call | `agentix.session_id`, `agentix.agent_name`, `agentix.model`, `agentix.provider` |
| `agentix.llm_call` | Each LLM API call | `agentix.model`, `agentix.turn` |
| `agentix.tool_call` | Each tool execution | `agentix.tool_name`, `agentix.tool_use_id` |
| `agentix.subagent` | Each sub-agent dispatch | `agentix.child_name` |
| `agentix.gateway_dispatch` | Each gateway message | `agentix.channel_type`, `agentix.user_id`, `agentix.session_id` |
### Propagating trace context

Pass W3C trace headers to sub-processes or external services:

```python
from agentix.tracing import get_current_trace_context

ctx = get_current_trace_context()
# {"traceparent": "00-abc123...", "tracestate": ""}
# Pass as HTTP headers or env vars to downstream services
```

Returns an empty dict when there is no active span or tracing is disabled.
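A sketch of both propagation routes, using a hypothetical context dict in the shape returned by `get_current_trace_context()`. Note that `TRACEPARENT`/`TRACESTATE` environment variables are a common convention, not part of the W3C Trace Context spec (which only defines the HTTP headers):

```python
import os

# Hypothetical return value of get_current_trace_context()
ctx = {
    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
    "tracestate": "",
}

# Route 1: merge into outbound HTTP headers, skipping empty values
headers = {k: v for k, v in ctx.items() if v}

# Route 2: pass to a child process via environment variables
# (upper-cased names; an informal convention, not a W3C standard)
env = {**os.environ, **{k.upper(): v for k, v in ctx.items() if v}}
# subprocess.run(["my-downstream-tool"], env=env)
```

Because an empty dict is returned when tracing is disabled, both routes degrade gracefully: no headers or env vars are added.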
## Usage metrics in ResultMessage

Every `ResultMessage` includes per-turn and per-tool-call timing without any additional setup:

```python
async for msg in client.query("..."):
    if isinstance(msg, ResultMessage) and msg.usage:
        # Per-turn aggregate timings
        for turn in msg.usage.get("turn_timings", []):
            print(f"Turn {turn['turn']}: "
                  f"llm={turn['llm_ms']:.0f}ms "
                  f"tool={turn['tool_ms']:.0f}ms "
                  f"tokens={turn['tokens']}")
        # Per-tool-call granular metrics
        for call in msg.usage.get("tool_metrics", []):
            print(f"  {call['tool_name']}: {call['duration_ms']:.1f}ms "
                  f"error={call['is_error']} chars={call['result_length']}")
```
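The per-turn entries can be rolled up into run-level totals. A small sketch over a hypothetical `usage` payload, with the field names taken from the example above (the numbers are made up):

```python
# Hypothetical usage payload in the turn_timings shape shown above
usage = {
    "turn_timings": [
        {"turn": 1, "llm_ms": 820.0, "tool_ms": 140.0, "tokens": 512},
        {"turn": 2, "llm_ms": 610.0, "tool_ms": 905.0, "tokens": 380},
    ]
}

turns = usage.get("turn_timings", [])
total_llm = sum(t["llm_ms"] for t in turns)
total_tool = sum(t["tool_ms"] for t in turns)
total_tokens = sum(t["tokens"] for t in turns)

print(f"llm={total_llm:.0f}ms tool={total_tool:.0f}ms tokens={total_tokens}")
# → llm=1430ms tool=1045ms tokens=892
```

A roll-up like this makes it easy to spot whether a run is dominated by model latency or tool latency.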
## Hook-based metrics

Combine hooks with any metrics backend for custom instrumentation:

```python
import time

from agentix import AgentixAgentOptions, HookMatcher

_start: dict[str, float] = {}

async def record_start(hook_input: dict, context) -> None:
    _start[hook_input["tool_use_id"]] = time.monotonic()

async def record_end(hook_input: dict, context) -> None:
    # Default to "now" so a missing start record yields ~0ms, not a huge value
    started = _start.pop(hook_input["tool_use_id"], time.monotonic())
    elapsed_ms = (time.monotonic() - started) * 1000
    # send to statsd / prometheus / datadog / etc.
    metrics.histogram("tool.duration_ms", elapsed_ms,
                      tags={"tool": hook_input["tool_name"]})

options = AgentixAgentOptions(
    hooks={
        "PreToolUse": [HookMatcher(matcher=None, hooks=[record_start])],
        "PostToolUse": [HookMatcher(matcher=None, hooks=[record_end])],
    }
)
```