Transport¶
The pluggable message delivery layer. Swap transports by changing one line in your topology YAML — agent code is unchanged.
See Transports for a full guide with architecture diagrams.
Protocol¶
civitas.transport.Transport
¶
Bases: Protocol
Protocol that all transports implement.
Five methods. A new transport plugin implements these five methods and the entire Civitas runtime works on it.
start()
async
¶
Initialize connections, bind sockets.
stop()
async
¶
Gracefully close connections, flush pending messages.
subscribe(address, handler)
async
¶
Register a handler for messages arriving at this address.
publish(address, data)
async
¶
Send a message to an address (fire-and-forget).
request(address, data, timeout)
async
¶
Send a message and await a reply (request-reply).
wait_ready()
async
¶
Wait for connections and subscriptions to stabilize. No-op by default.
Transports with slow-joiner problems (e.g. ZMQ PUB/SUB) should override this to sleep or poll until messages will be reliably delivered. Callers invoke this after all subscribe() calls are done but before publishing.
has_reply_address(address)
¶
Return True if address is an active ephemeral reply endpoint.
Ephemeral reply addresses are created by transport.request() and are not registered agents. The bus uses this to route reply messages without going through the Registry.
Implementations¶
civitas.transport.inprocess.InProcessTransport(serializer, mailbox_size=1000)
¶
Transport for single-process deployments.
Messages are delivered by putting serialized bytes into the recipient's asyncio.Queue. Despite being in-process, messages are still serialized through the configured Serializer to ensure transport-swap compatibility.
Source code in civitas/transport/inprocess.py
wait_ready()
async
¶
request(address, data, timeout)
async
¶
Send a request and await a reply.
Creates a temporary reply address, injects reply_to into the message, publishes the request, and awaits the reply with a timeout.
Source code in civitas/transport/inprocess.py
civitas.transport.zmq.ZMQTransport(serializer, pub_addr='tcp://127.0.0.1:5559', sub_addr='tcp://127.0.0.1:5560', start_proxy=False, curve_config=None)
¶
Transport for multi-process deployments using ZeroMQ.
Implements the five-method Transport protocol. Messages flow through an XSUB/XPUB proxy for PUB/SUB delivery. Request-reply uses temporary PUB/SUB topics with reply queues, identical to InProcessTransport.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
serializer
|
Serializer
|
Serializer for message encode/decode. |
required |
pub_addr
|
str
|
Address of the proxy XSUB frontend (PUB connects here). |
'tcp://127.0.0.1:5559'
|
sub_addr
|
str
|
Address of the proxy XPUB backend (SUB connects here). |
'tcp://127.0.0.1:5560'
|
start_proxy
|
bool
|
If True, start a ZMQProxy in this process. |
False
|
Source code in civitas/transport/zmq.py
start()
async
¶
Initialize sockets and connect to the proxy.
Source code in civitas/transport/zmq.py
wait_ready()
async
¶
Wait for ZMQ connections and subscriptions to stabilize.
Call after all subscribe() calls are done. Mitigates the ZMQ 'slow joiner' problem where PUB/SUB needs time for the connection handshake and subscription propagation through the proxy.
Source code in civitas/transport/zmq.py
stop()
async
¶
Close sockets, stop proxy, clean up.
Source code in civitas/transport/zmq.py
subscribe(address, handler)
async
¶
Subscribe to messages arriving at this address.
Source code in civitas/transport/zmq.py
publish(address, data)
async
¶
Send a message to an address via PUB/SUB through the proxy.
Same-process reply queues are checked first (short-circuit for local request-reply without going through the proxy).
Source code in civitas/transport/zmq.py
request(address, data, timeout)
async
¶
Send a request and await a reply over PUB/SUB.
Creates a temporary reply topic, subscribes to it, injects reply_to into the message, publishes the request, and awaits the reply.
Source code in civitas/transport/zmq.py
civitas.transport.nats.NATSTransport(serializer, servers='nats://localhost:4222', jetstream=False, stream_name='AGENCY', create_stream_if_missing=True, tls_config=None)
¶
Transport for distributed deployments using NATS.
Implements the five-method Transport protocol. Messages flow through a NATS server. Request-reply uses temporary subscriptions with reply addresses, consistent with InProcess and ZMQ transports.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
serializer
|
Serializer
|
Serializer for message encode/decode. |
required |
servers
|
str | list[str]
|
NATS server URL(s) to connect to. |
'nats://localhost:4222'
|
jetstream
|
bool
|
If True, use JetStream durable subscriptions. |
False
|
stream_name
|
str
|
JetStream stream name (only used if jetstream=True). |
'AGENCY'
|
Source code in civitas/transport/nats.py
start()
async
¶
Connect to the NATS server.
Source code in civitas/transport/nats.py
wait_ready()
async
¶
stop()
async
¶
Disconnect from NATS, clean up subscriptions.
Source code in civitas/transport/nats.py
subscribe(address, handler)
async
¶
Subscribe to messages arriving at this address.
Source code in civitas/transport/nats.py
publish(address, data)
async
¶
Publish a message to an address (fire-and-forget).
Checks local reply queues first (same-process request-reply short-circuit), then publishes via NATS.
Source code in civitas/transport/nats.py
request(address, data, timeout)
async
¶
Send a request and await a reply.
Creates a temporary reply address, subscribes to it, injects reply_to into the message, publishes the request, and awaits the reply. Mirrors the InProcess/ZMQ pattern for consistency.
Source code in civitas/transport/nats.py
Worker¶
civitas.worker.Worker(agents, transport='zmq', zmq_pub_addr='tcp://127.0.0.1:5559', zmq_sub_addr='tcp://127.0.0.1:5560', nats_servers='nats://localhost:4222', nats_jetstream=False, serializer=None, model_provider=None, tool_registry=None, state_store=None, max_restarts=3, components=None)
¶
Hosts agents in a worker process, connecting to an existing broker.
Supports ZMQ (connect to proxy) and NATS (connect to server) transports.
The Worker provides: - Transport connectivity (ZMQ or NATS) - Local registry and message bus - Heartbeat auto-response (handled by AgentProcess._message_loop) - Restart command handling (_agency.restart messages)
Source code in civitas/worker.py
start()
async
¶
Start the worker: connect to proxy, wire agents, start loops.
Source code in civitas/worker.py
stop()
async
¶
Stop all agents and disconnect from the proxy.