Observability

Agentix provides structured JSON logging and OpenTelemetry tracing out of the box.


Structured logging

from agentix.log import setup_logging

# JSON structured logging (recommended for production)
setup_logging(level="INFO", json=True)

# Plain text (development)
setup_logging(level="DEBUG", json=False)

This configures the agentix logger namespace in isolation (propagate=False). All Agentix internal logs flow through this namespace.
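The same isolation can be reproduced with the standard library alone, which is useful if you want an identically configured namespace for your own application logs. This is a sketch of the described behaviour; `setup_logging` remains the supported entry point:

```python
import logging

# Reproduce the isolation by hand for a namespace: handlers go on the
# top-level logger, and propagation to the root logger is switched off.
log = logging.getLogger("agentix")
log.setLevel(logging.INFO)
log.propagate = False  # agentix logs stay out of the root logger's handlers

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
)
log.addHandler(handler)

# Child loggers such as "agentix.mymodule" inherit this configuration
# automatically; no per-module setup is needed.
child = logging.getLogger("agentix.mymodule")
```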

JSONFormatter

Wire it into your own logging setup:

from agentix.log import JSONFormatter
import logging

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger("agentix").addHandler(handler)
logging.getLogger("agentix").setLevel(logging.INFO)

Each JSON log line includes:

| Field | Description |
| --- | --- |
| timestamp | ISO 8601 with milliseconds |
| level | Log level name (DEBUG, INFO, WARNING, etc.) |
| logger | Logger name |
| message | Log message |
| module | Source module name |
| function | Source function name |
| line | Source line number |
| exception | Full traceback (only when exc_info is present) |
| data | Structured key-value dict (when emitted via log_event()) |
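For reference, a formatter producing these fields from a stdlib LogRecord could be sketched as below. This illustrates the schema only and is not the shipped agentix.log.JSONFormatter; the SketchJSONFormatter name is ours:

```python
import json
import logging
from datetime import datetime, timezone

class SketchJSONFormatter(logging.Formatter):
    """Illustrative stand-in emitting the field schema documented above."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            # ISO 8601 UTC timestamp with millisecond precision
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
                                 .isoformat(timespec="milliseconds"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            # Only present when the log call passed exc_info
            entry["exception"] = self.formatException(record.exc_info)
        data = getattr(record, "data", None)
        if data:
            # Structured fields attached via the `extra` mechanism
            entry["data"] = data
        return json.dumps(entry)
```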

Structured log events

from agentix.log import log_event, get_logger
import logging

log = get_logger("mymodule") # returns logging.getLogger("agentix.mymodule")

log_event(log, logging.INFO, "tool_executed",
          tool="Bash", duration_ms=42, exit_code=0)
# → {"timestamp": "...", "level": "INFO", "message": "tool_executed",
# "data": {"tool": "Bash", "duration_ms": 42, "exit_code": 0}}
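A helper like this can be built on the stdlib `extra` mechanism, which attaches arbitrary attributes to the LogRecord for a formatter to pick up. The sketch below (log_event_sketch is a hypothetical name) shows one plausible shape; the shipped log_event may differ internally:

```python
import logging

def log_event_sketch(log: logging.Logger, level: int,
                     message: str, **fields) -> None:
    # Stash the structured fields on the record via `extra`; a JSON
    # formatter can then read record.data and emit it under "data".
    log.log(level, message, extra={"data": fields})
```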

OpenTelemetry tracing

Agentix instruments LLM calls, tool executions, sub-agent dispatches, and gateway dispatches as OpenTelemetry spans. When opentelemetry-api is not installed, every tracing call is a no-op with zero overhead.

Setup

from agentix.tracing import setup_tracing

# Auto-configure with ConsoleSpanExporter (quick local debugging)
setup_tracing()

# Use your own TracerProvider (production)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)

setup_tracing(provider=provider)

Install the SDK and exporter separately (not bundled with Agentix):

pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc

Span hierarchy

| Span name | Trigger | Key attributes |
| --- | --- | --- |
| agentix.run | Each query() call | agentix.session_id, agentix.agent_name, agentix.model, agentix.provider |
| agentix.llm_call | Each LLM API call | agentix.model, agentix.turn |
| agentix.tool_call | Each tool execution | agentix.tool_name, agentix.tool_use_id |
| agentix.subagent | Each sub-agent dispatch | agentix.child_name |
| agentix.gateway_dispatch | Each gateway message | agentix.channel_type, agentix.user_id, agentix.session_id |

Propagating trace context

Pass W3C trace headers to sub-processes or external services:

from agentix.tracing import get_current_trace_context

ctx = get_current_trace_context()
# {"traceparent": "00-abc123...", "tracestate": ""}
# Pass as HTTP headers or env vars to downstream services

Returns an empty dict when there is no active span or when tracing is disabled.
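As an illustration of the env-var route, the sketch below forwards the returned headers to a subprocess. The run_with_trace helper and the TRACEPARENT/TRACESTATE variable names are our own convention here, not something Agentix prescribes, and the empty-dict case degrades to a plain subprocess call:

```python
import os
import subprocess

def run_with_trace(cmd: list[str], ctx: dict[str, str]) -> subprocess.CompletedProcess:
    # Forward W3C trace context to the child via environment variables.
    # An empty ctx (no active span / tracing disabled) is a no-op.
    env = dict(os.environ)
    if ctx.get("traceparent"):
        env["TRACEPARENT"] = ctx["traceparent"]
    if ctx.get("tracestate"):
        env["TRACESTATE"] = ctx["tracestate"]
    return subprocess.run(cmd, env=env, capture_output=True, text=True)
```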


Usage metrics in ResultMessage

Every ResultMessage includes per-turn and per-tool-call timing without any additional setup:

async for msg in client.query("..."):
    if isinstance(msg, ResultMessage) and msg.usage:
        # Per-turn aggregate timings
        for turn in msg.usage.get("turn_timings", []):
            print(f"Turn {turn['turn']}: "
                  f"llm={turn['llm_ms']:.0f}ms "
                  f"tool={turn['tool_ms']:.0f}ms "
                  f"tokens={turn['tokens']}")

        # Per-tool-call granular metrics
        for call in msg.usage.get("tool_metrics", []):
            print(f"  {call['tool_name']}: {call['duration_ms']:.1f}ms "
                  f"error={call['is_error']} chars={call['result_length']}")
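Because the entries are plain dicts, session-level roll-ups are one comprehension away. A sketch assuming the field names shown above:

```python
def total_timings(usage: dict) -> dict:
    # Sum the per-turn entries into session-level totals.
    turns = usage.get("turn_timings", [])
    return {
        "llm_ms": sum(t["llm_ms"] for t in turns),
        "tool_ms": sum(t["tool_ms"] for t in turns),
        "tokens": sum(t["tokens"] for t in turns),
    }

# Sample data shaped like the fields used in the loop above
usage = {"turn_timings": [
    {"turn": 1, "llm_ms": 800.0, "tool_ms": 42.0, "tokens": 350},
    {"turn": 2, "llm_ms": 610.0, "tool_ms": 15.0, "tokens": 120},
]}
totals = total_timings(usage)
# → {"llm_ms": 1410.0, "tool_ms": 57.0, "tokens": 470}
```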

Hook-based metrics

Combine hooks with any metrics backend for custom instrumentation:

import time
from agentix import AgentixAgentOptions, HookMatcher

_start: dict[str, float] = {}

async def record_start(hook_input: dict, context) -> None:
    _start[hook_input["tool_use_id"]] = time.monotonic()

async def record_end(hook_input: dict, context) -> None:
    # Default to "now" so an unmatched end records 0 ms instead of garbage
    started = _start.pop(hook_input["tool_use_id"], time.monotonic())
    elapsed_ms = (time.monotonic() - started) * 1000
    # send to statsd / prometheus / datadog / etc.
    metrics.histogram("tool.duration_ms", elapsed_ms,
                      tags={"tool": hook_input["tool_name"]})

options = AgentixAgentOptions(
    hooks={
        "PreToolUse": [HookMatcher(matcher=None, hooks=[record_start])],
        "PostToolUse": [HookMatcher(matcher=None, hooks=[record_end])],
    }
)