GenServer

OTP-style stateful service process for non-AI workloads: rate limiters, caches, coordinators.

See GenServer for a full guide with examples.


civitas.genserver.GenServer(name, **kwargs)

Bases: AgentProcess

OTP-style generic server for stateful service processes.

Override handle_call(), handle_cast(), and handle_info() rather than handle(). Do not override on_start(); put startup logic in init() instead.

Source code in civitas/genserver.py
def __init__(self, name: str, **kwargs: Any) -> None:
    super().__init__(name, **kwargs)
    self._send_after_tasks: list[asyncio.Task[None]] = []
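
The override rules above can be sketched with a small counter service. To keep the block runnable without the library installed, it subclasses GenServerStub, a hypothetical stand-in written for this example that only mirrors the method signatures documented on this page; real code would subclass civitas.genserver.GenServer instead. Treating self.state as a dict and the payload key "by" are assumptions of the sketch, and the handlers are called directly here, whereas the runtime would normally dispatch to them.

```python
import asyncio
from typing import Any


class GenServerStub:
    """Hypothetical stand-in for civitas.genserver.GenServer (not the real class)."""

    def __init__(self, name: str, **kwargs: Any) -> None:
        self.name = name
        self.state: dict[str, Any] = {}

    async def init(self) -> None:
        """No-op by default, as in the documented source."""

    async def handle_call(self, payload: dict[str, Any], from_: str) -> dict[str, Any]:
        raise NotImplementedError

    async def handle_cast(self, payload: dict[str, Any]) -> None:
        """No-op by default, as in the documented source."""


class Counter(GenServerStub):
    async def init(self) -> None:
        # State is set up in init(), not in on_start() or __init__().
        self.state = {"count": 0}

    async def handle_call(self, payload: dict[str, Any], from_: str) -> dict[str, Any]:
        # Request/reply: the returned dict is the reply.
        return {"count": self.state["count"]}

    async def handle_cast(self, payload: dict[str, Any]) -> None:
        # Fire-and-forget: mutate state and return nothing.
        self.state["count"] += payload.get("by", 1)


async def demo() -> dict[str, Any]:
    counter = Counter("counter")
    await counter.init()
    await counter.handle_cast({"by": 2})
    await counter.handle_cast({})
    return await counter.handle_call({"op": "get"}, from_="client")


result = asyncio.run(demo())
print(result)  # {'count': 3}
```

Keeping all mutation inside the handlers means the state is only touched from the process's own message loop, which is the main reason to reach for this pattern over shared objects.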

init() async

Called once when the process starts. Initialize self.state here.

Source code in civitas/genserver.py
async def init(self) -> None:
    """Called once when the process starts. Initialize self.state here."""

handle_call(payload, from_) async

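Handles a synchronous (request/reply) message and must return a dict reply. The default implementation raises NotImplementedError, so any server that receives calls must override it.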

Source code in civitas/genserver.py
async def handle_call(self, payload: dict[str, Any], from_: str) -> dict[str, Any]:
    """Synchronous request. Must return a dict reply."""
    raise NotImplementedError(
        f"{type(self).__name__} received a call but handle_call() is not implemented"
    )
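
Since handle_call() must always return a dict, one way to structure an override is to dispatch on a field of the payload and reply with an error dict for anything unrecognized. The sketch below is a minimal illustration under assumptions: the "op" and "key" payload fields and the "ok"/"error" reply shape are conventions invented for this example, not part of the library, and the Cache class is a plain class so the block runs without civitas installed.

```python
import asyncio
from typing import Any


class Cache:  # real code would subclass civitas.genserver.GenServer
    def __init__(self) -> None:
        self.state: dict[str, Any] = {"greeting": "hello"}

    async def handle_call(self, payload: dict[str, Any], from_: str) -> dict[str, Any]:
        op = payload.get("op")  # "op" is a convention of this sketch
        if op == "get":
            key = payload.get("key")
            if key in self.state:
                return {"ok": True, "value": self.state[key]}
            return {"ok": False, "error": f"missing key: {key!r}"}
        # Reply with a dict even when the request is not understood.
        return {"ok": False, "error": f"unknown op: {op!r}"}


async def demo() -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    cache = Cache()
    hit = await cache.handle_call({"op": "get", "key": "greeting"}, from_="client")
    miss = await cache.handle_call({"op": "get", "key": "nope"}, from_="client")
    bad = await cache.handle_call({"op": "drop"}, from_="client")
    return hit, miss, bad


hit, miss, bad = asyncio.run(demo())
print(hit, miss, bad)
```

Returning an error dict rather than raising keeps the failure visible to the caller as an ordinary reply; whether an exception inside a handler is acceptable depends on how the surrounding runtime treats it, which this page does not specify.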

handle_cast(payload) async

Handles an asynchronous, fire-and-forget message. No reply is sent. The default implementation does nothing.

Source code in civitas/genserver.py
async def handle_cast(self, payload: dict[str, Any]) -> None:
    """Async fire-and-forget. No reply."""

handle_info(payload) async

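Handles internal messages such as timers, ticks, and out-of-band signals, including payloads scheduled with send_after(). The default implementation does nothing.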

Source code in civitas/genserver.py
async def handle_info(self, payload: dict[str, Any]) -> None:
    """Internal messages — timers, ticks, out-of-band signals."""

send_after(delay_ms, payload)

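Schedules a handle_info message to self after delay_ms milliseconds. The method returns immediately; it creates an asyncio task, so it must be called while an event loop is running (for example from init() or a handler). Delivery is best effort: if routing fails, for instance because the process has stopped, the error is swallowed.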

Source code in civitas/genserver.py
def send_after(self, delay_ms: int, payload: dict[str, Any]) -> None:
    """Schedule a handle_info message to self after delay_ms milliseconds."""

    async def _fire() -> None:
        await asyncio.sleep(delay_ms / 1000)
        if self._bus is not None:
            msg = Message(
                type="genserver.info",
                sender=self.name,
                recipient=self.name,
                payload=payload,
                span_id=_new_span_id(),
            )
            try:
                await self._bus.route(msg)
            except Exception:
                pass  # process may have stopped; routing errors are expected

    # Prune completed tasks before appending to prevent unbounded growth
    self._send_after_tasks = [t for t in self._send_after_tasks if not t.done()]
    self._send_after_tasks.append(asyncio.create_task(_fire()))
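
A common use of send_after() is a periodic tick: handle_info() does its work and then re-arms the timer. The sketch below imitates that loop with a stand-in class, TickerSketch, which is hypothetical and exists only for this example. Its send_after() copies the sleep-then-deliver and task-pruning steps from the source above, but calls handle_info() directly instead of routing a "genserver.info" Message through the bus. The "kind" payload field, the limit parameter, and the done event are also assumptions of the sketch.

```python
import asyncio
from typing import Any


class TickerSketch:
    """Stand-in that mimics send_after() without the civitas message bus."""

    def __init__(self, limit: int) -> None:
        self.state: dict[str, Any] = {"ticks": 0}
        self.limit = limit
        self.done = asyncio.Event()
        self._send_after_tasks: list[asyncio.Task[None]] = []

    def send_after(self, delay_ms: int, payload: dict[str, Any]) -> None:
        async def _fire() -> None:
            await asyncio.sleep(delay_ms / 1000)
            # The real method routes a Message via the bus; this calls directly.
            await self.handle_info(payload)

        # Same pruning step as the source: drop finished tasks, then append.
        self._send_after_tasks = [t for t in self._send_after_tasks if not t.done()]
        self._send_after_tasks.append(asyncio.create_task(_fire()))

    async def handle_info(self, payload: dict[str, Any]) -> None:
        if payload.get("kind") != "tick":
            return
        self.state["ticks"] += 1
        if self.state["ticks"] < self.limit:
            # Re-arm the timer to get a periodic tick.
            self.send_after(10, {"kind": "tick"})
        else:
            self.done.set()


async def demo() -> int:
    ticker = TickerSketch(limit=3)
    ticker.send_after(10, {"kind": "tick"})  # requires a running event loop
    await asyncio.wait_for(ticker.done.wait(), timeout=2.0)
    return ticker.state["ticks"]


ticks = asyncio.run(demo())
print(ticks)  # 3
```

Because each tick schedules the next only after its own work finishes, ticks never overlap, at the cost of the period drifting by however long the handler takes.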