Skip to content

Supervisor

Monitors child agents and supervisors. Applies restart strategies on failure.

See Supervision for a full guide.


civitas.supervisor.Supervisor(name, children=None, strategy='ONE_FOR_ONE', max_restarts=3, restart_window=60.0, backoff='CONSTANT', backoff_base=1.0, backoff_max=60.0)

Manages child processes with restart strategies.

When a child crashes, the supervisor applies the configured restart strategy. If max_restarts is exceeded within restart_window, the supervisor escalates to its parent or stops permanently.

Source code in civitas/supervisor.py
def __init__(
    self,
    name: str,
    children: list[AgentProcess | Supervisor] | None = None,
    strategy: str = "ONE_FOR_ONE",
    max_restarts: int = 3,
    restart_window: float = 60.0,
    backoff: str = "CONSTANT",
    backoff_base: float = 1.0,
    backoff_max: float = 60.0,
) -> None:
    """Record the supervisor's configuration and reset its bookkeeping.

    Construction only stores state; no child is started here.

    Args:
        name: Identifier stored on the supervisor.
        children: Agents and/or nested supervisors to manage. ``None`` (or
            an empty list) results in an empty child list.
        strategy: Value passed to ``RestartStrategy``.
        max_restarts: Restart budget (stored as given).
        restart_window: Window the restart budget applies to — presumably
            seconds; TODO confirm against the restart logic.
        backoff: Value passed to ``BackoffPolicy``.
        backoff_base: Stored as given — presumably the base restart delay.
        backoff_max: Stored as given — presumably the upper delay bound.
    """
    # --- Public configuration ---------------------------------------
    self.name = name
    self.children: list[AgentProcess | Supervisor] = children if children else []
    self.strategy = RestartStrategy(strategy)
    self.max_restarts, self.restart_window = max_restarts, restart_window
    self.backoff = BackoffPolicy(backoff)
    self.backoff_base, self.backoff_max = backoff_base, backoff_max

    # --- Lifecycle ----------------------------------------------------
    self._running = False
    self._parent: Supervisor | None = None

    # --- Collaborators injected by Runtime (unset until then) ---------
    self._bus: MessageBus | None = None
    self._registry: Registry | None = None
    self._tracer: Tracer | None = None

    # --- Restart bookkeeping -------------------------------------------
    # F03-10: a deque gives O(1) pops for the sliding restart window.
    self._restart_timestamps: deque[float] = deque()
    self._restart_counts: dict[str, int] = {}
    self._child_tasks: dict[str, asyncio.Task[None]] = {}
    # F03-11: name -> child index for O(1) lookup.
    self._children_by_name: dict[str, AgentProcess | Supervisor] = {
        child.name: child for child in self.children
    }
    # F03-4: crash handlers are tracked so they can be cancelled later.
    self._pending_crash_tasks: set[asyncio.Task[None]] = set()

    # --- Heartbeat monitoring for remote agents ------------------------
    self._remote_children: set[str] = set()
    self._heartbeat_task: asyncio.Task[None] | None = None
    self._missed_heartbeats: dict[str, int] = {}
    # F03-3: heartbeat settings are kept per child, keyed by child name.
    self._remote_child_config: dict[str, dict[str, float | int]] = {}

start() async

Start all children and begin monitoring them.

Source code in civitas/supervisor.py
async def start(self) -> None:
    """Flag the supervisor as running, then bring up children and heartbeats.

    Nested supervisors are linked back to this supervisor before any
    child is started. Children are then started one at a time in list
    order, and finally the heartbeat monitor is started.
    """
    self._running = True

    # Pass 1: give every nested supervisor a reference to its parent.
    for nested in (c for c in self.children if isinstance(c, Supervisor)):
        nested._parent = self

    # Pass 2: start children in declaration order. A nested supervisor's
    # own start() is awaited to completion before the next sibling starts.
    for node in self.children:
        launch = node.start() if isinstance(node, Supervisor) else self._start_child(node)
        await launch

    # Remote children are watched through the heartbeat monitor.
    await self._start_heartbeat_monitor()

stop() async

Stop all children gracefully.

Source code in civitas/supervisor.py
async def stop(self) -> None:
    """Shut the supervisor down: handlers first, then heartbeats, then children.

    Children are stopped in reverse list order, one at a time.
    """
    self._running = False

    # F03-4: cancel in-flight crash handlers before tearing down children,
    # then wait for them to finish unwinding (exceptions are collected,
    # not raised).
    in_flight = self._pending_crash_tasks
    for handler in tuple(in_flight):
        handler.cancel()
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)

    await self._stop_heartbeat_monitor()

    for node in reversed(self.children):
        # Nested supervisors expose stop(); plain agents expose _stop().
        shutdown = node.stop if isinstance(node, Supervisor) else node._stop
        await shutdown()

add_remote_child(name, heartbeat_interval=5.0, heartbeat_timeout=2.0, missed_heartbeats_threshold=3)

Register a remote child for heartbeat-based monitoring.

Remote children are agents running in a Worker process. They are monitored via periodic heartbeat pings instead of task callbacks.

Source code in civitas/supervisor.py
def add_remote_child(
    self,
    name: str,
    heartbeat_interval: float = 5.0,
    heartbeat_timeout: float = 2.0,
    missed_heartbeats_threshold: int = 3,
) -> None:
    """Enrol a remote child in heartbeat-based monitoring.

    Remote children are agents running in a Worker process; they are
    watched through periodic heartbeat pings rather than task callbacks.

    Args:
        name: Name of the remote child. Registering the same name again
            overwrites its settings and resets its missed-heartbeat count.
        heartbeat_interval: Stored under ``"interval"`` — presumably
            seconds between pings; TODO confirm.
        heartbeat_timeout: Stored under ``"timeout"`` — presumably seconds
            to wait for a reply; TODO confirm.
        missed_heartbeats_threshold: Stored under ``"threshold"``.
    """
    # F03-3: each child carries its own settings dict instead of sharing
    # supervisor-wide scalars.
    settings: dict[str, float | int] = dict(
        interval=heartbeat_interval,
        timeout=heartbeat_timeout,
        threshold=missed_heartbeats_threshold,
    )
    self._remote_children.add(name)
    self._missed_heartbeats[name] = 0
    self._remote_child_config[name] = settings

all_agents()

Recursively collect all AgentProcess instances in the tree.

Source code in civitas/supervisor.py
def all_agents(self) -> list[AgentProcess]:
    """Flatten the supervision tree into a list of its AgentProcess leaves.

    Children are visited in list order; a nested supervisor contributes
    the agents of its whole subtree at its own position.
    """
    collected: list[AgentProcess] = []
    for node in self.children:
        if not isinstance(node, Supervisor):
            collected.append(node)
            continue
        # Nested supervisor: splice in everything beneath it.
        collected += node.all_agents()
    return collected

all_supervisors()

Recursively collect all Supervisor instances (including self).

Source code in civitas/supervisor.py
def all_supervisors(self) -> list[Supervisor]:
    """List this supervisor followed by every supervisor nested beneath it.

    The result always starts with ``self``; nested supervisors follow in
    the order their subtrees appear in ``children``.
    """
    found: list[Supervisor] = [self]
    for node in self.children:
        if not isinstance(node, Supervisor):
            continue  # plain agents are not part of this listing
        found += node.all_supervisors()
    return found

civitas.supervisor.RestartStrategy

Bases: Enum

Strategy used by a Supervisor when a child process crashes.


civitas.supervisor.BackoffPolicy

Bases: Enum

Delay strategy applied between successive restart attempts.


DynamicSupervisor

Starts empty. Children are added and removed at runtime via self.spawn() / self.despawn(). Always uses ONE_FOR_ONE. See Dynamic supervision for a full guide.


civitas.supervisor.DynamicSupervisor(name, max_children=None, max_total_spawns=None, restart='transient', max_restarts=3, restart_window=60.0, **kwargs)

Bases: AgentProcess

Dynamic supervisor — starts empty, children added at runtime via spawn().

Declared as a static child in topology YAML under type: dynamic_supervisor. Only its children change at runtime. Enforces ONE_FOR_ONE restart semantics — no escalation to parent on restart exhaustion; fires on_child_terminated instead.

Agents call self.spawn() / self.despawn() / self.stop() to manage children. All requests travel as bus messages (civitas.dynamic.*) so the same API works in-process (v0.4) and cross-process (v0.5).

Source code in civitas/supervisor.py
def __init__(
    self,
    name: str,
    max_children: int | None = None,
    max_total_spawns: int | None = None,
    restart: str = "transient",
    max_restarts: int = 3,
    restart_window: float = 60.0,
    **kwargs: Any,
) -> None:
    """Set up an empty dynamic supervisor.

    Args:
        name: Forwarded to the base-class initialiser.
        max_children: Stored as given — presumably a cap on concurrently
            live children, with ``None`` meaning no cap; TODO confirm.
        max_total_spawns: Stored as given — presumably a lifetime spawn
            cap, with ``None`` meaning no cap; TODO confirm.
        restart: Value passed to ``RestartMode``.
        max_restarts: Restart budget for children (stored as given).
        restart_window: Window for the restart budget — presumably
            seconds; TODO confirm.
        **kwargs: Forwarded unchanged to the base-class initialiser.
    """
    super().__init__(name, **kwargs)

    # Restart policy for children.
    self._restart_mode = RestartMode(restart)
    self._ds_max_restarts, self._ds_restart_window = max_restarts, restart_window

    # Spawn limits and the running spawn counter.
    self.max_children, self.max_total_spawns = max_children, max_total_spawns
    self._total_spawns: int = 0

    # Per-child state, all keyed by child name; starts empty because
    # children are only added at runtime.
    self._dynamic_children: dict[str, AgentProcess] = {}
    self._child_tasks: dict[str, asyncio.Task[None]] = {}
    self._spawner_names: dict[str, str] = {}
    self._child_restart_counts: dict[str, int] = {}
    self._child_restart_timestamps: dict[str, deque[float]] = {}

    # Background tasks tracked so on_stop() can cancel them.
    self._pending_child_tasks: set[asyncio.Task[None]] = set()

on_spawn_requested(agent_class, name, config) async

Governance veto hook. Return False to deny the spawn request.

Default implementation approves all requests. Subclass to enforce allowlists, rate limits, or policy checks.

Source code in civitas/supervisor.py
async def on_spawn_requested(
    self, agent_class: type, name: str, config: dict[str, Any]
) -> bool:
    """Decide whether a spawn request may proceed.

    This is a governance veto hook: returning ``False`` denies the spawn.
    The base implementation ignores its arguments and approves every
    request; override it in a subclass to enforce allowlists, rate
    limits, or other policy checks.

    Args:
        agent_class: Class of the agent requested.
        name: Name requested for the new child.
        config: Configuration supplied with the request.

    Returns:
        ``True`` — every request is approved by default.
    """
    return True

all_dynamic_agents()

Return the currently live dynamic children.

Source code in civitas/supervisor.py
def all_dynamic_agents(self) -> list[AgentProcess]:
    """Snapshot the dynamic children that are currently tracked.

    Returns a new list (in registration order), so callers may mutate it
    without affecting the supervisor's own bookkeeping.
    """
    return [*self._dynamic_children.values()]

on_stop() async

Cancel all dynamic children on shutdown.

Source code in civitas/supervisor.py
async def on_stop(self) -> None:
    """Cancel every dynamic child task, then any pending background tasks.

    Child tasks are cancelled and awaited one at a time; whatever a child
    raises while unwinding is deliberately ignored so shutdown continues.
    """
    # Iterate over a snapshot of the names: awaiting below yields control,
    # and the child table may change while we are suspended.
    for child_name in list(self._dynamic_children):
        runner = self._child_tasks.get(child_name)
        if runner is None or runner.done():
            continue
        runner.cancel()
        try:
            await runner
        except (asyncio.CancelledError, Exception):
            # Best-effort shutdown: cancellation is the expected outcome,
            # and a failing child must not block the others.
            pass

    background = self._pending_child_tasks
    for job in tuple(background):
        job.cancel()
    if background:
        await asyncio.gather(*background, return_exceptions=True)

civitas.supervisor.RestartMode

Bases: Enum

Restart policy for dynamic children.