MessageBus
Central message router. Routes messages from sender to recipient by name, delegates physical delivery to the Transport, and generates tracing spans.
See Architecture for the routing resolution order.
civitas.bus.MessageBus(transport, registry, serializer, tracer, audit_sink=None)
Central message router.
Routes messages from sender to recipient by name, delegates physical delivery to the Transport, applies serialization via the Serializer, and generates tracing spans for every send/receive.
Routing precedence in route():
1. Registry lookup by recipient name → use RoutingEntry.address
2. Transport ephemeral reply address → publish directly (for request-reply)
3. Neither → raise MessageRoutingError
Source code in civitas/bus.py
setup_agent(agent)
async
Subscribe the transport to deliver messages to an agent's mailbox.
route(message)
async
Route a message to its recipient.
Validates system message types, creates a send span, serializes the message, and publishes through the transport.
Routing order:
1. Registry lookup → use RoutingEntry.address
2. Transport ephemeral reply address (has_reply_address) → publish directly
3. Neither → raise MessageRoutingError
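The resolution order can be sketched in a few lines. This is an illustration only, not the code in civitas/bus.py: the Entry class, the resolve_address helper, the plain dict and set standing in for the registry and the transport's reply addresses, and the "_reply.abc123" address are all hypothetical.

```python
from dataclasses import dataclass


class MessageRoutingError(Exception):
    """Stand-in for the error named in the docs; not imported from civitas."""


@dataclass(frozen=True)
class Entry:
    # Hypothetical, trimmed-down stand-in for RoutingEntry.
    name: str
    address: str


def resolve_address(recipient, registry, reply_addresses):
    """Return the transport address for `recipient` using the documented order."""
    # 1. A registered agent wins: use the address stored in its entry.
    entry = registry.get(recipient)
    if entry is not None:
        return entry.address
    # 2. Otherwise accept an ephemeral reply address known to the transport.
    if recipient in reply_addresses:
        return recipient
    # 3. Neither matched, so the message cannot be routed.
    raise MessageRoutingError(f"no route to {recipient!r}")


registry = {"summarizer": Entry("summarizer", "tcp://host:5555")}
reply_addresses = {"_reply.abc123"}

print(resolve_address("summarizer", registry, reply_addresses))
print(resolve_address("_reply.abc123", registry, reply_addresses))
```

Checking the registry first means a registered agent name always takes priority over a reply address that happens to share the same string.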
request(message, timeout=30.0)
async
Send a request message and await a reply.
Used by ask(). Delegates to transport.request(), which handles correlation and reply routing.
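How a transport correlates a reply with its request is not described here. One common approach, shown below as a sketch under stated assumptions, is to create a one-off reply address per request and wait on a future with a timeout. ToyTransport, its handler argument, and the "_reply." prefix are hypothetical and are not part of civitas.

```python
import asyncio
import uuid


class ToyTransport:
    """Hypothetical in-memory transport that correlates replies by address."""

    def __init__(self, handler):
        self._handler = handler   # coroutine playing the role of the recipient
        self._pending = {}        # reply address -> Future awaiting the reply

    def has_reply_address(self, address):
        return address in self._pending

    async def request(self, payload, timeout=30.0):
        reply_to = f"_reply.{uuid.uuid4().hex}"
        future = asyncio.get_running_loop().create_future()
        self._pending[reply_to] = future
        delivery = asyncio.create_task(self._deliver(payload, reply_to))
        try:
            # Raises asyncio.TimeoutError if no reply arrives in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            delivery.cancel()                  # no-op if delivery already finished
            self._pending.pop(reply_to, None)  # the reply address is single-use

    async def _deliver(self, payload, reply_to):
        result = await self._handler(payload)
        future = self._pending.get(reply_to)
        if future is not None and not future.done():
            future.set_result(result)


async def echo(payload):
    return {"echo": payload}


async def slow(payload):
    await asyncio.sleep(1.0)
    return payload


async def main():
    fast = await ToyTransport(echo).request("ping", timeout=1.0)
    try:
        await ToyTransport(slow).request("ping", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out


fast_reply, timed_out = asyncio.run(main())
```

Removing the pending entry in the finally block keeps reply addresses ephemeral: once a request completes or times out, the address no longer resolves.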
civitas.registry.LocalRegistry()
Single-node in-memory registry.
Default implementation for single-process and same-node deployments. All reads are O(1) dict lookups — no I/O, no async.
Remote agents can be registered via register_remote() so that
pattern-based broadcast works across process boundaries; they are
represented as RoutingEntry(is_local=False) and carry no object
reference.
Capability-based lookups (find_by_capability, find_by_capabilities)
work across both local and remote entries — capability tags are included
in cross-process Worker announcements so every node has a complete view.
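As an illustration of capability lookups spanning local and remote entries, the sketch below filters a list of toy entries by tag. The Entry class is a hypothetical stand-in for RoutingEntry, the functions are free functions rather than registry methods, and treating find_by_capabilities as "must declare every tag" is an assumption; the real method may use different semantics.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    # Hypothetical stand-in for RoutingEntry, reduced to the fields used here.
    name: str
    is_local: bool
    capabilities: tuple = ()


def find_by_capability(entries, tag):
    """Return entries declaring `tag`, local and remote alike."""
    return [e for e in entries if tag in e.capabilities]


def find_by_capabilities(entries, tags):
    """Return entries declaring every tag in `tags` (assumed 'all' semantics)."""
    required = set(tags)
    return [e for e in entries if required.issubset(e.capabilities)]


entries = [
    Entry("summarizer", is_local=True, capabilities=("text.summarize",)),
    Entry("polyglot", is_local=False, capabilities=("text.summarize", "text.translate")),
    Entry("logger", is_local=True),
]

single = [e.name for e in find_by_capability(entries, "text.summarize")]
multi = [e.name for e in find_by_capabilities(entries, ["text.summarize", "text.translate"])]
```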
Listeners registered via add_listener() are notified after every
register/deregister. Intended for external governance systems (e.g.
Presidium) that need a live view of the agent population.
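The notify-after-change behaviour can be pictured with a toy registry. This is a sketch, not LocalRegistry itself: the callback shape (event, name) and the event strings are assumptions, since the listener signature is not given here.

```python
class ToyRegistry:
    """Hypothetical registry showing notify-after-change; not LocalRegistry."""

    def __init__(self):
        self._entries = {}
        self._listeners = []

    def add_listener(self, callback):
        # Assumed callback shape: callback(event, name).
        self._listeners.append(callback)

    def register(self, name):
        if name in self._entries:
            raise ValueError(f"{name!r} is already registered")
        self._entries[name] = name
        self._notify("register", name)

    def deregister(self, name):
        if self._entries.pop(name, None) is not None:
            self._notify("deregister", name)

    def _notify(self, event, name):
        # Listeners run only after the registry state has been updated.
        for callback in list(self._listeners):
            callback(event, name)


events = []
registry = ToyRegistry()
registry.add_listener(lambda event, name: events.append((event, name)))
registry.register("summarizer")
registry.deregister("summarizer")
```

Notifying after the state change means a listener that queries the registry from inside its callback sees the updated agent population.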
Source code in civitas/registry.py
register(name, address=None, *, is_local=True, capabilities=None, capability_metadata=None)
Register an agent.
address defaults to name when not given, which is correct
for in-process and NATS transports. Pass an explicit address for
ZMQ TCP endpoints.
Raises ValueError if the name is already registered.
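The two rules above (address falls back to the name, duplicate names are rejected) can be sketched with a plain dict. The free function and the table argument are hypothetical simplifications; the real method lives on the registry and also accepts is_local and capability arguments.

```python
def register(table, name, address=None):
    """Toy version of the documented behaviour, storing name -> address."""
    if name in table:
        raise ValueError(f"agent {name!r} is already registered")
    # The address falls back to the agent name when none is supplied.
    table[name] = name if address is None else address
    return table[name]


table = {}
register(table, "planner")                            # in-process or NATS style
register(table, "worker", address="tcp://host:5555")  # explicit ZMQ TCP endpoint
try:
    register(table, "planner")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```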
register_remote(name, capabilities=None, capability_metadata=None)
Register a remote agent for cross-process pattern matching.
Idempotent for repeated announcements of the same remote agent.
Raises ValueError if the name is already registered as local.
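A minimal sketch of these two properties, assuming the registry can be reduced to a dict mapping agent name to an is_local flag (a hypothetical simplification, not the real data structure):

```python
def register_remote(table, name):
    """Toy version: `table` maps agent name -> is_local flag."""
    if table.get(name) is True:
        # A local agent already owns this name, so the announcement is rejected.
        raise ValueError(f"{name!r} is already registered as local")
    # First and repeated announcements leave the same remote entry behind.
    table[name] = False


table = {"planner": True}          # one local agent
register_remote(table, "worker")
register_remote(table, "worker")   # repeated announcement, no error
try:
    register_remote(table, "planner")
    conflict_rejected = False
except ValueError:
    conflict_rejected = True
```

Idempotence matters here because remote nodes may announce the same agent more than once, and a second announcement should not be treated as a naming conflict.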
deregister(name)
lookup(name)
lookup_all(pattern)
Return all entries whose name matches a glob pattern.
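Glob matching over names can be illustrated with the standard library's fnmatch module. Whether the registry uses fnmatch or another glob dialect is an assumption, and the free function below works on plain names rather than RoutingEntry objects.

```python
from fnmatch import fnmatchcase


def lookup_all(names, pattern):
    """Return the names that match a shell-style glob pattern, in input order."""
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return [name for name in names if fnmatchcase(name, pattern)]


names = ["worker.1", "worker.2", "planner"]
workers = lookup_all(names, "worker.*")
everyone = lookup_all(names, "*")
```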
civitas.registry.RoutingEntry(name, address, is_local, capabilities=(), capability_metadata=dict())
dataclass
Routing metadata for a registered agent.
address is the transport-level identifier used by the bus when
calling transport.publish(). For in-process and NATS deployments
this equals the agent name. For ZMQ point-to-point it is the endpoint
string (e.g. tcp://host:5555).
is_local is True when the agent runs inside this process, False for
agents registered via cross-process discovery.
capabilities is a tuple of capability tag strings declared by the
agent (e.g. ("text.summarize", "text.translate")).
capability_metadata is a free-form dict passed through verbatim to
registry listeners (e.g. Presidium). The runtime never interprets it.
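Based on the signature above, the dataclass might look roughly like the sketch below. The class name RoutingEntrySketch, the type annotations, and the example metadata key are assumptions; the real definition may differ (for example, it may be frozen).

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RoutingEntrySketch:
    """Illustrative shape only; not imported from civitas.registry."""

    name: str
    address: str
    is_local: bool
    capabilities: tuple[str, ...] = ()
    # default_factory gives every entry its own dict instead of a shared one.
    capability_metadata: dict[str, Any] = field(default_factory=dict)


# In-process or NATS style: the address equals the agent name.
local = RoutingEntrySketch(
    "summarizer", "summarizer", True, ("text.summarize", "text.translate")
)
# ZMQ point-to-point style: the address is the endpoint string.
remote = RoutingEntrySketch("indexer", "tcp://host:5555", False)
remote.capability_metadata["region"] = "eu"  # hypothetical metadata key
```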