AgentProcess¶
Base class for all agents. Subclass it and implement handle().
See Core Concepts and Getting Started for usage examples.
civitas.process.AgentProcess(name, mailbox_size=1000, max_retries=3, shutdown_timeout=30.0)
¶
Base class for all agent processes in Civitas.
Developers subclass this and override lifecycle hooks: - on_start(): called once before the first message - handle(message): called for every incoming message - on_error(error, message): called when handle() raises - on_stop(): called on graceful shutdown (always — even on crash)
Messaging methods available inside hooks: - send(recipient, payload, message_type): fire-and-forget - ask(recipient, payload, message_type, timeout): request-reply - send_capable(capability, payload, message_type): route to any capable agent - broadcast(pattern, payload): send to all matching agents - reply(payload): return from handle() for request-reply
Observability helpers (call from inside handle()): - llm_span(model, attrs): context manager for LLM call spans - tool_span(tool_name, attrs): context manager for tool call spans
Capability declaration (class-level, inherited and overridable): capabilities: list[str] = ["text.summarize", "text.translate"] capability_metadata: dict[str, Any] = { "text.summarize": {"description": "...", "version": "1"} }
Source code in civitas/process.py
on_start()
async
¶
handle(message)
async
¶
Called for every incoming message.
Return self.reply(...) for request-reply. Return None for fire-and-forget.
on_error(error, message)
async
¶
Called when handle() raises an exception.
Return an ErrorAction. Default: ESCALATE (crash, let supervisor decide).
on_stop()
async
¶
on_child_terminated(name, reason)
async
¶
Called when a dynamically spawned child is permanently removed.
reason is one of: "restarts_exhausted", "despawned", "clean_exit". Default implementation logs a warning. Override to re-spawn, alert, etc.
Source code in civitas/process.py
send(recipient, payload, message_type='message')
async
¶
Fire-and-forget: send a message to another agent by name.
Source code in civitas/process.py
ask(recipient, payload, message_type='message', timeout=30.0)
async
¶
Request-reply: send a message and await a response.
Source code in civitas/process.py
broadcast(pattern, payload)
async
¶
Send a message to all agents matching a glob pattern.
Source code in civitas/process.py
reply(payload)
¶
Create a reply message. Return this from handle() for request-reply.
Source code in civitas/process.py
checkpoint()
async
¶
Save self.state to the configured StateStore.
Call this from handle() after completing a meaningful unit of work. On restart, self.state is automatically restored from the last checkpoint. Agents that never call checkpoint() incur zero overhead.
Source code in civitas/process.py
spawn(agent_class, name, config=None)
async
¶
Spawn a dynamic agent via the nearest ancestor DynamicSupervisor.
Sends a civitas.dynamic.spawn message and awaits confirmation. Raises SpawnError if no DynamicSupervisor ancestor exists or spawn is denied. Returns the agent name on success.
Source code in civitas/process.py
despawn(name)
async
¶
Hard-stop a dynamic child immediately.
Cancels the agent's task. on_stop() still fires. Pending ask() callers into the agent receive SpawnError. The slot is freed immediately.
Source code in civitas/process.py
stop(name, drain='current', timeout=30.0)
async
¶
Soft-stop a dynamic child. Awaitable — returns when fully stopped.
drain="current" — finishes the message currently being handled, then stops. drain="all" — drains the full mailbox, then stops. timeout — fallback hard stop if drain isn't complete in time.
Source code in civitas/process.py
civitas.process.ProcessStatus
¶
Bases: Enum
Lifecycle states for an AgentProcess.
civitas.process.Mailbox(maxsize=1000)
¶
Bounded async queue for incoming messages with priority support.
High-priority system messages (priority > 0) are placed at the front. Normal messages follow FIFO order. Backpressure is applied when the mailbox is full — the sender awaits until space is available.
Source code in civitas/process.py
put(message)
async
¶
Enqueue a message. Priority messages bypass the normal queue.
Source code in civitas/process.py
get()
async
¶
Dequeue the next message. Priority messages are served first.