Skip to content

Runtime

Assembles and manages the full Civitas runtime. Wires transport, registry, serializer, tracer, plugins, and the supervision tree.

See Deployment and Topology & CLI for usage.


civitas.runtime.Runtime(supervisor=None, transport='in_process', serializer=None, model_provider=None, tool_registry=None, state_store=None, zmq_pub_addr='tcp://127.0.0.1:5559', zmq_sub_addr='tcp://127.0.0.1:5560', zmq_start_proxy=True, nats_servers='nats://localhost:4222', nats_jetstream=False, nats_stream_name='AGENCY', components=None)

Assembles and manages the full Civitas runtime.

Startup sequence (from Implementation Guide §3):

1. Read configuration
2. Create Serializer
3. Create Tracer
4. Create Transport
5. Create Registry
6. Create MessageBus
7. Create plugin instances
8. Instantiate / wire all AgentProcesses
9. Register all AgentProcesses in Registry
10. Start Transport
11. Walk supervision tree bottom-up, start each agent
12. Start all Supervisors
13. Runtime is ready

Source code in civitas/runtime.py
def __init__(
    self,
    supervisor: Supervisor | None = None,
    transport: str = "in_process",
    serializer: Serializer | None = None,
    model_provider: Any = None,
    tool_registry: Any = None,
    state_store: Any = None,
    zmq_pub_addr: str = "tcp://127.0.0.1:5559",
    zmq_sub_addr: str = "tcp://127.0.0.1:5560",
    zmq_start_proxy: bool = True,
    nats_servers: str | list[str] = "nats://localhost:4222",
    nats_jetstream: bool = False,
    nats_stream_name: str = "AGENCY",
    components: ComponentSet | None = None,
) -> None:
    """Capture configuration for a runtime that has not been started yet.

    Nothing is built or connected here: the arguments are stored as-is and
    the live components (serializer, tracer, transport, registry, bus) are
    left as ``None`` placeholders until ``start()`` assigns them.
    """
    # Lifecycle flag and live components. start() fills these in; stop(),
    # ask()/send() and get_agent() read them afterwards.
    self._started = False
    self._agents_by_name: dict[str, AgentProcess] = {}  # F04-10: O(1) live process lookup
    self._bus: Any = None
    self._registry: Any = None
    self._transport: Any = None
    self._tracer: Any = None
    self._serializer: Serializer | None = None

    # Caller-supplied wiring.
    self._root_supervisor = supervisor
    self._components = components
    self._transport_type = transport
    self._custom_serializer = serializer
    self._model_provider = model_provider
    self._tool_registry = tool_registry
    self._state_store = state_store

    # Transport-specific settings: ZMQ first, then NATS.
    self._zmq_pub_addr, self._zmq_sub_addr = zmq_pub_addr, zmq_sub_addr
    self._zmq_start_proxy = zmq_start_proxy
    self._nats_servers = nats_servers
    self._nats_jetstream, self._nats_stream_name = nats_jetstream, nats_stream_name

    # Everything below is populated by from_config() from the topology YAML
    # and stays at these empty defaults for a programmatically built runtime.
    self._mcp_configs: list[Any] = []  # mcp: server declarations
    self._security_config: Any = None  # security: block
    self._topology_public_keys: dict[str, str] = {}
    self._agent_credentials: dict[str, dict[str, str]] = {}  # credentials: blocks
    self._agent_capabilities: dict[str, tuple[list[str], dict[str, Any]]] = {}  # capabilities: blocks
    self._audit_sink: Any = None  # audit: block
    self._transport_security: Any = None  # ZMQ CURVE / NATS TLS

from_config(path, agent_classes=None) classmethod

Build a Runtime from a YAML topology file.

The agent_classes dict maps type strings (e.g. "MyAgent") to the actual Python class. If not provided, types are resolved via importlib from dotted module paths (e.g. "myapp.agents.MyAgent").

Source code in civitas/runtime.py
@classmethod
def from_config(
    cls,
    path: str | Path,
    agent_classes: dict[str, type[AgentProcess]] | None = None,
) -> Runtime:
    """Build a Runtime from a YAML topology file.

    The ``agent_classes`` dict maps type strings (e.g. "MyAgent") to the
    actual Python class. If not provided, types are resolved via
    ``importlib`` from dotted module paths (e.g. "myapp.agents.MyAgent").

    Sections that are present but empty in the YAML (``transport:``,
    ``mcp:``, a node's ``config:``/``docs:``/``children:``/``exporters:``)
    are treated the same as absent sections.

    Args:
        path: Location of the topology YAML file (read as UTF-8).
        agent_classes: Optional mapping of type string to agent class that
            takes precedence over dotted-path imports.

    Returns:
        A configured, not-yet-started Runtime. MCP servers, credentials,
        capabilities, security, audit, and transport-security settings are
        only parsed here; ``start()`` applies them.

    Raises:
        ConfigurationError: The file is empty or not a mapping, has no
            ``supervision``/``supervisor`` section, or an agent type cannot
            be imported.
        ValueError: A bare (non-dotted) type string is not in
            ``agent_classes``, or a node matches no known shape.
        KeyError: A required key (e.g. a supervisor ``name`` or an exporter
            credential) is missing.
    """
    raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    # safe_load returns None for an empty document and may return a scalar
    # or list; fail with a configuration error instead of AttributeError.
    if not isinstance(raw, dict):
        raise ConfigurationError(
            f"YAML topology '{path}' must be a mapping with a top-level "
            f"'supervision' key (got {type(raw).__name__})."
        )
    config = substitute_vars(raw)
    classes = agent_classes or {}

    def _section(mapping: dict[str, Any], key: str) -> dict[str, Any]:
        """Return ``mapping[key]``, treating a missing or null entry as empty."""
        return mapping.get(key) or {}

    def _resolve_class(type_str: str) -> type[AgentProcess]:
        """Map a type string to a class via ``agent_classes`` or a dotted import."""
        if type_str in classes:
            return classes[type_str]
        # Try dotted import path: "myapp.agents.MyAgent"
        module_path, _, class_name = type_str.rpartition(".")
        if not module_path:
            raise ValueError(
                f"Cannot resolve agent type '{type_str}'. "
                f"Provide it in agent_classes or use a dotted path."
            )
        try:
            module = importlib.import_module(module_path)
            return cast(type[AgentProcess], getattr(module, class_name))
        except (ImportError, AttributeError) as exc:
            raise ConfigurationError(
                f"Cannot load agent type '{type_str}': {exc}. "
                f"Check that the module is installed and the class name is correct."
            ) from exc

    def _build_exporters(cfgs: list[dict[str, Any]]) -> list[EvalExporter]:
        """Instantiate eval exporters; unknown types are logged and skipped."""
        result: list[EvalExporter] = []
        for cfg in cfgs:
            kind = cfg.get("type", "")
            if kind == "arize":
                result.append(
                    ArizeExporter(
                        endpoint=cfg.get("endpoint", "http://localhost:6006/v1/traces"),
                        service_name=cfg.get("service_name", "civitas"),
                    )
                )
            elif kind == "langfuse":
                result.append(
                    LangfuseExporter(
                        public_key=cfg["public_key"],
                        secret_key=cfg["secret_key"],
                        host=cfg.get("host", "https://cloud.langfuse.com"),
                    )
                )
            elif kind == "braintrust":
                result.append(
                    BraintrustExporter(
                        api_key=cfg["api_key"],
                        project=cfg.get("project", "civitas"),
                    )
                )
            elif kind == "langsmith":
                result.append(
                    LangSmithExporter(
                        api_key=cfg["api_key"],
                        project=cfg.get("project", "civitas"),
                    )
                )
            elif kind == "fiddler":
                result.append(
                    FiddlerExporter(
                        url=cfg["url"],
                        token=cfg["token"],
                        org_id=cfg["org_id"],
                        project_id=cfg["project_id"],
                        model_id=cfg["model_id"],
                    )
                )
            else:
                logger.warning("Unknown eval exporter type '%s' — skipping", kind)
        return result

    def _build_supervisor(sup: dict[str, Any], name: str) -> Supervisor:
        """Build a Supervisor (and, recursively, its children) from its YAML mapping."""
        return Supervisor(
            name=name,
            children=[_build_node(child) for child in sup.get("children") or []],
            strategy=sup.get("strategy", "ONE_FOR_ONE").upper(),
            max_restarts=sup.get("max_restarts", 3),
            restart_window=sup.get("restart_window", 60.0),
            backoff=sup.get("backoff", "CONSTANT").upper(),
            backoff_base=sup.get("backoff_base", 1.0),
            backoff_max=sup.get("backoff_max", 60.0),
        )

    def _build_node(node: dict[str, Any]) -> AgentProcess | Supervisor:
        """Turn one topology node into a process; branch order decides precedence."""
        if "supervisor" in node:
            nested = node["supervisor"]
            return _build_supervisor(nested, nested["name"])
        elif node.get("type") == "topology_server":
            cfg = _section(node, "config")
            return TopologyServer(
                name=node.get("name", "topology_server"),
                host=cfg.get("host", "127.0.0.1"),
                port=cfg.get("port", 6789),
            )
        elif node.get("type") == "dynamic_supervisor" and "name" in node:
            return DynamicSupervisor(
                name=node["name"],
                max_children=node.get("max_children"),
                max_total_spawns=node.get("max_total_spawns"),
                restart=node.get("restart", "transient"),
                max_restarts=node.get("max_restarts", 3),
                restart_window=node.get("restart_window", 60.0),
            )
        elif node.get("type") == "eval_agent" and "name" in node:
            return EvalAgent(
                name=node["name"],
                max_corrections_per_window=node.get("max_corrections_per_window", 10),
                window_seconds=node.get("window_seconds", 60.0),
                exporters=_build_exporters(node.get("exporters") or []),
            )
        elif node.get("type") == "http_gateway" and "name" in node:
            cfg_dict = _section(node, "config")
            docs_cfg = _section(cfg_dict, "docs")
            gw_config = GatewayConfig(
                host=cfg_dict.get("host", "0.0.0.0"),
                port=cfg_dict.get("port", 8080),
                port_quic=cfg_dict.get("port_quic"),
                tls_cert=cfg_dict.get("tls_cert"),
                tls_key=cfg_dict.get("tls_key"),
                request_timeout=cfg_dict.get("request_timeout", 30.0),
                enable_http3=cfg_dict.get("enable_http3", False),
                routes=cfg_dict.get("routes", []),
                middleware=cfg_dict.get("middleware", []),
                docs_enabled=docs_cfg.get("enabled", True),
                docs_path=docs_cfg.get("path", "/docs"),
            )
            return HTTPGateway(name=node["name"], config=gw_config)
        elif "agent" in node:
            agent_cfg = node["agent"]
            agent_cls = _resolve_class(agent_cfg["type"])
            return agent_cls(name=agent_cfg["name"])
        elif (
            node.get("type") in ("gen_server", "agent") and "module" in node and "class" in node
        ):
            cls_path = f"{node['module']}.{node['class']}"
            agent_cls = _resolve_class(cls_path)
            return agent_cls(name=node["name"])
        elif "type" in node and "name" in node:
            # Flat dotted-path format: {type: "module.Class", name: "agent_name"}
            agent_cls = _resolve_class(node["type"])
            return agent_cls(name=node["name"])
        else:
            raise ValueError(f"Unknown node type in config: {node}")

    # "supervisor" is accepted as an alternative spelling of the top-level key.
    sup_cfg = config.get("supervision") or config.get("supervisor")
    if not sup_cfg:
        raise ConfigurationError("YAML topology must define a top-level 'supervision' key.")
    # Top-level is always a supervisor; unlike nested ones its name is optional.
    root = _build_supervisor(sup_cfg, sup_cfg.get("name", "root"))

    # Transport config
    transport_cfg = _section(config, "transport")
    transport_type = transport_cfg.get("type", "in_process")

    # Only keys that appear in the YAML are forwarded, so the constructor
    # defaults apply to everything else.
    kwargs: dict[str, Any] = {"supervisor": root, "transport": transport_type}
    if transport_type == "zmq":
        if "pub_addr" in transport_cfg:
            kwargs["zmq_pub_addr"] = transport_cfg["pub_addr"]
        if "sub_addr" in transport_cfg:
            kwargs["zmq_sub_addr"] = transport_cfg["sub_addr"]
        if "start_proxy" in transport_cfg:
            kwargs["zmq_start_proxy"] = transport_cfg["start_proxy"]
    elif transport_type == "nats":
        if "servers" in transport_cfg:
            kwargs["nats_servers"] = transport_cfg["servers"]
        if "jetstream" in transport_cfg:
            kwargs["nats_jetstream"] = transport_cfg["jetstream"]
        if "stream_name" in transport_cfg:
            kwargs["nats_stream_name"] = transport_cfg["stream_name"]

    # Plugin config
    if "plugins" in config:
        loaded = load_plugins_from_config(config)
        if loaded["model_providers"]:
            if len(loaded["model_providers"]) > 1:
                logger.warning(
                    "Multiple model providers found in YAML; using the first one. "
                    "Additional providers are ignored."
                )
            kwargs["model_provider"] = loaded["model_providers"][0]
        if loaded["state_store"] is not None:
            kwargs["state_store"] = loaded["state_store"]

    runtime = cls(**kwargs)

    # MCP server config — parsed here, connected during start()
    for srv in _section(config, "mcp").get("servers") or []:
        sandbox = SandboxConfig.from_dict(srv["sandbox"]) if srv.get("sandbox") else None
        runtime._mcp_configs.append(
            MCPServerConfig(
                name=srv["name"],
                transport=srv["transport"],
                command=srv.get("command"),
                args=srv.get("args", []),
                env=srv.get("env"),
                url=srv.get("url"),
                sandbox=sandbox,
            )
        )

    # Per-agent credentials — parsed here, applied in start()
    runtime._agent_credentials = _extract_agent_credentials(config)

    # Per-agent capabilities — parsed here, applied in start()
    runtime._agent_capabilities = _extract_agent_capabilities(config)

    # Security config — parsed here, applied in start()
    security_section = config.get("security")
    if security_section:
        runtime._security_config = SecurityConfig.from_dict(security_section)
        runtime._topology_public_keys = _extract_public_keys(config)

    # Audit sink — parsed here, threaded into ComponentSet during start()
    audit_section = config.get("audit")
    if audit_section:
        runtime._audit_sink = sink_from_config(audit_section)

    # Transport security (CURVE / TLS) — parsed from security.transport block
    if security_section:
        from civitas.security.config import TransportSecurityConfig

        transport_section = _section(security_section, "transport")
        if transport_section:
            runtime._transport_security = TransportSecurityConfig.from_dict(transport_section)

    return runtime

start() async

Start the runtime following the canonical initialization sequence.

Source code in civitas/runtime.py
async def start(self) -> None:
    """Start the runtime following the canonical initialization sequence.

    Calling this on an already-started runtime is a no-op. Otherwise the
    method, in order:

    1. Uses the ComponentSet passed to the constructor, or builds one from
       the stored transport/serializer/plugin settings.
    2. Publishes the components on ``self`` for ``stop()``, ``ask()``,
       ``send()`` and ``get_agent()``.
    3. If there is no root supervisor, marks the runtime started and returns.
    4. Optionally swaps in a signing serializer (security config present,
       signing enabled, and transport is not ``in_process``).
    5. Injects components and credentials into agents, wires supervisors,
       and assigns each agent its nearest DynamicSupervisor name.
    6. Registers agents in the registry and builds the name → process map.
    7. Starts the transport, sets up per-agent subscriptions, and subscribes
       to remote register/deregister announcements.
    8. Connects declared MCP servers to every agent (failures are logged,
       not raised).
    9. Starts the supervision tree and marks the runtime started.
    """
    if self._started:
        return

    # Steps 2–6: build or use provided ComponentSet.
    # Note: if a pre-built ComponentSet is provided, its transport must support
    # being started by this call — transport.start() is always called below. (F04-11)
    if self._components is not None:
        cs = self._components
    else:
        ts = self._transport_security
        cs = build_component_set(
            transport_type=self._transport_type,
            serializer=self._custom_serializer,
            model_provider=self._model_provider,
            tool_registry=self._tool_registry,
            state_store=self._state_store,
            audit_sink=self._audit_sink,
            zmq_pub_addr=self._zmq_pub_addr,
            zmq_sub_addr=self._zmq_sub_addr,
            zmq_start_proxy=self._zmq_start_proxy,
            # CURVE / TLS settings are forwarded only when explicitly enabled.
            zmq_curve_config=ts.zmq if ts is not None and ts.zmq.enabled else None,
            nats_servers=self._nats_servers,
            nats_jetstream=self._nats_jetstream,
            nats_stream_name=self._nats_stream_name,
            nats_tls_config=ts.nats if ts is not None and ts.nats.enabled else None,
        )

    # Expose on self for stop(), ask(), send(), and get_agent()
    self._serializer = cs.serializer
    self._tracer = cs.tracer
    self._transport = cs.transport
    self._registry = cs.registry
    self._bus = cs.bus
    # Overwrites the constructor value with whatever store the ComponentSet holds.
    self._state_store = cs.store

    # No supervision tree: nothing to wire or start.
    # NOTE(review): this path returns before transport.start() is called —
    # confirm that a supervisor-less runtime is not expected to route messages.
    if self._root_supervisor is None:
        self._started = True
        return

    # 8. Inject dependencies into all AgentProcesses
    all_agents = self._root_supervisor.all_agents()

    # Security: build signing infrastructure if configured for non-InProcess transports.
    # InProcess transport skips signing entirely (D9 — same OS process, no wire to protect).
    if (
        self._security_config is not None
        and self._security_config.signing.enabled
        and self._transport_type != "in_process"
    ):
        key_dir = self._security_config.identity.key_dir
        identities: dict[str, AgentIdentity] = {}
        for agent in all_agents:
            # DynamicSupervisor and TopologyServer get no signing identity.
            if isinstance(agent, DynamicSupervisor | TopologyServer):
                continue
            # "auto" mode may create a missing key; any other mode only loads.
            if self._security_config.identity.mode == "auto":
                identities[agent.name] = AgentIdentity.load_or_generate(agent.name, key_dir)
            else:
                identities[agent.name] = AgentIdentity.load(agent.name, key_dir)

        # Local identities are registered first; public keys declared in the
        # topology only fill in names that have no local identity.
        registry = KeyRegistry()
        for name, identity in identities.items():
            registry.register(name, identity.verify_key)
        for name, pub_b64 in self._topology_public_keys.items():
            if name not in registry:
                registry.register_b64(name, pub_b64)

        signer = MessageSigner(identities, registry, self._security_config.signing)
        signing_ser = SigningSerializer(signer, self._security_config.signing)
        # Replace the serializer on the runtime and on the bus; cs.serializer
        # itself is left untouched.
        self._serializer = signing_ser
        cs.bus._serializer = signing_ser

    for agent in all_agents:
        cs.inject(agent)
        # Wire per-agent credentials from topology credentials: blocks
        # (skipped entirely when the topology declared no credentials at all).
        if self._agent_credentials:
            agent._credentials = self._agent_credentials.get(agent.name, {})

    # Inject into supervisors (supervisor-specific wiring, not via ComponentSet)
    for sup in self._root_supervisor.all_supervisors():
        sup._bus = cs.bus
        sup._registry = cs.registry
        sup._tracer = cs.tracer

    # Wire _dynamic_supervisor_name for all agents based on the static topology.
    # Each agent receives the name of the nearest DynamicSupervisor in its
    # ancestor-or-sibling subtree, enabling self.spawn() without explicit naming.
    def _wire_dyn_sup(
        node: Supervisor | AgentProcess,
        nearest_dyn: str | None,
    ) -> None:
        if isinstance(node, DynamicSupervisor):
            node._dynamic_supervisor_name = node.name  # spawns into itself
        elif isinstance(node, Supervisor):
            # The first DynamicSupervisor among this supervisor's direct
            # children wins; otherwise the inherited name is passed down.
            dyn_child = next(
                (c for c in node.children if isinstance(c, DynamicSupervisor)), None
            )
            new_nearest = dyn_child.name if dyn_child is not None else nearest_dyn
            for child in node.children:
                _wire_dyn_sup(child, new_nearest)
        else:
            # Plain agent: may be None when no DynamicSupervisor is in reach.
            node._dynamic_supervisor_name = nearest_dyn

    _wire_dyn_sup(self._root_supervisor, None)

    # 9. Register all AgentProcesses in Registry; build O(1) name→process map (F04-10)
    for agent in all_agents:
        # Capabilities declared in the YAML replace the agent's own declarations.
        yaml_caps = self._agent_capabilities.get(agent.name)
        if yaml_caps is not None:
            caps, meta = yaml_caps
        else:
            caps = list(agent.capabilities)
            meta = dict(agent.capability_metadata)
        self._registry.register(agent.name, capabilities=caps, capability_metadata=meta)
    self._agents_by_name = {a.name: a for a in all_agents}

    # Inject topology server references before supervision tree starts
    for agent in all_agents:
        if isinstance(agent, TopologyServer):
            agent._root_supervisor = self._root_supervisor
            agent._agents = self._agents_by_name

    # 10. Start Transport
    await self._transport.start()

    # Set up transport subscriptions for each agent
    for agent in all_agents:
        await self._bus.setup_agent(agent)

    # Subscribe to cross-process agent announcements from Worker processes.
    # Workers publish _agency.register on startup so this runtime's bus can
    # route messages to remote agents without a shared registry service.
    # NOTE(review): both handlers decode with cs.serializer, not with
    # self._serializer — when signing is enabled those differ; confirm that
    # announcements are meant to bypass the signing serializer.
    async def _on_remote_register(data: bytes) -> None:
        msg = cs.serializer.deserialize(data)
        name: str = msg.payload.get("name", "")
        if name:
            try:
                self._registry.register_remote(
                    name,
                    capabilities=msg.payload.get("capabilities"),
                    capability_metadata=msg.payload.get("capability_metadata"),
                )
            except ValueError:
                pass  # already registered locally — ignore

    async def _on_remote_deregister(data: bytes) -> None:
        msg = cs.serializer.deserialize(data)
        name: str = msg.payload.get("name", "")
        if name:
            # Only remote entries are removed; local agents are never
            # deregistered by a remote announcement.
            entry = self._registry.lookup(name)
            if entry is not None and not entry.is_local:
                self._registry.deregister(name)

    await self._transport.subscribe("_agency.register", _on_remote_register)
    await self._transport.subscribe("_agency.deregister", _on_remote_deregister)

    # Wait for subscriptions to propagate (ZMQ slow joiner mitigation)
    # wait_ready is optional, so it is only awaited on transports that define it.
    if hasattr(self._transport, "wait_ready"):
        await self._transport.wait_ready()

    # Connect MCP servers declared in topology YAML to all agents
    if self._mcp_configs:
        for agent in all_agents:
            for mcp_cfg in self._mcp_configs:
                # Best effort: a failed connection is logged and startup continues.
                try:
                    await agent.connect_mcp(mcp_cfg)
                except Exception as exc:
                    logger.warning(
                        "Failed to connect agent '%s' to MCP server '%s': %s",
                        agent.name,
                        mcp_cfg.name,
                        exc,
                    )

    # 11-12. Start supervision tree (supervisors start their children)
    await self._root_supervisor.start()

    # 13. Runtime is ready
    self._started = True

stop() async

Shutdown sequence: stop the supervision tree (agents), stop the transport, close the state store, flush the tracer, and close the audit sink.

Source code in civitas/runtime.py
async def stop(self) -> None:
    """Shutdown sequence: stop agents, transport, flush tracer.

    Steps run in this order: supervision tree, transport, state store
    (only if it has a ``close`` attribute), tracer flush, audit sink.
    Calling this on a runtime that is not started is a no-op.

    A failing step no longer aborts the sequence: every remaining step is
    still attempted so that later resources are released, the runtime is
    marked stopped, and the first exception is re-raised at the end. Any
    further failures are logged as warnings.

    Raises:
        Exception: The first error raised by any shutdown step.
    """
    if not self._started:
        return

    import logging  # local import keeps this block self-contained

    async def _flush_tracer() -> None:
        # Tracer.flush() is synchronous; wrap it so every step is awaitable.
        self._tracer.flush()

    # Collect the applicable steps first, in the documented order.
    steps: list[tuple[str, Any]] = []
    if self._root_supervisor is not None:
        # Sends shutdown and awaits on_stop for each agent.
        steps.append(("supervision tree", self._root_supervisor.stop))
    if self._transport is not None:
        steps.append(("transport", self._transport.stop))
    if self._state_store is not None and hasattr(self._state_store, "close"):
        steps.append(("state store", self._state_store.close))
    if self._tracer is not None:
        steps.append(("tracer", _flush_tracer))
    if self._audit_sink is not None:
        steps.append(("audit sink", self._audit_sink.close))

    first_error: Exception | None = None
    for label, step in steps:
        try:
            await step()
        # Exception, not BaseException: cancellation still propagates at once.
        except Exception as exc:
            if first_error is None:
                first_error = exc
            else:
                logging.getLogger(__name__).warning(
                    "Runtime.stop(): %s shutdown also failed: %s", label, exc
                )

    # Every step has been attempted, so the runtime counts as stopped even
    # when one of them failed.
    self._agents_by_name.clear()
    self._started = False

    if first_error is not None:
        raise first_error

ask(agent_name, payload, timeout=30.0, message_type='message') async

Send a message to an agent and await a reply.

Source code in civitas/runtime.py
async def ask(
    self,
    agent_name: str,
    payload: dict[str, Any],
    timeout: float = 30.0,
    message_type: str = "message",
) -> Message:
    """Deliver ``payload`` to ``agent_name`` and return the reply message.

    The request is sent as ``_runtime`` with a fresh trace id, correlation
    id and span id, and goes through the bus's request call with the given
    ``timeout``.

    Raises:
        RuntimeError: The bus or tracer is not set (runtime not started).
    """
    bus, tracer = self._bus, self._tracer
    if bus is None or tracer is None:
        raise RuntimeError("Runtime not started")

    # The trace id is allocated before the message is assembled.
    fresh_trace = tracer.new_trace_id()
    request = Message(
        type=message_type,
        sender="_runtime",
        recipient=agent_name,
        payload=payload,
        correlation_id=_uuid7(),
        trace_id=fresh_trace,
        span_id=_new_span_id(),
    )
    reply = await bus.request(request, timeout=timeout)
    return cast(Message, reply)

send(agent_name, payload, message_type='message') async

Fire-and-forget: send a message to an agent.

Source code in civitas/runtime.py
async def send(
    self,
    agent_name: str,
    payload: dict[str, Any],
    message_type: str = "message",
) -> None:
    """Route ``payload`` to ``agent_name`` without waiting for any reply.

    The message is sent as ``_runtime`` with a fresh trace id and span id;
    no correlation id is set.

    Raises:
        RuntimeError: The bus or tracer is not set (runtime not started).
    """
    bus, tracer = self._bus, self._tracer
    if bus is None or tracer is None:
        raise RuntimeError("Runtime not started")

    # The trace id is allocated before the message is assembled.
    fresh_trace = tracer.new_trace_id()
    outgoing = Message(
        type=message_type,
        sender="_runtime",
        recipient=agent_name,
        payload=payload,
        trace_id=fresh_trace,
        span_id=_new_span_id(),
    )
    await bus.route(outgoing)

get_agent(name)

Return the live AgentProcess instance by name, or None.

O(1) lookup via the agents-by-name dict built during start(). Use this when you need to inspect process state (e.g. status). For routing messages use runtime.send/ask instead.

Source code in civitas/runtime.py
def get_agent(self, name: str) -> AgentProcess | None:
    """Look up a live AgentProcess by name; ``None`` when no such agent exists.

    Reads the name → process dict that start() builds, so the lookup is
    O(1). Intended for inspecting process state (e.g. status); to deliver
    messages use runtime.send/ask instead.
    """
    live_agents = self._agents_by_name
    return live_agents[name] if name in live_agents else None

all_agents()

Return all AgentProcess instances in the supervision tree.

Source code in civitas/runtime.py
def all_agents(self) -> list[AgentProcess]:
    """List every AgentProcess reachable from the root supervisor.

    Returns an empty list when the runtime has no supervision tree.
    """
    root = self._root_supervisor
    return [] if root is None else root.all_agents()

print_tree()

Return an ASCII representation of the supervision tree.

Source code in civitas/runtime.py
def print_tree(self) -> str:
    """Render the supervision tree as box-drawing ASCII text.

    Despite the name, nothing is printed: the rendered text is returned.
    Supervisors are shown as ``[sup] name (strategy)``; every other node as
    ``[tag] name (status)``, with ``?`` when the node has no ``status``
    attribute. Returns ``"(no supervision tree)"`` when there is no root
    supervisor.
    """
    root = self._root_supervisor
    if root is None:
        return "(no supervision tree)"

    # Checked in this order; the first matching type decides the tag.
    tagged_types = (
        (DynamicSupervisor, "[dyn]"),
        (TopologyServer, "[topo]"),
        (EvalAgent, "[eval]"),
        (HTTPGateway, "[http]"),
        (GenServer, "[srv]"),
    )

    def _describe(node: Supervisor | AgentProcess) -> str:
        """Build the one-line label for a single node."""
        if isinstance(node, Supervisor):
            return f"[sup] {node.name} ({node.strategy.value})"
        state = node.status.value if hasattr(node, "status") else "?"
        tag = next(
            (text for kind, text in tagged_types if isinstance(node, kind)),
            "[agent]",
        )
        return f"{tag} {node.name} ({state})"

    rendered: list[str] = [f"[sup] {root.name} ({root.strategy.value})"]

    def _emit(siblings: list[Any], indent: str) -> None:
        """Append one line per sibling, descending into supervisors depth-first."""
        final_index = len(siblings) - 1
        for position, node in enumerate(siblings):
            closes_group = position == final_index
            branch = "└── " if closes_group else "├── "
            rendered.append(f"{indent}{branch}{_describe(node)}")
            if isinstance(node, Supervisor):
                # Below the last sibling the vertical rail is not continued.
                _emit(node.children, indent + ("    " if closes_group else "│   "))

    _emit(root.children, "")
    return "\n".join(rendered)

spawn(supervisor_name, agent_class, name, config=None) async

Spawn a dynamic agent via the named DynamicSupervisor.

Returns the agent name on success. Raises SpawnError on failure.

Source code in civitas/runtime.py
async def spawn(
    self,
    supervisor_name: str,
    agent_class: type[AgentProcess],
    name: str,
    config: dict[str, Any] | None = None,
) -> str:
    """Ask the named DynamicSupervisor to start a new agent called ``name``.

    The class travels as a dotted ``module.qualname`` string and a missing
    ``config`` is sent as an empty dict.

    Returns:
        ``name``, once the supervisor replies with status ``"ok"``.

    Raises:
        SpawnError: The reply status is anything other than ``"ok"``.
    """
    request = {
        "class_path": f"{agent_class.__module__}.{agent_class.__qualname__}",
        "name": name,
        "config": config or {},
        "spawner": "_runtime",
    }
    reply = await self.ask(
        supervisor_name,
        request,
        message_type="civitas.dynamic.spawn",
    )
    if reply.payload.get("status") == "ok":
        return name
    raise SpawnError(reply.payload.get("reason", "spawn failed"))

despawn(supervisor_name, name) async

Hard-stop a dynamic child via the named DynamicSupervisor.

Source code in civitas/runtime.py
async def despawn(self, supervisor_name: str, name: str) -> None:
    """Ask the named DynamicSupervisor to hard-stop its dynamic child ``name``.

    Raises:
        SpawnError: The reply status is anything other than ``"ok"``.
    """
    request = {"name": name}
    reply = await self.ask(
        supervisor_name,
        request,
        message_type="civitas.dynamic.despawn",
    )
    if reply.payload.get("status") == "ok":
        return
    raise SpawnError(reply.payload.get("reason", "despawn failed"))
stop_agent(supervisor_name, name, drain='current', timeout=30.0) async

Soft-stop a dynamic child via the named DynamicSupervisor.

Source code in civitas/runtime.py
async def stop_agent(
    self,
    supervisor_name: str,
    name: str,
    drain: str = "current",
    timeout: float = 30.0,
) -> None:
    """Ask the named DynamicSupervisor to soft-stop its dynamic child ``name``.

    ``drain`` and ``timeout`` are forwarded in the request payload. The
    request itself waits ``timeout + 5.0`` seconds for the reply.

    Raises:
        SpawnError: The reply status is anything other than ``"ok"``.
    """
    request = {"name": name, "drain": drain, "timeout": timeout}
    reply = await self.ask(
        supervisor_name,
        request,
        message_type="civitas.dynamic.stop",
        timeout=timeout + 5.0,
    )
    if reply.payload.get("status") == "ok":
        return
    raise SpawnError(reply.payload.get("reason", "stop failed"))

civitas.components.ComponentSet(transport, registry, serializer, tracer, store=None, model_provider=None, tool_registry=None, audit_sink=None) dataclass

Assembled infrastructure wiring for a single Runtime or Worker.

MessageBus is derived automatically from the other four fields in `__post_init__` — callers should not construct it separately.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| transport | Any | Transport layer (InProcess, ZMQ, or NATS). |
| registry | Registry | LocalRegistry for agent name → address mapping. |
| serializer | Serializer | Serializer used by transport and bus. |
| tracer | Tracer | Tracer instance for span emission. |
| store | Any | StateStore for agent checkpoint/restore. None means no persistence; callers should default to InMemoryStateStore when appropriate. |
| model_provider | Any | Injected into agent.llm at startup. |
| tool_registry | Any | Injected into agent.tools at startup. |
| bus | MessageBus | MessageBus built from the other four fields. |

inject(agent)

Inject bus and plugin references into an agent process.

Source code in civitas/components.py
def inject(self, agent: AgentProcess) -> None:
    """Copy this set's bus, tracer, registry, plugins, store and audit sink onto ``agent``."""
    # (attribute on the agent, attribute on this ComponentSet), assigned in order.
    wiring = (
        ("_bus", "bus"),
        ("_tracer", "tracer"),
        ("_registry", "registry"),
        ("llm", "model_provider"),
        ("tools", "tool_registry"),
        ("store", "store"),
        ("_audit_sink", "audit_sink"),
    )
    for agent_attr, component_attr in wiring:
        setattr(agent, agent_attr, getattr(self, component_attr))

civitas.components.build_component_set(transport_type='in_process', serializer=None, model_provider=None, tool_registry=None, state_store=None, audit_sink=None, zmq_pub_addr='tcp://127.0.0.1:5559', zmq_sub_addr='tcp://127.0.0.1:5560', zmq_start_proxy=True, zmq_curve_config=None, nats_servers='nats://localhost:4222', nats_jetstream=False, nats_stream_name='AGENCY', nats_tls_config=None)

Build a ComponentSet from primitive configuration values.

Called by Runtime.start() and Worker.start() when no pre-built ComponentSet is provided. Handles serializer selection, transport construction, and store defaulting.

Source code in civitas/components.py
def build_component_set(
    transport_type: str = "in_process",
    serializer: Serializer | None = None,
    model_provider: Any = None,
    tool_registry: Any = None,
    state_store: Any = None,
    audit_sink: AuditSink | None = None,
    zmq_pub_addr: str = "tcp://127.0.0.1:5559",
    zmq_sub_addr: str = "tcp://127.0.0.1:5560",
    zmq_start_proxy: bool = True,
    zmq_curve_config: ZmqCurveConfig | None = None,
    nats_servers: str | list[str] = "nats://localhost:4222",
    nats_jetstream: bool = False,
    nats_stream_name: str = "AGENCY",
    nats_tls_config: NatsTlsConfig | None = None,
) -> ComponentSet:
    """Assemble a ComponentSet from plain configuration values.

    Used by Runtime.start() and Worker.start() when the caller did not
    supply a ready-made ComponentSet. Picks the serializer, constructs the
    transport for ``transport_type``, and falls back to an in-memory state
    store when none is given. Any ``transport_type`` other than ``"zmq"`` or
    ``"nats"`` yields the in-process transport.
    """
    # An explicit serializer wins; otherwise the global settings choose
    # between JSON and msgpack.
    if serializer is None:
        wire_format = JsonSerializer() if settings.serializer == "json" else MsgpackSerializer()
    else:
        wire_format = serializer

    span_tracer = Tracer()

    def _make_transport() -> Transport:
        """Construct the transport selected by ``transport_type``."""
        # Imports stay inside the branches on purpose: ZMQ and NATS are optional
        # extras (pyzmq, nats-py), and a module-level import would raise
        # ImportError on every civitas import for users without those extras.
        if transport_type == "zmq":
            from civitas.transport.zmq import ZMQTransport

            return ZMQTransport(
                wire_format,
                pub_addr=zmq_pub_addr,
                sub_addr=zmq_sub_addr,
                start_proxy=zmq_start_proxy,
                curve_config=zmq_curve_config,
            )
        if transport_type == "nats":
            from civitas.transport.nats import NATSTransport

            return NATSTransport(
                wire_format,
                servers=nats_servers,
                jetstream=nats_jetstream,
                stream_name=nats_stream_name,
                tls_config=nats_tls_config,
            )
        from civitas.transport.inprocess import InProcessTransport

        return InProcessTransport(wire_format)

    wire_transport = _make_transport()
    agent_registry = LocalRegistry()

    # No store supplied: default to the in-memory implementation.
    checkpoint_store = InMemoryStateStore() if state_store is None else state_store

    return ComponentSet(
        transport=wire_transport,
        registry=agent_registry,
        serializer=wire_format,
        tracer=span_tracer,
        store=checkpoint_store,
        model_provider=model_provider,
        tool_registry=tool_registry,
        audit_sink=audit_sink,
    )