Skip to content

AgentProcess

Base class for all agents. Subclass it and implement handle().

See Core Concepts and Getting Started for usage examples.


civitas.process.AgentProcess(name, mailbox_size=1000, max_retries=3, shutdown_timeout=30.0)

Base class for all agent processes in Civitas.

Developers subclass this and override lifecycle hooks:

- `on_start()`: called once before the first message
- `handle(message)`: called for every incoming message
- `on_error(error, message)`: called when `handle()` raises
- `on_stop()`: called on graceful shutdown (always — even on crash)

Messaging methods available inside hooks:

- `send(recipient, payload, message_type)`: fire-and-forget
- `ask(recipient, payload, message_type, timeout)`: request-reply
- `send_capable(capability, payload, message_type)`: route to any capable agent
- `broadcast(pattern, payload)`: send to all matching agents
- `reply(payload)`: build a reply message; return it from `handle()` for request-reply

Observability helpers (call from inside `handle()`):

- `llm_span(model, attrs)`: context manager for LLM call spans
- `tool_span(tool_name, attrs)`: context manager for tool call spans

Capability declaration (class-level, inherited and overridable):

```python
capabilities: list[str] = ["text.summarize", "text.translate"]
capability_metadata: dict[str, Any] = {
    "text.summarize": {"description": "...", "version": "1"}
}
```

Source code in civitas/process.py
def __init__(
    self,
    name: str,
    mailbox_size: int = 1000,
    max_retries: int = 3,
    shutdown_timeout: float = 30.0,
) -> None:
    """Set up a fresh agent process that is not yet wired to a runtime.

    The process starts in ``ProcessStatus.INITIALIZING`` with an empty
    ``state`` dict and its own mailbox. Every collaborator (bus, tracer,
    registry, LLM provider, tools, state store, audit sink) starts as
    ``None`` and is injected later by the Runtime/Worker/ComponentSet.

    Args:
        name: Name this agent is addressed by; also used as the ``sender``
            of the messages it emits.
        mailbox_size: Passed to the mailbox as ``maxsize`` (capacity of its
            normal-priority queue).
        max_retries: Stored as ``self._max_retries``; presumably consulted by
            the supervision/error path, which is not visible here.
        shutdown_timeout: Stored as ``self._shutdown_timeout`` (seconds);
            presumably bounds graceful shutdown, which is not visible here.
    """
    # -- Identity and configuration ------------------------------------
    self.name = name
    self.id: str = _uuid7()
    self._max_retries = max_retries
    self._shutdown_timeout = shutdown_timeout

    # -- Lifecycle bookkeeping -----------------------------------------
    self._status = ProcessStatus.INITIALIZING
    self._task: asyncio.Task[None] | None = None
    # Assigned later; signalled when the message loop enters RUNNING.
    self._running_event: asyncio.Event | None = None

    # -- User-visible state and inbox ----------------------------------
    self.state: dict[str, Any] = {}
    self._mailbox = Mailbox(maxsize=mailbox_size)

    # -- Collaborators injected by Runtime/Worker during setup ---------
    self._bus: MessageBus | None = None
    self._tracer: Tracer | None = None
    self._registry: Registry | None = None
    self.llm: ModelProvider | None = None
    self.tools: ToolRegistry | None = None
    self.store: StateStore | None = None
    # Injected by ComponentSet; stays None when auditing is disabled.
    self._audit_sink: AuditSink | None = None

    # Per-agent credentials taken from the topology ``credentials:`` block,
    # keyed by provider name (e.g. "anthropic") -> credential string.
    self._credentials: dict[str, str] = {}

    # Name of the nearest DynamicSupervisor ancestor, set by Runtime if any.
    self._dynamic_supervisor_name: str | None = None

    # MCP clients opened via connect_mcp(), keyed by server name.
    self._mcp_clients: dict[str, Any] = {}

    # Message currently being handled, plus its span (used by reply/tracing).
    self._current_message: Message | None = None
    self._current_handle_span: Span | None = None

on_start() async

Called once before the first message. Initialize self.state here.

Source code in civitas/process.py
async def on_start(self) -> None:
    """Lifecycle hook that runs a single time, before any message is handled.

    Override it to initialise ``self.state`` or do other per-agent setup.
    The base implementation does nothing.
    """
    return None

handle(message) async

Called for every incoming message.

Return self.reply(...) for request-reply. Return None for fire-and-forget.

Source code in civitas/process.py
async def handle(self, message: Message) -> Message | None:
    """Process one incoming message; subclasses override this.

    To answer a request-reply (``ask()``) call, return the message built by
    ``self.reply(...)``. For fire-and-forget traffic, return ``None``.

    The base implementation ignores *message* and always returns ``None``.
    """
    return None

on_error(error, message) async

Called when handle() raises an exception.

Return an ErrorAction. Default: ESCALATE (crash, let supervisor decide).

Source code in civitas/process.py
async def on_error(self, error: Exception, message: Message) -> ErrorAction:
    """Decide how the process reacts when ``handle()`` raises.

    Args:
        error: The exception raised by ``handle()``.
        message: The message that was being handled when it raised.

    Returns:
        An ``ErrorAction``. The default is ``ErrorAction.ESCALATE``: crash
        and let the supervisor decide what happens next.
    """
    default_action = ErrorAction.ESCALATE
    return default_action

on_stop() async

Called on graceful shutdown. Always called — even on crash.

Source code in civitas/process.py
async def on_stop(self) -> None:
    """Lifecycle hook for shutdown, run on a graceful stop and also after a crash.

    Override it to release whatever ``on_start()`` set up. The base
    implementation does nothing.
    """
    return None

on_child_terminated(name, reason) async

Called when a dynamically spawned child is permanently removed.

reason is one of: "restarts_exhausted", "despawned", "clean_exit". Default implementation logs a warning. Override to re-spawn, alert, etc.

Source code in civitas/process.py
async def on_child_terminated(self, name: str, reason: str) -> None:
    """React to a dynamically spawned child being removed for good.

    Args:
        name: Name of the child agent that was removed.
        reason: One of ``"restarts_exhausted"``, ``"despawned"`` or
            ``"clean_exit"``.

    The default only logs a warning. Override it to re-spawn the child,
    raise an alert, and so on.
    """
    log_args = (self.name, name, reason)
    logger.warning("[%s] dynamic child '%s' terminated: %s", *log_args)

send(recipient, payload, message_type='message') async

Fire-and-forget: send a message to another agent by name.

Source code in civitas/process.py
async def send(
    self,
    recipient: str,
    payload: dict[str, Any],
    message_type: str = "message",
) -> None:
    """Fire-and-forget: route a message to the agent named *recipient*.

    When called while another message is being handled, the outgoing
    message inherits that message's ``trace_id`` and records its span as
    ``parent_span_id``. Otherwise the trace id is empty and there is no
    parent span.

    Raises:
        RuntimeError: If this process has not been wired to a MessageBus.
    """
    bus = self._bus
    if bus is None:
        raise RuntimeError("AgentProcess not wired to a MessageBus")

    parent = self._current_message
    trace_id = parent.trace_id if parent is not None else ""
    parent_span_id: str | None = parent.span_id if parent is not None else None

    outgoing = Message(
        type=message_type,
        sender=self.name,
        recipient=recipient,
        payload=payload,
        trace_id=trace_id,
        span_id=_new_span_id(),
        parent_span_id=parent_span_id,
    )
    await bus.route(outgoing)

ask(recipient, payload, message_type='message', timeout=30.0) async

Request-reply: send a message and await a response.

Source code in civitas/process.py
async def ask(
    self,
    recipient: str,
    payload: dict[str, Any],
    message_type: str = "message",
    timeout: float = 30.0,
) -> Message:
    """Request-reply: send a message to *recipient* and await its answer.

    A fresh ``correlation_id`` is attached so the bus can match the reply.
    Trace context is inherited from the message currently being handled,
    if any, exactly as in ``send()``.

    Args:
        timeout: Passed to ``MessageBus.request`` (seconds).

    Returns:
        The reply message delivered by the bus.

    Raises:
        RuntimeError: If this process has not been wired to a MessageBus.
    """
    bus = self._bus
    if bus is None:
        raise RuntimeError("AgentProcess not wired to a MessageBus")

    parent = self._current_message
    trace_id = parent.trace_id if parent is not None else ""
    parent_span_id: str | None = parent.span_id if parent is not None else None

    # The correlation id is minted before the span id, as before.
    correlation_id = _uuid7()
    request = Message(
        type=message_type,
        sender=self.name,
        recipient=recipient,
        payload=payload,
        correlation_id=correlation_id,
        trace_id=trace_id,
        span_id=_new_span_id(),
        parent_span_id=parent_span_id,
    )
    return await bus.request(request, timeout=timeout)

broadcast(pattern, payload) async

Send a message to all agents matching a glob pattern.

Source code in civitas/process.py
async def broadcast(
    self,
    pattern: str,
    payload: dict[str, Any],
    message_type: str = "message",
) -> None:
    """Send a fire-and-forget message to every agent matching a glob pattern.

    Targets are resolved once with ``MessageBus.lookup_all(pattern)``. Each
    target then gets its own ``send()`` call, so trace context propagates
    exactly as it does for a single ``send()``.

    Args:
        pattern: Glob pattern matched against agent names.
        payload: Payload delivered to every target. The same dict object is
            passed to each ``send()`` call.
        message_type: Message type used for every outgoing message. It
            defaults to ``"message"``, matching ``send()``, so existing
            callers are unaffected.

    Raises:
        RuntimeError: If this process has not been wired to a MessageBus.

    Notes:
        Nothing here filters out this agent itself. If ``lookup_all``
        returns it, it is messaged too. Sends happen one after another; if
        one ``send()`` raises, the exception propagates and the remaining
        targets are not messaged.
    """
    if self._bus is None:
        raise RuntimeError("AgentProcess not wired to a MessageBus")
    for target in self._bus.lookup_all(pattern):
        await self.send(target.name, payload, message_type=message_type)

reply(payload)

Create a reply message. Return this from handle() for request-reply.

Source code in civitas/process.py
def reply(self, payload: dict[str, Any]) -> Message:
    """Build the response to the message currently being handled.

    Return the result from ``handle()`` to answer a request-reply call.
    The reply goes to the request's ``reply_to`` (or its ``sender`` when
    that is empty). It reuses the request's ``correlation_id`` and
    ``trace_id`` and records the request's span as its parent span. Its
    message type is ``payload["type"]`` when present, otherwise ``"reply"``.

    Raises:
        RuntimeError: If no message is currently being handled.
    """
    request = self._current_message
    if request is None:
        raise RuntimeError("reply() called outside of handle()")

    reply_type = payload.get("type", "reply")
    destination = request.reply_to or request.sender
    return Message(
        type=reply_type,
        sender=self.name,
        recipient=destination,
        payload=payload,
        correlation_id=request.correlation_id,
        trace_id=request.trace_id,
        span_id=_new_span_id(),
        parent_span_id=request.span_id,
    )

checkpoint() async

Save self.state to the configured StateStore.

Call this from handle() after completing a meaningful unit of work. On restart, self.state is automatically restored from the last checkpoint. Agents that never call checkpoint() incur zero overhead.

Source code in civitas/process.py
async def checkpoint(self) -> None:
    """Persist ``self.state`` to the configured StateStore, if there is one.

    Call this from ``handle()`` after finishing a meaningful unit of work.
    The state is stored under this agent's ``name``, and on restart it is
    restored from the latest checkpoint. With no store configured this is a
    no-op, so agents that never checkpoint pay nothing.
    """
    store = self.store
    if store is None:
        return
    await store.set(self.name, self.state)

spawn(agent_class, name, config=None) async

Spawn a dynamic agent via the nearest ancestor DynamicSupervisor.

Sends a civitas.dynamic.spawn message and awaits confirmation. Raises SpawnError if no DynamicSupervisor ancestor exists or spawn is denied. Returns the agent name on success.

Source code in civitas/process.py
async def spawn(
    self,
    agent_class: type,
    name: str,
    config: dict[str, Any] | None = None,
) -> str:
    """Ask the nearest DynamicSupervisor ancestor to start a new agent.

    This sends a ``civitas.dynamic.spawn`` request through ``ask()`` and
    awaits the supervisor's confirmation. The request carries:

    - the class's dotted path (``module.qualname``);
    - the new agent's *name*;
    - its *config* (``{}`` when omitted);
    - this agent's name as ``spawner``.

    Returns:
        *name*, once the supervisor answers with ``status == "ok"``.

    Raises:
        SpawnError: If this agent has no DynamicSupervisor ancestor, or if
            the supervisor refuses the spawn. The supervisor's ``reason`` is
            used as the message, defaulting to ``"spawn failed"``.
    """
    supervisor = self._dynamic_supervisor_name
    if supervisor is None:
        raise SpawnError("No DynamicSupervisor ancestor found in supervision tree")

    # NOTE(review): a class defined inside a function has a qualname that is
    # presumably not importable from this path; confirm the supervisor
    # rejects such a spawn with a clear reason.
    request = {
        "class_path": f"{agent_class.__module__}.{agent_class.__qualname__}",
        "name": name,
        "config": config or {},
        "spawner": self.name,
    }
    response = await self.ask(supervisor, request, message_type="civitas.dynamic.spawn")
    result = response.payload
    if result.get("status") != "ok":
        raise SpawnError(result.get("reason", "spawn failed"))
    return name

despawn(name) async

Hard-stop a dynamic child immediately.

Cancels the agent's task. on_stop() still fires. Pending ask() callers into the agent receive SpawnError. The slot is freed immediately.

Source code in civitas/process.py
async def despawn(self, name: str) -> None:
    """Hard-stop the dynamic child *name* immediately.

    This sends ``civitas.dynamic.despawn`` to the nearest DynamicSupervisor
    ancestor and waits for its answer. Per the supervisor contract:

    - the child's task is cancelled, but ``on_stop()`` still fires;
    - pending ``ask()`` callers into the child receive SpawnError;
    - the child's slot is freed immediately.

    Raises:
        SpawnError: If there is no DynamicSupervisor ancestor, or if the
            supervisor reports a status other than ``"ok"``. Its ``reason``
            is used, defaulting to ``"despawn failed"``.
    """
    supervisor = self._dynamic_supervisor_name
    if supervisor is None:
        raise SpawnError("No DynamicSupervisor ancestor found in supervision tree")

    response = await self.ask(
        supervisor,
        {"name": name},
        message_type="civitas.dynamic.despawn",
    )
    result = response.payload
    if result.get("status") != "ok":
        raise SpawnError(result.get("reason", "despawn failed"))

stop(name, drain='current', timeout=30.0) async

Soft-stop a dynamic child. Awaitable — returns when fully stopped.

- `drain="current"`: finish the message currently being handled, then stop.
- `drain="all"`: drain the full mailbox, then stop.
- `timeout`: fallback hard stop if draining is not complete in time.

Source code in civitas/process.py
async def stop(
    self,
    name: str,
    drain: str = "current",
    timeout: float = 30.0,
) -> None:
    """Soft-stop the dynamic child *name*; returns once it has fully stopped.

    Args:
        name: Child to stop.
        drain: ``"current"`` finishes the message currently being handled,
            then stops. ``"all"`` drains the child's full mailbox first.
        timeout: Seconds allowed for draining before the supervisor falls
            back to a hard stop.

    This side waits ``timeout + 5.0`` seconds for the supervisor's answer,
    presumably leaving room for the fallback hard stop to finish and be
    acknowledged.

    Raises:
        SpawnError: If there is no DynamicSupervisor ancestor, or if the
            supervisor reports a status other than ``"ok"``. Its ``reason``
            is used, defaulting to ``"stop failed"``.
    """
    supervisor = self._dynamic_supervisor_name
    if supervisor is None:
        raise SpawnError("No DynamicSupervisor ancestor found in supervision tree")

    request = {"name": name, "drain": drain, "timeout": timeout}
    reply_wait = timeout + 5.0
    response = await self.ask(
        supervisor,
        request,
        message_type="civitas.dynamic.stop",
        timeout=reply_wait,
    )
    result = response.payload
    if result.get("status") != "ok":
        raise SpawnError(result.get("reason", "stop failed"))

civitas.process.ProcessStatus

Bases: Enum

Lifecycle states for an AgentProcess.


civitas.process.Mailbox(maxsize=1000)

Bounded async queue for incoming messages with priority support.

High-priority system messages (priority > 0) go to a separate priority queue, which is always served before the normal queue; within each queue, order is FIFO. Backpressure is applied when a queue is full — the sender awaits until space is available.

Source code in civitas/process.py
def __init__(self, maxsize: int = 1000, priority_maxsize: int = 100) -> None:
    """Create an empty mailbox made of a normal queue and a priority queue.

    Args:
        maxsize: Capacity of the normal (FIFO) queue. Senders block in
            ``put()`` while it is full.
        priority_maxsize: Capacity of the separate queue that holds messages
            with ``priority > 0``. Defaults to 100, the value previously
            hard-coded here, so existing callers see no change.

    Note:
        Under ``asyncio.Queue`` semantics, a size of zero or less makes the
        corresponding queue unbounded, which disables backpressure for it.
    """
    self._queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=maxsize)
    self._priority_queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=priority_maxsize)
    # put() sets this after every enqueue; get() clears it and waits on it
    # when both queues are empty.
    self._notify: asyncio.Event = asyncio.Event()

put(message) async

Enqueue a message. Priority messages bypass the normal queue.

Source code in civitas/process.py
async def put(self, message: Message) -> None:
    """Add *message* to the mailbox, waiting while its queue is full.

    Messages with ``priority > 0`` go to the dedicated priority queue. All
    others, including negative priorities, go to the normal FIFO queue.
    Once the message is enqueued, the notify event is set so that a waiting
    ``get()`` wakes up.
    """
    destination = self._priority_queue if message.priority > 0 else self._queue
    await destination.put(message)
    self._notify.set()

get() async

Dequeue the next message. Priority messages are served first.

Source code in civitas/process.py
async def get(self) -> Message:
    """Remove and return the next message, waiting until one is available.

    The priority queue is always checked before the normal queue, so a
    pending priority message overtakes any queued normal ones.
    """
    while True:
        for source in (self._priority_queue, self._queue):
            if not source.empty():
                return source.get_nowait()
        # Nothing queued: reset the wake-up flag, re-check both queues, and
        # only then sleep until put() sets the flag again.
        self._notify.clear()
        if self._priority_queue.empty() and self._queue.empty():
            await self._notify.wait()

empty()

Return True if both priority and normal queues are empty.

Source code in civitas/process.py
def empty(self) -> bool:
    """Tell whether the mailbox holds no messages, i.e. both queues are empty."""
    return all(queue.empty() for queue in (self._priority_queue, self._queue))