Skip to content

Observability

Automatic OTEL tracing for every message, LLM call, tool invocation, and supervisor restart. No instrumentation required.

See Observability for setup guides and the full span attribute reference.


Tracer

civitas.observability.tracer.Tracer(span_queue=None)

Creates and enriches spans for Civitas operations.

Behaviour depends on the environment:

- opentelemetry-sdk installed + OTEL_EXPORTER_OTLP_ENDPOINT set -> OTLP exporter (falls back to the OTEL ConsoleSpanExporter if the OTLP exporter package cannot be imported)
- opentelemetry-sdk installed, no endpoint -> OTEL ConsoleSpanExporter
- opentelemetry-sdk not installed -> built-in print-based console output

When span_queue is provided, completed spans are additionally pushed to the queue for consumption by OTELAgent (async, non-blocking export path).

Source code in civitas/observability/tracer.py
def __init__(self, span_queue: SpanQueue | None = None) -> None:
    """Initialise the tracer and choose an export path.

    Args:
        span_queue: Optional queue, stored on the instance as ``_span_queue``.

    When the OpenTelemetry SDK is available (``_HAS_OTEL``), an
    instance-scoped ``TracerProvider`` is created:

    * ``settings.otel_endpoint`` set -> OTLP gRPC exporter behind a
      ``BatchSpanProcessor``; if the exporter package cannot be imported,
      a warning is logged and the OTEL console exporter is used instead.
    * no endpoint -> OTEL console exporter.

    Without the SDK, a warning is logged and ``_console_fallback`` stays
    enabled.
    """
    self._span_queue = span_queue
    self._use_otel = False
    self._otel_tracer: Any = None
    self._provider: Any = None  # F08-1/F08-5: instance-scoped provider
    self._console_fallback = True

    if _HAS_OTEL:
        provider = TracerProvider()
        endpoint = settings.otel_endpoint

        if endpoint:
            try:
                # Imported lazily: the OTLP exporter ships in a separate
                # package from the SDK and may be absent.
                from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                    OTLPSpanExporter,
                )

                exporter = OTLPSpanExporter(endpoint=endpoint)
                # F08-2: BatchSpanProcessor exports in background thread —
                # avoids blocking the event loop during OTLP network I/O.
                provider.add_span_processor(BatchSpanProcessor(exporter))
            except ImportError:
                # An endpoint was configured but cannot be used. Say so
                # loudly instead of silently downgrading to console output.
                logger.warning(
                    "[Tracer] OTLP endpoint configured but "
                    "opentelemetry-exporter-otlp-proto-grpc is not installed — "
                    "falling back to OTEL console exporter"
                )
                provider.add_span_processor(SimpleSpanProcessor(OTELConsoleSpanExporter()))
        else:
            provider.add_span_processor(SimpleSpanProcessor(OTELConsoleSpanExporter()))

        # F08-1: store provider as instance attr; do NOT set the global
        self._provider = provider
        self._otel_tracer = provider.get_tracer("civitas", "0.1.0")
        self._use_otel = True
        self._console_fallback = False
    else:
        # F08-7: warn at startup if OTEL not available so operators notice
        logger.warning(
            "[Tracer] opentelemetry-sdk not installed — using console fallback; "
            "install opentelemetry-sdk for structured tracing"
        )

start_span(name, trace_id='', parent_span_id=None, attributes=None)

Create a general-purpose span (for agent lifecycle, LLM calls, etc).

Source code in civitas/observability/tracer.py
def start_span(
    self,
    name: str,
    trace_id: str = "",
    parent_span_id: str | None = None,
    attributes: dict[str, Any] | None = None,
) -> Span:
    """Open a general-purpose span (agent lifecycle, LLM calls, and so on).

    An empty ``trace_id`` is replaced by a fresh random 32-hex-char ID, and
    a missing or empty ``attributes`` mapping by a new empty dict. The span
    always receives a newly generated span ID.
    """
    # A falsy trace_id means the caller did not supply one.
    if trace_id:
        effective_trace_id = trace_id
    else:
        effective_trace_id = os.urandom(16).hex()

    span_attributes = attributes if attributes else {}

    return self._make_span(
        name=name,
        trace_id=effective_trace_id,
        span_id=_new_span_id(),
        parent_span_id=parent_span_id,
        attributes=span_attributes,
    )

start_send_span(message)

Create a span for an outbound message send.

Source code in civitas/observability/tracer.py
def start_send_span(self, message: Message) -> Span:
    """Create a span for an outbound message send.

    The span reuses the message's own ``trace_id``, ``span_id`` and
    ``parent_span_id`` and is tagged with the sender, recipient, message
    type and message ID. In console-fallback mode a debug line with a
    local-time ``HH:MM:SS.mmm`` timestamp is also logged.

    Args:
        message: The outbound message being sent.

    Returns:
        The span produced by ``_make_span``.
    """
    span = self._make_span(
        name=f"send {message.type}",
        trace_id=message.trace_id,
        span_id=message.span_id,
        parent_span_id=message.parent_span_id,
        attributes={
            "civitas.sender": message.sender,
            "civitas.recipient": message.recipient,
            "civitas.message_type": message.type,
            "civitas.message_id": message.id,
        },
    )
    if self._console_fallback:
        ts = time.strftime("%H:%M:%S", time.localtime(span.start_time))
        # Clamp the fractional second before formatting: a fraction of
        # 0.9995 or more would otherwise round up to "1.000" and print as
        # ".000" next to the un-incremented second.
        ms = f"{min(span.start_time % 1, 0.999):.3f}"[1:]
        logger.debug(  # F08-7: debug so operators can opt in
            "[%s%s] %s -> %s: %s",
            ts,
            ms,
            message.sender,
            message.recipient,
            message.type,
        )
    return span

start_receive_span(message)

Create a span for an inbound message receive.

Source code in civitas/observability/tracer.py
def start_receive_span(self, message: Message) -> Span:
    """Open a span for an inbound message receive.

    The new span stays in the message's trace, gets a freshly generated span
    ID, and uses the message's ``span_id`` as its parent.
    """
    receive_attributes = {
        "civitas.sender": message.sender,
        "civitas.recipient": message.recipient,
        "civitas.message_type": message.type,
        "civitas.message_id": message.id,
    }
    span_name = f"recv {message.type}"
    return self._make_span(
        name=span_name,
        trace_id=message.trace_id,
        span_id=_new_span_id(),
        parent_span_id=message.span_id,
        attributes=receive_attributes,
    )

start_llm_span(model, trace_id, parent_span_id=None)

Create a span for an LLM call. Call end_llm_span() after the call completes.

Source code in civitas/observability/tracer.py
def start_llm_span(
    self,
    model: str,
    trace_id: str,  # F08-3: required — callers must supply a trace_id
    parent_span_id: str | None = None,
) -> Span:
    """Open a span for an LLM call; pair it with end_llm_span() afterwards.

    The span is named ``llm.chat <model>`` and carries the model name in the
    ``llm.model`` attribute.
    """
    span_name = f"llm.chat {model}"
    llm_attributes = {"llm.model": model}
    return self.start_span(
        name=span_name,
        trace_id=trace_id,
        parent_span_id=parent_span_id,
        attributes=llm_attributes,
    )

end_llm_span(span, *, tokens_in=0, tokens_out=0, cost_usd=0.0)

Enrich and close an LLM span with response metrics.

Source code in civitas/observability/tracer.py
def end_llm_span(
    self,
    span: Span,
    *,
    tokens_in: int = 0,
    tokens_out: int = 0,
    cost_usd: float = 0.0,
) -> None:
    """Attach response metrics to an LLM span and close it.

    Token counts, cost, and the elapsed time since ``span.start_time``
    (milliseconds, rounded to two decimals) are recorded as ``llm.*``
    attributes before the span is ended. In console-fallback mode a
    one-line summary is also logged at debug level.
    """
    usage_metrics = (
        ("llm.tokens_in", tokens_in),
        ("llm.tokens_out", tokens_out),
        ("llm.cost_usd", cost_usd),
    )
    for key, value in usage_metrics:
        span.set_attribute(key, value)

    elapsed_ms = (time.time() - span.start_time) * 1000
    span.set_attribute("llm.latency_ms", round(elapsed_ms, 2))
    span.end()

    if not self._console_fallback:
        return
    logger.debug(  # F08-7
        "  [llm] %s: %din/%dout $%.4f %.0fms",
        span.attributes.get("llm.model", "?"),
        tokens_in,
        tokens_out,
        cost_usd,
        elapsed_ms,
    )

start_tool_span(tool_name, trace_id='', parent_span_id=None)

Create a span for a tool invocation.

Source code in civitas/observability/tracer.py
def start_tool_span(
    self,
    tool_name: str,
    trace_id: str = "",
    parent_span_id: str | None = None,
) -> Span:
    """Open a span for a tool invocation.

    The span is named ``tool.execute <tool_name>`` and carries the tool name
    in the ``tool.name`` attribute; everything else is delegated to
    ``start_span``.
    """
    span_name = f"tool.execute {tool_name}"
    tool_attributes = {"tool.name": tool_name}
    return self.start_span(
        name=span_name,
        trace_id=trace_id,
        parent_span_id=parent_span_id,
        attributes=tool_attributes,
    )

end_tool_span(span, *, status='ok')

Enrich and close a tool span with result status.

Source code in civitas/observability/tracer.py
def end_tool_span(
    self,
    span: Span,
    *,
    status: str = "ok",
) -> None:
    """Attach the result status to a tool span and close it.

    Records ``tool.result_status`` and the elapsed time since
    ``span.start_time`` (milliseconds, rounded to two decimals) before the
    span is ended. In console-fallback mode a one-line summary is also
    logged at debug level.
    """
    span.set_attribute("tool.result_status", status)

    elapsed_ms = (time.time() - span.start_time) * 1000
    span.set_attribute("tool.latency_ms", round(elapsed_ms, 2))
    span.end()

    if not self._console_fallback:
        return
    logger.debug(  # F08-7
        "  [tool] %s: %s %.0fms",
        span.attributes.get("tool.name", "?"),
        status,
        elapsed_ms,
    )

new_trace_id()

Generate a new trace ID (32-hex-char).

Source code in civitas/observability/tracer.py
def new_trace_id(self) -> str:
    """Return a fresh trace ID: 16 random bytes rendered as 32 hex characters."""
    random_bytes = os.urandom(16)
    return random_bytes.hex()

flush()

Force-export any pending spans via this tracer's provider.

Source code in civitas/observability/tracer.py
def flush(self) -> None:
    """Force-export pending spans through this tracer's own provider.

    Does nothing when OTEL is not in use or no provider was created.
    """
    # F08-5: use instance provider
    if not self._use_otel or self._provider is None:
        return
    self._provider.force_flush()

SpanQueue

civitas.observability.span_queue.SpanQueue(maxsize=10000)

Thin asyncio.Queue wrapper for completed SpanData.

The Tracer puts spans here via put_nowait() (never blocks). OTELAgent drains this queue and calls the ExportBackend.

Source code in civitas/observability/span_queue.py
def __init__(self, maxsize: int = 10_000) -> None:
    """Create the backing ``asyncio.Queue`` with the given ``maxsize``.

    ``asyncio.Queue`` treats a ``maxsize`` of zero or less as unbounded.
    """
    backing_queue: asyncio.Queue[SpanData] = asyncio.Queue(maxsize=maxsize)
    self._queue = backing_queue

put_nowait(span)

Enqueue a completed span. Drops oldest if full (never blocks).

Source code in civitas/observability/span_queue.py
def put_nowait(self, span: SpanData) -> None:
    """Enqueue a completed span without ever blocking.

    If the queue is full, the oldest queued span is discarded to make room
    for the new one.
    """
    queue = self._queue

    # Fast path: there is room.
    try:
        queue.put_nowait(span)
        return
    except asyncio.QueueFull:
        pass

    # Queue is full. Evict the oldest span — losing a span is better than
    # blocking the message loop.
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        pass
    queue.put_nowait(span)

get() async

Dequeue a span. Awaits until one is available.

Source code in civitas/observability/span_queue.py
async def get(self) -> SpanData:
    """Wait until a span is available, then remove and return it."""
    next_span = await self._queue.get()
    return next_span

civitas.observability.span_queue.SpanData(name, trace_id, span_id, parent_span_id, start_time, end_time, attributes=dict(), status='ok', error_message=None) dataclass

Completed span ready for export. All fields are plain Python types.


Export Backends

civitas.observability.export_backend.ExportBackend

Bases: Protocol

Receives batches of completed spans and ships them to a backend.

export(spans) async

Export a batch of completed spans.

shutdown() async

Flush any pending data and close connections.


civitas.observability.export_backend.ConsoleBackend

Prints a human-readable summary of each span to stdout via logging.


civitas.observability.export_backend.FanOutBackend(backends)

Exports spans to multiple backends in sequence.

Errors from one backend are logged and do not prevent others from running.

Source code in civitas/observability/export_backend.py
def __init__(self, backends: list[ExportBackend]) -> None:
    """Remember the backends to fan out to.

    The list is stored by reference (not copied), so later changes the
    caller makes to it are visible to this instance.
    """
    self._backends = backends