Transport

The pluggable message delivery layer. Swap transports by changing one line in your topology YAML — agent code is unchanged.

See Transports for a full guide with architecture diagrams.


Protocol

civitas.transport.Transport

Bases: Protocol

Protocol that all transports implement.

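Five core methods: start(), stop(), subscribe(), publish(), and request(). A new transport plugin implements these five and the entire Civitas runtime works on it. Two auxiliary hooks are also documented here: wait_ready(), which is a no-op by default, and has_reply_address(), which the bus uses to route replies.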

start() async

Initialize connections, bind sockets.

stop() async

Gracefully close connections, flush pending messages.

subscribe(address, handler) async

Register a handler for messages arriving at this address.

publish(address, data) async

Send a message to an address (fire-and-forget).

request(address, data, timeout) async

Send a message and await a reply (request-reply).

wait_ready() async

Wait for connections and subscriptions to stabilize. No-op by default.

Transports with slow-joiner problems (e.g. ZMQ PUB/SUB) should override this to sleep or poll until messages will be reliably delivered. Callers invoke this after all subscribe() calls are done but before publishing.

has_reply_address(address)

Return True if address is an active ephemeral reply endpoint.

Ephemeral reply addresses are created by transport.request() and are not registered agents. The bus uses this to route reply messages without going through the Registry.
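
The method list above can be restated as a typing.Protocol, which makes it easier to see what a plugin has to provide. The following is a minimal sketch, not the actual civitas.transport.Transport definition: the names TransportSketch, Handler, and LoopbackTransport are hypothetical, and the signatures are inferred from the headings on this page.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Protocol, runtime_checkable

# Assumed handler shape, matching the type hints in the source listings below.
Handler = Callable[[bytes], Awaitable[None]]


@runtime_checkable
class TransportSketch(Protocol):
    """Hypothetical restatement of the documented method set."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def subscribe(self, address: str, handler: Handler) -> None: ...
    async def publish(self, address: str, data: bytes) -> None: ...
    async def request(self, address: str, data: bytes, timeout: float) -> bytes: ...
    async def wait_ready(self) -> None: ...
    def has_reply_address(self, address: str) -> bool: ...


class LoopbackTransport:
    """Toy transport that hands bytes straight to the local handler."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    async def start(self) -> None:
        pass  # nothing to connect in this toy

    async def stop(self) -> None:
        self._handlers.clear()

    async def subscribe(self, address: str, handler: Handler) -> None:
        self._handlers[address] = handler

    async def publish(self, address: str, data: bytes) -> None:
        handler = self._handlers.get(address)
        if handler is not None:  # fire-and-forget: unknown addresses are dropped
            await handler(data)

    async def request(self, address: str, data: bytes, timeout: float) -> bytes:
        raise NotImplementedError("request-reply is omitted from this sketch")

    async def wait_ready(self) -> None:
        pass  # no slow-joiner problem in-process

    def has_reply_address(self, address: str) -> bool:
        return False  # this toy never creates reply endpoints


async def demo() -> list[bytes]:
    received: list[bytes] = []

    async def on_message(data: bytes) -> None:
        received.append(data)

    transport = LoopbackTransport()
    await transport.start()
    await transport.subscribe("greeter", on_message)
    await transport.wait_ready()  # after subscribing, before publishing
    await transport.publish("greeter", b"hello")
    await transport.publish("nobody", b"dropped")
    await transport.stop()
    return received
```

The call order in demo() follows the guidance for wait_ready(): subscribe first, then wait, then publish.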


Implementations

civitas.transport.inprocess.InProcessTransport(serializer, mailbox_size=1000)

Transport for single-process deployments.

Messages are delivered by putting serialized bytes into the recipient's asyncio.Queue. Despite being in-process, messages are still serialized through the configured Serializer to ensure transport-swap compatibility.

Source code in civitas/transport/inprocess.py
def __init__(self, serializer: Serializer, mailbox_size: int = 1000) -> None:
    self._serializer = serializer
    self._mailbox_size = mailbox_size
    self._handlers: dict[str, Callable[[bytes], Awaitable[None]]] = {}
    self._reply_queues: dict[str, asyncio.Queue[bytes]] = {}
    self._started = False

wait_ready() async

No-op for in-process transport — always ready after start().

Source code in civitas/transport/inprocess.py
async def wait_ready(self) -> None:
    """No-op for in-process transport — always ready after start()."""

request(address, data, timeout) async

Send a request and await a reply.

Creates a temporary reply address, injects reply_to into the message, publishes the request, and awaits the reply with a timeout.

Source code in civitas/transport/inprocess.py
async def request(self, address: str, data: bytes, timeout: float) -> bytes:
    """Send a request and await a reply.

    Creates a temporary reply address, injects reply_to into the message,
    publishes the request, and awaits the reply with a timeout.
    """
    reply_address = f"_reply.{_uuid7()}"
    reply_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=1)
    self._reply_queues[reply_address] = reply_queue

    try:
        # Deserialize to inject reply_to, then re-serialize
        message = self._serializer.deserialize(data)
        message.reply_to = reply_address
        data = self._serializer.serialize(message)

        # Publish the request
        handler = self._handlers.get(address)
        if handler is None:
            raise RuntimeError(f"No handler registered for address: {address}")
        await handler(data)

        # Await the reply
        async with asyncio.timeout(timeout):
            reply_data = await reply_queue.get()
        return reply_data
    finally:
        self._reply_queues.pop(reply_address, None)
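
The reply-address pattern in request() can be tried in isolation. The sketch below is a simplified stand-in rather than the Civitas implementation: MiniBus is a hypothetical class, JSON dictionaries replace the configured Serializer, uuid4 replaces _uuid7(), and asyncio.wait_for replaces asyncio.timeout so that it also runs on Python versions before 3.11.

```python
import asyncio
import json
import uuid


class MiniBus:
    """Hypothetical in-process bus showing the ephemeral reply-queue pattern."""

    def __init__(self) -> None:
        self._handlers = {}
        self._reply_queues = {}

    def subscribe(self, address, handler) -> None:
        self._handlers[address] = handler

    async def publish(self, address: str, data: bytes) -> None:
        # Replies to an ephemeral address go to its queue, not to a handler.
        if address in self._reply_queues:
            await self._reply_queues[address].put(data)
            return
        handler = self._handlers.get(address)
        if handler is not None:
            await handler(data)

    async def request(self, address: str, data: bytes, timeout: float) -> bytes:
        reply_address = f"_reply.{uuid.uuid4()}"
        reply_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._reply_queues[reply_address] = reply_queue
        try:
            # Decode, inject reply_to, and re-encode before delivery.
            message = json.loads(data)
            message["reply_to"] = reply_address
            handler = self._handlers.get(address)
            if handler is None:
                raise RuntimeError(f"No handler registered for address: {address}")
            await handler(json.dumps(message).encode())
            # Only the wait for the reply is bounded by the timeout.
            return await asyncio.wait_for(reply_queue.get(), timeout)
        finally:
            # The reply address disappears whether the request succeeded or not.
            self._reply_queues.pop(reply_address, None)

    def has_reply_address(self, address: str) -> bool:
        return address in self._reply_queues


async def demo():
    bus = MiniBus()

    async def echo(data: bytes) -> None:
        msg = json.loads(data)
        reply = {"payload": msg["payload"].upper()}
        await bus.publish(msg["reply_to"], json.dumps(reply).encode())

    async def silent(data: bytes) -> None:
        pass  # never replies, so the caller times out

    bus.subscribe("echo", echo)
    bus.subscribe("silent", silent)

    request = json.dumps({"payload": "ping"}).encode()
    reply = json.loads(await bus.request("echo", request, timeout=1.0))
    try:
        await bus.request("silent", request, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out, len(bus._reply_queues)
```

Because cleanup happens in the finally block, has_reply_address() stops returning True for an address as soon as its request completes or times out.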

has_reply_address(address)

Return True if address is an active ephemeral reply queue.

Source code in civitas/transport/inprocess.py
def has_reply_address(self, address: str) -> bool:
    """Return True if address is an active ephemeral reply queue."""
    return address in self._reply_queues

civitas.transport.zmq.ZMQTransport(serializer, pub_addr='tcp://127.0.0.1:5559', sub_addr='tcp://127.0.0.1:5560', start_proxy=False, curve_config=None)

Transport for multi-process deployments using ZeroMQ.

Implements the five-method Transport protocol. Messages flow through an XSUB/XPUB proxy for PUB/SUB delivery. Request-reply uses temporary PUB/SUB topics backed by the same reply-queue pattern as InProcessTransport.

Parameters:

- serializer (Serializer, required): Serializer for message encode/decode.
- pub_addr (str, default 'tcp://127.0.0.1:5559'): Address of the proxy XSUB frontend (PUB connects here).
- sub_addr (str, default 'tcp://127.0.0.1:5560'): Address of the proxy XPUB backend (SUB connects here).
- start_proxy (bool, default False): If True, start a ZMQProxy in this process.

Source code in civitas/transport/zmq.py
def __init__(
    self,
    serializer: Serializer,
    pub_addr: str = "tcp://127.0.0.1:5559",
    sub_addr: str = "tcp://127.0.0.1:5560",
    start_proxy: bool = False,
    curve_config: ZmqCurveConfig | None = None,
) -> None:
    self._serializer = serializer
    self._pub_addr = pub_addr
    self._sub_addr = sub_addr
    self._start_proxy = start_proxy
    self._curve_config = curve_config

    self._context: zmq.asyncio.Context | None = None
    self._pub: zmq.asyncio.Socket | None = None
    self._sub: zmq.asyncio.Socket | None = None
    self._proxy: ZMQProxy | None = None

    self._handlers: dict[str, Callable[[bytes], Awaitable[None]]] = {}
    self._reply_queues: dict[str, asyncio.Queue[bytes]] = {}
    self._receiver_task: asyncio.Task[None] | None = None
    self._started = False

start() async

Initialize sockets and connect to the proxy.

Source code in civitas/transport/zmq.py
async def start(self) -> None:
    """Initialize sockets and connect to the proxy."""
    if self._started:
        return

    if self._start_proxy:
        self._proxy = ZMQProxy(
            frontend=self._pub_addr,
            backend=self._sub_addr,
            curve_config=self._curve_config,
        )
        # Run blocking proxy start in a thread executor to avoid blocking
        # the event loop during the ready-wait.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._proxy.start)

    self._context = zmq.asyncio.Context()

    # PUB connects to proxy XSUB frontend
    self._pub = self._context.socket(zmq.PUB)
    # SUB connects to proxy XPUB backend
    self._sub = self._context.socket(zmq.SUB)

    if self._curve_config is not None and self._curve_config.enabled:
        cfg = self._curve_config
        for sock in (self._pub, self._sub):
            sock.curve_serverkey = cfg.server_public_key.encode()
            sock.curve_secretkey = cfg.client_secret_key.encode()
            sock.curve_publickey = cfg.client_public_key.encode()

    self._pub.connect(self._pub_addr)
    self._sub.connect(self._sub_addr)

    # Start background receiver
    self._receiver_task = asyncio.create_task(self._receiver_loop())

    self._started = True

wait_ready() async

Wait for ZMQ connections and subscriptions to stabilize.

Call after all subscribe() calls are done. Mitigates the ZMQ 'slow joiner' problem where PUB/SUB needs time for the connection handshake and subscription propagation through the proxy.

Source code in civitas/transport/zmq.py
async def wait_ready(self) -> None:
    """Wait for ZMQ connections and subscriptions to stabilize.

    Call after all subscribe() calls are done. Mitigates the ZMQ
    'slow joiner' problem where PUB/SUB needs time for the connection
    handshake and subscription propagation through the proxy.
    """
    await asyncio.sleep(0.3)
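
The protocol description allows wait_ready() to either sleep or poll. ZMQTransport sleeps for a fixed 0.3 seconds; a polling variant could be built on a helper like the one below. This is only a generic sketch: wait_until is a hypothetical name, and what the predicate should check (for example, whether a probe message has round-tripped through the proxy) is transport-specific and not shown in the source.

```python
import asyncio
import time
from collections.abc import Callable


async def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 2.0,
    interval: float = 0.01,
) -> bool:
    """Poll predicate until it returns True or timeout seconds elapse.

    Returns True if the predicate succeeded, False if the deadline passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)  # yield to the event loop between checks


async def demo() -> tuple[bool, bool]:
    state = {"ready": False}

    async def become_ready() -> None:
        await asyncio.sleep(0.03)
        state["ready"] = True

    task = asyncio.create_task(become_ready())
    became_ready = await wait_until(lambda: state["ready"], timeout=1.0)
    await task
    never_ready = await wait_until(lambda: False, timeout=0.05)
    return became_ready, never_ready
```

Polling returns as soon as the condition holds and reports failure explicitly, at the cost of needing a reliable readiness signal; a fixed sleep needs no signal but always pays the full delay.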

stop() async

Close sockets, stop proxy, clean up.

Source code in civitas/transport/zmq.py
async def stop(self) -> None:
    """Close sockets, stop proxy, clean up."""
    self._started = False

    if self._receiver_task is not None:
        self._receiver_task.cancel()
        try:
            await self._receiver_task
        except asyncio.CancelledError:
            pass

    if self._pub is not None:
        self._pub.close(linger=0)
    if self._sub is not None:
        self._sub.close(linger=0)
    if self._context is not None:
        self._context.term()

    if self._proxy is not None:
        self._proxy.stop()

    self._handlers.clear()
    self._reply_queues.clear()

subscribe(address, handler) async

Subscribe to messages arriving at this address.

Source code in civitas/transport/zmq.py
async def subscribe(self, address: str, handler: Callable[[bytes], Awaitable[None]]) -> None:
    """Subscribe to messages arriving at this address."""
    if self._sub is None:
        raise RuntimeError("ZMQTransport not started")
    self._handlers[address] = handler
    self._sub.subscribe(address.encode() + _TOPIC_SEP)
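
Appending a separator to the topic matters because ZeroMQ SUB sockets match subscriptions by byte prefix. The sketch below models that rule in plain Python to show what the separator prevents. The value of TOPIC_SEP here is an assumption for illustration; the real _TOPIC_SEP constant is not shown on this page.

```python
TOPIC_SEP = b"|"  # assumed value, for illustration only


def matches(subscription: bytes, topic_frame: bytes) -> bool:
    """Model of SUB-socket filtering: deliver when the frame starts with the subscription."""
    return topic_frame.startswith(subscription)


def topic(address: str) -> bytes:
    """Build the topic frame the same way subscribe() and publish() do."""
    return address.encode() + TOPIC_SEP


# Without a separator, a subscriber to "agent.a" also receives "agent.ab" traffic.
leaks_without_sep = matches(b"agent.a", b"agent.ab")

# With the separator appended on both sides, only the exact address matches.
leaks_with_sep = matches(topic("agent.a"), topic("agent.ab"))
exact_match = matches(topic("agent.a"), topic("agent.a"))
```

The scheme relies on the separator byte never appearing inside an address.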

publish(address, data) async

Send a message to an address via PUB/SUB through the proxy.

Same-process reply queues are checked first (short-circuit for local request-reply without going through the proxy).

Source code in civitas/transport/zmq.py
async def publish(self, address: str, data: bytes) -> None:
    """Send a message to an address via PUB/SUB through the proxy.

    Same-process reply queues are checked first (short-circuit for
    local request-reply without going through the proxy).
    """
    # Short-circuit for local reply queues
    if address in self._reply_queues:
        await self._reply_queues[address].put(data)
        return

    if self._pub is None:
        raise RuntimeError("ZMQTransport not started")
    topic = address.encode() + _TOPIC_SEP
    await self._pub.send_multipart([topic, data])

request(address, data, timeout) async

Send a request and await a reply over PUB/SUB.

Creates a temporary reply topic, subscribes to it, injects reply_to into the message, publishes the request, and awaits the reply.

Source code in civitas/transport/zmq.py
async def request(self, address: str, data: bytes, timeout: float) -> bytes:
    """Send a request and await a reply over PUB/SUB.

    Creates a temporary reply topic, subscribes to it, injects reply_to
    into the message, publishes the request, and awaits the reply.
    """
    reply_address = f"_reply.{_uuid7()}"
    reply_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=1)

    # Subscribe to the reply topic
    if self._sub is None:
        raise RuntimeError("ZMQTransport not started")
    self._sub.subscribe(reply_address.encode() + _TOPIC_SEP)
    self._reply_queues[reply_address] = reply_queue

    try:
        # Inject reply_to and re-serialize
        message = self._serializer.deserialize(data)
        message.reply_to = reply_address
        data = self._serializer.serialize(message)

        # Publish the request
        if self._pub is None:
            raise RuntimeError("ZMQTransport not started")
        topic = address.encode() + _TOPIC_SEP
        await self._pub.send_multipart([topic, data])

        # Await the reply
        async with asyncio.timeout(timeout):
            reply_data = await reply_queue.get()
        return reply_data
    finally:
        self._reply_queues.pop(reply_address, None)
        if self._sub is not None:
            self._sub.unsubscribe(reply_address.encode() + _TOPIC_SEP)

has_reply_address(address)

Return True if address is an active ephemeral reply queue.

Source code in civitas/transport/zmq.py
def has_reply_address(self, address: str) -> bool:
    """Return True if address is an active ephemeral reply queue."""
    return address in self._reply_queues

civitas.transport.nats.NATSTransport(serializer, servers='nats://localhost:4222', jetstream=False, stream_name='AGENCY', create_stream_if_missing=True, tls_config=None)

Transport for distributed deployments using NATS.

Implements the five-method Transport protocol. Messages flow through a NATS server. Request-reply uses temporary subscriptions with reply addresses, consistent with InProcess and ZMQ transports.

Parameters:

- serializer (Serializer, required): Serializer for message encode/decode.
- servers (str | list[str], default 'nats://localhost:4222'): NATS server URL(s) to connect to.
- jetstream (bool, default False): If True, use JetStream durable subscriptions.
- stream_name (str, default 'AGENCY'): JetStream stream name (only used if jetstream=True).

Source code in civitas/transport/nats.py
def __init__(
    self,
    serializer: Serializer,
    servers: str | list[str] = "nats://localhost:4222",
    jetstream: bool = False,
    stream_name: str = "AGENCY",
    create_stream_if_missing: bool = True,
    tls_config: NatsTlsConfig | None = None,
) -> None:
    self._serializer = serializer
    self._servers = servers if isinstance(servers, list) else [servers]
    self._use_jetstream = jetstream
    self._stream_name = stream_name
    self._create_stream_if_missing = create_stream_if_missing
    self._tls_config = tls_config

    self._nc: NATSClient | None = None
    self._js: nats.js.JetStreamContext | None = None
    self._handlers: dict[str, Callable[[bytes], Awaitable[None]]] = {}
    self._subscriptions: dict[str, Any] = {}  # Subscription or PushSubscription
    self._reply_queues: dict[str, asyncio.Queue[bytes]] = {}
    self._started = False

start() async

Connect to the NATS server.

Source code in civitas/transport/nats.py
async def start(self) -> None:
    """Connect to the NATS server."""
    if self._started:
        return

    async def _on_disconnected() -> None:
        logger.warning("[NATSTransport] disconnected from server")

    async def _on_reconnected() -> None:
        logger.info("[NATSTransport] reconnected to server")

    async def _on_error(exc: Exception) -> None:
        logger.error("[NATSTransport] error: %s", exc)

    connect_kwargs: dict[str, Any] = {
        "servers": self._servers,
        "disconnected_cb": _on_disconnected,
        "reconnected_cb": _on_reconnected,
        "error_cb": _on_error,
    }

    if self._tls_config is not None and self._tls_config.enabled:
        connect_kwargs["tls"] = self._tls_config.build_ssl_context()

    if self._tls_config is not None and self._tls_config.nkey_seed:
        try:
            import nkeys
        except ImportError as exc:
            raise ImportError(
                "NATS nkeys auth requires 'nkeys'. "
                "Install it with: pip install 'civitas[nkeys]'"
            ) from exc
        connect_kwargs["nkeys_seed"] = nkeys.from_seed(self._tls_config.nkey_seed.encode())

    self._nc = await nats.connect(**connect_kwargs)

    if self._use_jetstream:
        self._js = self._nc.jetstream()
        try:
            await self._js.find_stream_name_by_subject(f"{_SUBJECT_PREFIX}>")
        except nats.js.errors.NotFoundError as exc:
            if not self._create_stream_if_missing:
                raise RuntimeError(
                    f"JetStream stream '{self._stream_name}' not found and "
                    f"create_stream_if_missing=False"
                ) from exc
            logger.warning(
                "[NATSTransport] auto-creating JetStream stream '%s' — "
                "set create_stream_if_missing=False in production",
                self._stream_name,
            )
            await self._js.add_stream(
                name=self._stream_name,
                subjects=[f"{_SUBJECT_PREFIX}>"],
            )

    self._started = True

wait_ready() async

No-op for NATS — connection is established synchronously in start().

Source code in civitas/transport/nats.py
async def wait_ready(self) -> None:
    """No-op for NATS — connection is established synchronously in start()."""

stop() async

Disconnect from NATS, clean up subscriptions.

Source code in civitas/transport/nats.py
async def stop(self) -> None:
    """Disconnect from NATS, clean up subscriptions."""
    self._started = False

    for sub in self._subscriptions.values():
        try:
            await sub.unsubscribe()
        except Exception:  # noqa: BLE001 — best-effort cleanup during shutdown
            continue
    self._subscriptions.clear()

    if self._nc is not None and self._nc.is_connected:
        await self._nc.drain()
        self._nc = None

    self._handlers.clear()
    self._reply_queues.clear()

subscribe(address, handler) async

Subscribe to messages arriving at this address.

Source code in civitas/transport/nats.py
async def subscribe(self, address: str, handler: Callable[[bytes], Awaitable[None]]) -> None:
    """Subscribe to messages arriving at this address."""
    if self._nc is None:
        raise RuntimeError("NATSTransport not started")
    self._handlers[address] = handler
    subject = self._to_subject(address)

    async def _on_msg(msg: Msg) -> None:
        # Reply queues take priority (for request-reply responses)
        if address in self._reply_queues:
            await self._reply_queues[address].put(msg.data)
            return
        h = self._handlers.get(address)
        if h is not None:
            await h(msg.data)

    sub: Any
    if self._use_jetstream and self._js is not None:
        sub = await self._js.subscribe(
            subject,
            durable=address.replace(".", "_").replace("-", "_"),
            cb=_on_msg,
        )
    else:
        sub = await self._nc.subscribe(subject, cb=_on_msg)

    self._subscriptions[address] = sub
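
In JetStream mode, subscribe() derives the durable consumer name by replacing "." and "-" in the address with "_". One consequence is that distinct addresses can map to the same durable name. The sketch below reproduces that mapping and flags such collisions; durable_name and find_collisions are hypothetical helpers, not part of Civitas.

```python
def durable_name(address: str) -> str:
    """Same transformation subscribe() applies to build the durable name."""
    return address.replace(".", "_").replace("-", "_")


def find_collisions(addresses: list[str]) -> dict[str, list[str]]:
    """Group addresses by durable name and keep only names shared by several."""
    by_name: dict[str, list[str]] = {}
    for address in addresses:
        by_name.setdefault(durable_name(address), []).append(address)
    return {name: addrs for name, addrs in by_name.items() if len(addrs) > 1}


collisions = find_collisions(["orders.eu", "orders-eu", "billing"])
```

A check like this could be run over agent names at configuration time, assuming agent names are used as addresses.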

publish(address, data) async

Publish a message to an address (fire-and-forget).

Checks local reply queues first (same-process request-reply short-circuit), then publishes via NATS.

Source code in civitas/transport/nats.py
async def publish(self, address: str, data: bytes) -> None:
    """Publish a message to an address (fire-and-forget).

    Checks local reply queues first (same-process request-reply
    short-circuit), then publishes via NATS.
    """
    # Short-circuit for local reply queues
    if address in self._reply_queues:
        await self._reply_queues[address].put(data)
        return

    if self._nc is None:
        raise RuntimeError("NATSTransport not started")
    subject = self._to_subject(address)
    await self._nc.publish(subject, data)

request(address, data, timeout) async

Send a request and await a reply.

Creates a temporary reply address, subscribes to it, injects reply_to into the message, publishes the request, and awaits the reply. Mirrors the InProcess/ZMQ pattern for consistency.

Source code in civitas/transport/nats.py
async def request(self, address: str, data: bytes, timeout: float) -> bytes:
    """Send a request and await a reply.

    Creates a temporary reply address, subscribes to it, injects reply_to
    into the message, publishes the request, and awaits the reply.
    Mirrors the InProcess/ZMQ pattern for consistency.
    """
    if self._nc is None:
        raise RuntimeError("NATSTransport not started")

    reply_address = f"_reply.{_uuid7()}"
    reply_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=1)
    reply_subject = self._to_subject(reply_address)

    # Subscribe to the reply subject
    async def _on_reply(msg: Msg) -> None:
        if reply_address in self._reply_queues:
            await self._reply_queues[reply_address].put(msg.data)

    sub = await self._nc.subscribe(reply_subject, cb=_on_reply)
    self._reply_queues[reply_address] = reply_queue

    try:
        # Inject reply_to and re-serialize
        message = self._serializer.deserialize(data)
        message.reply_to = reply_address
        data = self._serializer.serialize(message)

        # Publish the request
        subject = self._to_subject(address)
        await self._nc.publish(subject, data)
        await self._nc.flush()

        # Await the reply
        async with asyncio.timeout(timeout):
            reply_data = await reply_queue.get()
        return reply_data
    finally:
        self._reply_queues.pop(reply_address, None)
        await sub.unsubscribe()

has_reply_address(address)

Return True if address is an active ephemeral reply queue.

Source code in civitas/transport/nats.py
def has_reply_address(self, address: str) -> bool:
    """Return True if address is an active ephemeral reply queue."""
    return address in self._reply_queues

Worker

civitas.worker.Worker(agents, transport='zmq', zmq_pub_addr='tcp://127.0.0.1:5559', zmq_sub_addr='tcp://127.0.0.1:5560', nats_servers='nats://localhost:4222', nats_jetstream=False, serializer=None, model_provider=None, tool_registry=None, state_store=None, max_restarts=3, components=None)

Hosts agents in a worker process, connecting to an existing broker.

Supports ZMQ (connect to proxy) and NATS (connect to server) transports.

The Worker provides:
- Transport connectivity (ZMQ or NATS)
- Local registry and message bus
- Heartbeat auto-response (handled by AgentProcess._message_loop)
- Restart command handling (messages arriving at the _agency.worker.restart address)

Source code in civitas/worker.py
def __init__(
    self,
    agents: list[AgentProcess],
    transport: str = "zmq",
    zmq_pub_addr: str = "tcp://127.0.0.1:5559",
    zmq_sub_addr: str = "tcp://127.0.0.1:5560",
    nats_servers: str | list[str] = "nats://localhost:4222",
    nats_jetstream: bool = False,
    serializer: Serializer | None = None,
    model_provider: Any = None,
    tool_registry: Any = None,
    state_store: Any = None,
    max_restarts: int = 3,
    components: ComponentSet | None = None,
) -> None:
    self._transport_type = transport
    self._zmq_pub_addr = zmq_pub_addr
    self._zmq_sub_addr = zmq_sub_addr
    self._nats_servers = nats_servers
    self._nats_jetstream = nats_jetstream
    self._custom_serializer = serializer
    self._model_provider = model_provider
    self._tool_registry = tool_registry
    self._state_store = state_store
    self._max_restarts = max_restarts
    self._components = components

    # O(1) agent lookup by name (F02-8)
    self._agents: dict[str, AgentProcess] = {a.name: a for a in agents}
    self._restart_counts: dict[str, int] = {a.name: 0 for a in agents}

    # Set during start()
    self._serializer: Serializer | None = None
    self._transport: Any = None
    self._registry: Any = None
    self._bus: Any = None
    self._started = False
    self._stop_event = asyncio.Event()

start() async

Start the worker: connect to proxy, wire agents, start loops.

Source code in civitas/worker.py
async def start(self) -> None:
    """Start the worker: connect to proxy, wire agents, start loops."""
    # Workers never use in-process transport — they connect to an existing broker
    if self._components is None and self._transport_type not in ("zmq", "nats"):
        raise ConfigurationError(
            f"Unknown transport: {self._transport_type!r}. Expected 'zmq' or 'nats'."
        )

    # Build or use provided ComponentSet
    if self._components is not None:
        cs = self._components
    else:
        cs = build_component_set(
            transport_type=self._transport_type,
            serializer=self._custom_serializer,
            model_provider=self._model_provider,
            tool_registry=self._tool_registry,
            state_store=self._state_store,
            zmq_pub_addr=self._zmq_pub_addr,
            zmq_sub_addr=self._zmq_sub_addr,
            zmq_start_proxy=False,  # Workers connect to an existing proxy
            nats_servers=self._nats_servers,
            nats_jetstream=self._nats_jetstream,
        )

    # Expose on self for _on_restart_command and stop()
    self._serializer = cs.serializer
    self._transport = cs.transport
    self._registry = cs.registry
    self._bus = cs.bus

    # Start transport first — must be running before setup_agent (F02-16)
    await self._transport.start()

    # Wait for ZMQ subscriptions to propagate (slow joiner).
    # Also lets the worker's PUB socket establish its connection to the proxy
    # before we publish registration announcements below.
    if hasattr(self._transport, "wait_ready"):
        await self._transport.wait_ready()

    # Wire and subscribe agents
    for agent in self._agents.values():
        cs.inject(agent)
        self._registry.register(agent.name)
        await self._bus.setup_agent(agent)

    # Subscribe to restart commands and register in registry so bus.route() can find it (F03-2)
    await self._transport.subscribe("_agency.worker.restart", self._on_restart_command)
    self._registry.register("_agency.worker.restart")

    # Start agent message loops
    for agent in self._agents.values():
        await agent._start()

    # Announce agents (and the restart handler) to the runtime's registry for
    # cross-process routing. Published AFTER wait_ready so the PUB socket
    # connection is stable. The brief sleep gives the runtime's receiver loop
    # time to process the announcements before worker.start() returns.
    announce_names = list(self._agents) + ["_agency.worker.restart"]
    for name in announce_names:
        maybe_agent: AgentProcess | None = self._agents.get(name)
        payload: dict[str, Any] = {"name": name}
        if maybe_agent is not None:
            payload["capabilities"] = list(maybe_agent.capabilities)
            payload["capability_metadata"] = dict(maybe_agent.capability_metadata)
        await self._transport.publish(
            "_agency.register",
            self._serializer.serialize(Message(type="_agency.register", payload=payload)),
        )
    await asyncio.sleep(0.1)

    self._started = True

stop() async

Stop all agents and disconnect from the proxy.

Source code in civitas/worker.py
async def stop(self) -> None:
    """Stop all agents and disconnect from the proxy."""
    if not self._started:
        return

    for agent in reversed(list(self._agents.values())):
        await agent._stop()

    # Deregister agents from the runtime's registry before disconnecting
    if self._serializer is not None and self._transport is not None:
        for name in self._agents:
            try:
                await self._transport.publish(
                    "_agency.deregister",
                    self._serializer.serialize(
                        Message(type="_agency.deregister", payload={"name": name})
                    ),
                )
            except Exception:  # noqa: BLE001 — best-effort during shutdown
                pass

    if self._transport is not None:
        await self._transport.stop()

    self._started = False
    self._stop_event.set()