MessageBus

Central message router. Routes messages from sender to recipient by name, delegates physical delivery to the Transport, and generates tracing spans.

See Architecture for the routing resolution order.


civitas.bus.MessageBus(transport, registry, serializer, tracer, audit_sink=None)

Central message router.

Routes messages from sender to recipient by name, delegates physical delivery to the Transport, applies serialization via the Serializer, and generates tracing spans for every send/receive.

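Routing precedence in route():

1. Registry lookup by recipient name → use RoutingEntry.address
2. Transport ephemeral reply address (has_reply_address) → publish directly to the recipient name (same-process request-reply)
3. Recipient name starts with "_reply." → publish directly to the recipient name (cross-process reply)
4. None of the above → raise MessageRoutingError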
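
The branch order in the route() source can be sketched in isolation as a small function. This is a minimal illustration, not the library's code: Entry, resolve_address, and the reply_addresses set are hypothetical stand-ins for RoutingEntry, the bus, and the transport's has_reply_address() check.

```python
from dataclasses import dataclass


class MessageRoutingError(Exception):
    """Stand-in for the library's routing error."""


@dataclass
class Entry:
    """Hypothetical, reduced stand-in for RoutingEntry."""
    name: str
    address: str


def resolve_address(recipient, entries, reply_addresses):
    """Return the transport address for a recipient, in route()'s branch order."""
    entry = entries.get(recipient)
    if entry is not None:
        return entry.address  # registry entry wins
    if recipient in reply_addresses:
        return recipient  # same-process ephemeral reply address
    if recipient.startswith("_reply."):
        return recipient  # cross-process reply topic
    raise MessageRoutingError(f"No agent registered with name: {recipient!r}")


entries = {"worker": Entry("worker", "tcp://10.0.0.5:5555")}
reply_addresses = {"_reply.abc123"}

resolved = [
    resolve_address("worker", entries, reply_addresses),
    resolve_address("_reply.abc123", entries, reply_addresses),
    resolve_address("_reply.other-node", entries, reply_addresses),
]

try:
    resolve_address("ghost", entries, reply_addresses)
    unknown_failed = False
except MessageRoutingError:
    unknown_failed = True
```

Only the registry branch can translate a name into a different address; both reply branches publish to the recipient name unchanged.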

Source code in civitas/bus.py
def __init__(
    self,
    transport: Transport,
    registry: Registry,
    serializer: Serializer,
    tracer: Tracer,
    audit_sink: AuditSink | None = None,
) -> None:
    self._transport = transport
    self._registry = registry
    self._serializer = serializer
    self._tracer = tracer
    self._audit_sink = audit_sink

setup_agent(agent) async

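Subscribe the transport to deliver messages to an agent's mailbox. Each incoming payload is deserialized, wrapped in a receive span, and passed to agent.receive(); the span is ended even if receive() raises.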

Source code in civitas/bus.py
async def setup_agent(self, agent: AgentProcess) -> None:
    """Subscribe the transport to deliver messages to an agent's mailbox."""

    async def _on_message_received(data: bytes) -> None:
        message = self._serializer.deserialize(data)
        span = self._tracer.start_receive_span(message)
        try:
            await agent.receive(message)
        finally:
            span.end()

    await self._transport.subscribe(agent.name, _on_message_received)
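
The subscribe-and-deliver shape above can be tried without the library. The sketch below assumes a hypothetical InMemoryTransport and EchoAgent, uses JSON in place of the Serializer, and leaves tracing out; none of these names come from civitas.

```python
import asyncio
import json


class InMemoryTransport:
    """Hypothetical transport: maps an address to one async bytes handler."""

    def __init__(self):
        self._handlers = {}

    async def subscribe(self, address, handler):
        self._handlers[address] = handler

    async def publish(self, address, data):
        await self._handlers[address](data)


class EchoAgent:
    """Hypothetical agent whose receive() appends to a list-based mailbox."""

    def __init__(self, name):
        self.name = name
        self.mailbox = []

    async def receive(self, message):
        self.mailbox.append(message)


async def setup_agent(transport, agent):
    # Same shape as the bus callback: bytes in, deserialized message to the agent.
    async def on_message_received(data: bytes) -> None:
        message = json.loads(data)  # JSON stands in for the Serializer
        await agent.receive(message)

    await transport.subscribe(agent.name, on_message_received)


async def main():
    transport = InMemoryTransport()
    agent = EchoAgent("greeter")
    await setup_agent(transport, agent)
    await transport.publish("greeter", json.dumps({"type": "hello"}).encode())
    return agent.mailbox


mailbox = asyncio.run(main())
```

The closure captures the agent, so the transport only ever deals with an address and raw bytes.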

route(message) async

Route a message to its recipient.

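Validates system message types, resolves the recipient address, creates a send span, serializes the message, and publishes through the transport. If an audit sink is configured, a message.route audit event is emitted after a successful publish.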

Routing order:

1. Registry lookup → use RoutingEntry.address
2. Transport ephemeral reply address (has_reply_address) → publish directly
3. Recipient name starts with "_reply." (cross-process reply) → publish directly
4. None of the above → raise MessageRoutingError

Source code in civitas/bus.py
async def route(self, message: Message) -> None:
    """Route a message to its recipient.

    Validates system message types, creates a send span, serializes the
    message, and publishes through the transport.

    Routing order:
    1. Registry lookup → use RoutingEntry.address
    2. Transport ephemeral reply address (has_reply_address) → publish directly
    3. Recipient starts with "_reply." (cross-process reply) → publish directly
    4. None of the above → raise MessageRoutingError
    """
    self._validate_message_type(message)

    entry = self._registry.lookup(message.recipient)
    if entry is not None:
        address = entry.address
    elif self._transport.has_reply_address(message.recipient):
        # Ephemeral reply endpoint — same-process request-reply short-circuit
        address = message.recipient
    elif message.recipient.startswith("_reply."):
        # Cross-process reply: the runtime's transport owns this ephemeral topic.
        # Route by address directly — ZMQ/NATS delivery handles it.
        address = message.recipient
    else:
        raise MessageRoutingError(f"No agent registered with name: {message.recipient!r}")

    span = self._tracer.start_send_span(message)
    try:
        data = self._serializer.serialize(message)
        await self._transport.publish(address, data)
    finally:
        span.end()

    if self._audit_sink is not None:
        from datetime import UTC, datetime

        await self._audit_sink.emit(
            AuditEvent(
                event="message.route",
                ts=datetime.now(UTC).isoformat(),
                agent=message.sender,
                signer_id=message.sender,  # verified sender == signer when signing is active
                details={
                    "sender": message.sender,
                    "recipient": message.recipient,
                    "type": message.type,
                    "correlation_id": message.correlation_id or "",
                    "message_id": message.id,
                },
            )
        )
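
To see what an audit sink receives from route(), the following sketch collects events in a list. It assumes only what the listing shows: an async emit() method and the five AuditEvent fields. The AuditEvent dataclass and ListAuditSink here are hypothetical stand-ins, and the agent names are made up.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class AuditEvent:
    """Hypothetical stand-in with the fields route() passes."""
    event: str
    ts: str
    agent: str
    signer_id: str
    details: dict = field(default_factory=dict)


class ListAuditSink:
    """Collects events in memory; only the async emit() method is assumed."""

    def __init__(self):
        self.events = []

    async def emit(self, event):
        self.events.append(event)


async def main():
    sink = ListAuditSink()
    await sink.emit(
        AuditEvent(
            event="message.route",
            ts=datetime.now(timezone.utc).isoformat(),  # timezone-aware ISO 8601
            agent="planner",
            signer_id="planner",
            details={"sender": "planner", "recipient": "worker", "type": "task"},
        )
    )
    return sink.events


events = asyncio.run(main())
```

Because emit() is awaited inside route(), a slow sink would delay the caller; a real sink might buffer internally.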

request(message, timeout=30.0) async

Send a request message and await a reply.

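Used by ask(). Delegates to transport.request(), which handles correlation and reply routing. Unlike route(), the recipient must be in the registry: there is no reply-address fallback, and an unregistered name raises MessageRoutingError.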

Source code in civitas/bus.py
async def request(self, message: Message, timeout: float = 30.0) -> Message:
    """Send a request message and await a reply.

    Used by ask() — delegates to transport.request() which handles
    correlation and reply routing.
    """
    self._validate_message_type(message)

    entry = self._registry.lookup(message.recipient)
    if entry is None:
        raise MessageRoutingError(f"No agent registered with name: {message.recipient!r}")

    span = self._tracer.start_send_span(message)
    try:
        data = self._serializer.serialize(message)
        reply_data = await self._transport.request(entry.address, data, timeout)
        return self._serializer.deserialize(reply_data)
    finally:
        span.end()
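
The serialize, request, deserialize round trip can be sketched with a loopback transport. Everything here is an assumption made for illustration: LoopbackTransport, add_responder, and the use of asyncio.wait_for for the timeout are not taken from civitas, and how a real transport reports a timeout is not shown in the source.

```python
import asyncio
import json


class LoopbackTransport:
    """Hypothetical transport whose request() calls a registered responder."""

    def __init__(self):
        self._responders = {}

    def add_responder(self, address, responder):
        self._responders[address] = responder

    async def request(self, address, data, timeout):
        # wait_for raises asyncio.TimeoutError if the responder is too slow.
        return await asyncio.wait_for(self._responders[address](data), timeout)


async def double(data: bytes) -> bytes:
    payload = json.loads(data)
    return json.dumps({"result": payload["value"] * 2}).encode()


async def slow(data: bytes) -> bytes:
    await asyncio.sleep(1.0)
    return data


async def main():
    transport = LoopbackTransport()
    transport.add_responder("doubler", double)
    transport.add_responder("sleeper", slow)

    request_bytes = json.dumps({"value": 21}).encode()
    reply = json.loads(await transport.request("doubler", request_bytes, 1.0))

    try:
        await transport.request("sleeper", request_bytes, 0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out


reply, timed_out = asyncio.run(main())
```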

lookup_all(pattern)

Return all registered agents matching a glob pattern.

Source code in civitas/bus.py
def lookup_all(self, pattern: str) -> list[RoutingEntry]:
    """Return all registered agents matching a glob pattern."""
    return self._registry.lookup_all(pattern)

civitas.registry.LocalRegistry()

Single-node in-memory registry.

Default implementation for single-process and same-node deployments. Name lookups (lookup) are O(1) dict reads with no I/O and no async; pattern lookups (lookup_all) scan every entry.

Remote agents can be registered via register_remote() so that pattern-based broadcast works across process boundaries; they are represented as RoutingEntry(is_local=False) and carry no object reference.

Capability-based lookups (find_by_capability, find_by_capabilities) work across both local and remote entries — capability tags are included in cross-process Worker announcements so every node has a complete view.

Listeners registered via add_listener() are notified after every register/deregister. Intended for external governance systems (e.g. Presidium) that need a live view of the agent population.

Source code in civitas/registry.py
def __init__(self) -> None:
    self._entries: dict[str, RoutingEntry] = {}
    self._listeners: list[RegistryListener] = []
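
The listener notifications described above can be sketched as follows. The callback signature (entry, event) is an assumption inferred from the _fire_listeners(entry, "register") calls in the listings; TinyRegistry and Entry are hypothetical reductions, not the library classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Hypothetical, reduced stand-in for RoutingEntry."""
    name: str
    is_local: bool = True


class TinyRegistry:
    def __init__(self):
        self._entries = {}
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def _fire_listeners(self, entry, event):
        for listener in self._listeners:
            listener(entry, event)  # assumed callback signature

    def register(self, name):
        entry = Entry(name)
        self._entries[name] = entry
        self._fire_listeners(entry, "register")

    def deregister(self, name):
        entry = self._entries.pop(name, None)
        if entry is not None:  # unknown names produce no notification
            self._fire_listeners(entry, "deregister")


seen = []
registry = TinyRegistry()
registry.add_listener(lambda entry, event: seen.append((event, entry.name)))
registry.register("planner")
registry.deregister("planner")
registry.deregister("planner")  # second call is a no-op
```

Listeners fire after the entry map has been updated, so a listener that queries the registry sees the new state.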

register(name, address=None, *, is_local=True, capabilities=None, capability_metadata=None)

Register an agent.

address defaults to name when not given, which is correct for in-process and NATS transports. Pass an explicit address for ZMQ TCP endpoints.

Raises ValueError if the name is already registered.

Source code in civitas/registry.py
def register(
    self,
    name: str,
    address: str | None = None,
    *,
    is_local: bool = True,
    capabilities: list[str] | tuple[str, ...] | None = None,
    capability_metadata: dict[str, Any] | None = None,
) -> None:
    """Register an agent.

    ``address`` defaults to ``name`` when not given, which is correct
    for in-process and NATS transports.  Pass an explicit address for
    ZMQ TCP endpoints.

    Raises ``ValueError`` if the name is already registered.
    """
    if name in self._entries:
        raise ValueError(f"Process already registered: {name!r}")
    entry = RoutingEntry(
        name=name,
        address=address if address is not None else name,
        is_local=is_local,
        capabilities=tuple(capabilities) if capabilities else (),
        capability_metadata=dict(capability_metadata) if capability_metadata else {},
    )
    self._entries[name] = entry
    self._fire_listeners(entry, "register")
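
The address default and the duplicate check can be exercised with a reduced registry. TinyRegistry and address_of are hypothetical; the sketch keeps only the name-to-address mapping and drops capabilities and listeners.

```python
class TinyRegistry:
    """Hypothetical reduction of register(): name -> address only."""

    def __init__(self):
        self._addresses = {}

    def register(self, name, address=None):
        if name in self._addresses:
            raise ValueError(f"Process already registered: {name!r}")
        # Fall back to the name when no explicit address is given.
        self._addresses[name] = address if address is not None else name

    def address_of(self, name):
        return self._addresses[name]


registry = TinyRegistry()
registry.register("summarizer")                         # in-process or NATS style
registry.register("translator", "tcp://10.0.0.7:5555")  # ZMQ TCP style

try:
    registry.register("summarizer")
    duplicate_error = None
except ValueError as exc:
    duplicate_error = str(exc)
```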

register_remote(name, capabilities=None, capability_metadata=None)

Register a remote agent for cross-process pattern matching.

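Idempotent for repeated announcements of the same remote agent: a re-announcement returns without changing the existing entry (so capabilities passed in a later announcement are not applied) and does not notify listeners. Raises ValueError if the name is already registered as local.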

Source code in civitas/registry.py
def register_remote(
    self,
    name: str,
    capabilities: list[str] | tuple[str, ...] | None = None,
    capability_metadata: dict[str, Any] | None = None,
) -> None:
    """Register a remote agent for cross-process pattern matching.

    Idempotent for repeated announcements of the same remote agent.
    Raises ``ValueError`` if the name is already registered as local.
    """
    existing = self._entries.get(name)
    if existing is not None:
        if existing.is_local:
            raise ValueError(f"Cannot register {name!r} as remote: already registered as local")
        return  # idempotent re-announcement
    entry = RoutingEntry(
        name=name,
        address=name,
        is_local=False,
        capabilities=tuple(capabilities) if capabilities else (),
        capability_metadata=dict(capability_metadata) if capability_metadata else {},
    )
    self._entries[name] = entry
    self._fire_listeners(entry, "register")
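
The three outcomes of register_remote() (new entry, repeated announcement, conflict with a local agent) can be sketched over a plain dict. The boolean return value is added here only to make the outcome observable; the method in the listing returns None.

```python
def register_remote(entries, name, capabilities=None):
    """Dict-based sketch: entries maps name -> (is_local, capabilities)."""
    existing = entries.get(name)
    if existing is not None:
        is_local, _ = existing
        if is_local:
            raise ValueError(
                f"Cannot register {name!r} as remote: already registered as local"
            )
        return False  # idempotent re-announcement, entry left untouched
    entries[name] = (False, tuple(capabilities) if capabilities else ())
    return True


entries = {"local-agent": (True, ())}

first = register_remote(entries, "remote-agent", ["text.summarize"])
second = register_remote(entries, "remote-agent", ["text.translate"])

try:
    register_remote(entries, "local-agent")
    conflict = False
except ValueError:
    conflict = True
```

After the second call the entry still carries only the capabilities from the first announcement.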

deregister(name)

Remove an agent. No-op if not registered.

Source code in civitas/registry.py
def deregister(self, name: str) -> None:
    """Remove an agent. No-op if not registered."""
    entry = self._entries.pop(name, None)
    if entry is not None:
        self._fire_listeners(entry, "deregister")

lookup(name)

Return the RoutingEntry for name, or None if not registered.

Source code in civitas/registry.py
def lookup(self, name: str) -> RoutingEntry | None:
    """Return the RoutingEntry for ``name``, or None if not registered."""
    return self._entries.get(name)

lookup_all(pattern)

Return all entries whose name matches a glob pattern.

Source code in civitas/registry.py
def lookup_all(self, pattern: str) -> list[RoutingEntry]:
    """Return all entries whose name matches a glob pattern."""
    return [entry for name, entry in self._entries.items() if fnmatch.fnmatch(name, pattern)]
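
Since matching is done with fnmatch.fnmatch, patterns follow shell-style glob rules rather than regular expressions. A short illustration with made-up agent names:

```python
import fnmatch

names = ["worker.1", "worker.2", "worker.10", "planner"]


def match(pattern):
    """Names matching a glob pattern, in registration order."""
    return [name for name in names if fnmatch.fnmatch(name, pattern)]


all_workers = match("worker.*")   # * matches any run of characters
single_char = match("worker.?")   # ? matches exactly one character
prefix_one = match("worker.1*")
everything = match("*")
```

fnmatch.fnmatch normalizes case according to the operating system, so results for mixed-case names can differ between platforms; fnmatch.fnmatchcase is the always case-sensitive variant.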

civitas.registry.RoutingEntry(name, address, is_local, capabilities=(), capability_metadata=dict()) dataclass

Routing metadata for a registered agent.

address is the transport-level identifier used by the bus when calling transport.publish(). For in-process and NATS deployments this equals the agent name. For ZMQ point-to-point it is the endpoint string (e.g. tcp://host:5555).

is_local is True when the agent runs inside this process, False for agents registered via cross-process discovery.

capabilities is a tuple of capability tag strings declared by the agent (e.g. ("text.summarize", "text.translate")).

capability_metadata is a free-form dict passed through verbatim to registry listeners (e.g. Presidium). The runtime never interprets it.
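
The fields above can be mirrored in a small dataclass to show how address differs between deployments and how a capability filter might treat local and remote entries alike. Entry and with_capability are hypothetical: find_by_capability is named in the LocalRegistry description, but its signature is not shown, so this is not that method.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Entry:
    """Hypothetical mirror of the RoutingEntry fields listed above."""
    name: str
    address: str
    is_local: bool
    capabilities: tuple[str, ...] = ()
    capability_metadata: dict[str, Any] = field(default_factory=dict)


entries = [
    # In-process style: address equals the agent name.
    Entry("summarizer", "summarizer", True, ("text.summarize",)),
    # ZMQ style: address is an endpoint string.
    Entry("polyglot", "tcp://10.0.0.7:5555", False, ("text.summarize", "text.translate")),
]


def with_capability(tag):
    """Names of entries declaring a tag, local and remote alike."""
    return [e.name for e in entries if tag in e.capabilities]


summarizers = with_capability("text.summarize")
translators = with_capability("text.translate")
```

Using default_factory gives each entry its own metadata dict instead of one shared mutable default.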