Skip to content

Plugins

Structural protocols — implement the right methods and any class qualifies. No base class, no registration decorator.

See Plugins for a full authoring guide.


ModelProvider

civitas.plugins.model.ModelProvider

Bases: Protocol

Protocol for LLM invocation abstraction.

chat(model, messages, tools=None) async

Send a chat completion request to the model.


civitas.plugins.model.ModelResponse(content, model, tokens_in, tokens_out, cost_usd=None, tool_calls=None) dataclass

Response from an LLM call.

cost_usd = None class-attribute instance-attribute

Computed cost in USD, or None if the model's pricing is not known.


civitas.plugins.model.ToolCall(id, name, input) dataclass

A single tool call requested by the model.

Normalized across providers: Anthropic tool_use blocks and LiteLLM tool_calls entries both map to this shape.


Implementations

civitas.plugins.anthropic.AnthropicProvider(api_key=None, default_model='claude-sonnet-4-6', max_tokens=4096, max_retries=3)

ModelProvider implementation backed by the Anthropic SDK.

Requires pip install civitas[anthropic].

Source code in civitas/plugins/anthropic.py
def __init__(
    self,
    api_key: str | None = None,
    default_model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_retries: int = 3,
) -> None:
    """Build a provider around an ``anthropic.AsyncAnthropic`` client.

    Args:
        api_key: Forwarded unchanged to the SDK client constructor.
        default_model: Model name stored for calls that do not name one.
        max_tokens: ``max_tokens`` value stored for every request.
        max_retries: Forwarded unchanged to the SDK client constructor.

    Raises:
        ImportError: If the module-level ``_HAS_ANTHROPIC`` flag is falsy.
    """
    if not _HAS_ANTHROPIC:
        install_hint = (
            "The 'anthropic' package is required. "
            "Install it with: pip install civitas[anthropic]"
        )
        raise ImportError(install_hint)

    # Retrying is delegated to the SDK: its built-in retry applies
    # exponential backoff and covers RateLimitError (429) and
    # OverloadedError (529), so no retry loop is needed here.
    self._client = anthropic.AsyncAnthropic(api_key=api_key, max_retries=max_retries)
    self._default_model = default_model
    self._max_tokens = max_tokens

chat(model, messages, tools=None) async

Send a chat request to the Anthropic API.

Source code in civitas/plugins/anthropic.py
async def chat(
    self,
    model: str,
    messages: list[dict[str, Any]],
    tools: list[Any] | None = None,
) -> ModelResponse:
    """Send a chat request to the Anthropic API.

    Args:
        model: Model name; a falsy value selects the provider's default model.
        messages: Conversation messages, passed to the SDK unchanged.
        tools: Optional tool definitions; only sent when non-empty.

    Returns:
        A ``ModelResponse`` whose ``content`` is the in-order concatenation
        of every response block exposing a ``text`` attribute, and whose
        ``tool_calls`` holds one ``ToolCall`` per ``tool_use`` block (or
        ``None`` when the response contains none).
    """
    request: dict[str, Any] = {
        "model": model or self._default_model,
        "max_tokens": self._max_tokens,
        "messages": messages,
    }
    if tools:
        request["tools"] = tools

    reply = await self._client.messages.create(**request)

    # Split the response blocks into plain text and requested tool calls.
    text_parts: list[str] = []
    requested_calls: list[ToolCall] = []
    for block in reply.content:
        if hasattr(block, "text"):
            text_parts.append(block.text)
        elif block.type == "tool_use":
            requested_calls.append(
                ToolCall(id=block.id, name=block.name, input=block.input)
            )

    usage = reply.usage
    return ModelResponse(
        content="".join(text_parts),
        model=reply.model,
        tokens_in=usage.input_tokens,
        tokens_out=usage.output_tokens,
        cost_usd=_compute_cost(reply.model, usage.input_tokens, usage.output_tokens),
        # An empty list is normalised to None.
        tool_calls=requested_calls or None,
    )


ToolProvider & ToolRegistry

civitas.plugins.tools.ToolProvider

Bases: Protocol

Protocol for external tool/API invocation with schema.

name property

Human-readable tool name used for lookup.

schema property

JSON Schema describing the tool's input parameters.

execute(**kwargs) async

Invoke the tool with the given keyword arguments.


civitas.plugins.tools.ToolRegistry()

Holds registered tools, injected into AgentProcess as self.tools.

Source code in civitas/plugins/tools.py
def __init__(self) -> None:
    """Create a registry with no tools registered."""
    # Maps each tool's name to the provider registered under that name.
    self._tools: dict[str, ToolProvider] = {}

register(tool)

Register a tool by its name.

Raises ValueError on duplicate name — silent overwrite would cause the wrong implementation to be called since names are used by the model.

Source code in civitas/plugins/tools.py
def register(self, tool: ToolProvider) -> None:
    """Add ``tool`` to the registry under ``tool.name``.

    A duplicate name is rejected rather than silently replaced: the model
    refers to tools by name, so an overwrite would route its calls to the
    wrong implementation.

    Raises:
        ValueError: If a tool with the same name is already registered.
    """
    key = tool.name
    if key not in self._tools:
        self._tools[key] = tool
        return
    raise ValueError(
        f"Tool already registered: {tool.name!r}. Deregister the existing tool first."
    )

get(name)

Look up a tool by name.

Source code in civitas/plugins/tools.py
def get(self, name: str) -> ToolProvider | None:
    """Return the tool registered under ``name``, or ``None`` if there is none."""
    found = self._tools.get(name, None)
    return found

StateStore

civitas.plugins.state.StateStore

Bases: Protocol

Protocol for agent state persistence.

get(agent_name) async

Retrieve the persisted state for an agent, or None if absent.

set(agent_name, state) async

Persist state for an agent.

delete(agent_name) async

Remove persisted state for an agent.

list_agents() async

Return all agent names with persisted state.

close() async

Release resources held by the store (connections, file handles).


civitas.plugins.state.InMemoryStateStore()

Default state store — in-memory, no persistence. State is lost on restart.

Source code in civitas/plugins/state.py
def __init__(self) -> None:
    """Create a store holding no agent state."""
    # Maps agent name to that agent's most recently stored state dict.
    self._data: dict[str, dict[str, Any]] = {}

get(agent_name) async

Retrieve the in-memory state for an agent.

Source code in civitas/plugins/state.py
async def get(self, agent_name: str) -> dict[str, Any] | None:
    """Retrieve the in-memory state for an agent.

    Returns a shallow copy of the stored dict, or ``None`` when nothing is
    stored for ``agent_name``. ``set`` copies on the way in so the caller
    cannot alias the checkpoint; copying on the way out closes the same
    hole in the other direction, so mutating the returned dict does not
    rewrite the stored checkpoint.
    """
    stored = self._data.get(agent_name)
    if stored is None:
        return None
    # Shallow copy, matching set(): nested values are still shared.
    return dict(stored)

set(agent_name, state) async

Store state for an agent in memory (shallow copy to prevent aliasing).

Without copying, subsequent mutations to the caller's dict would also mutate the stored checkpoint, making restore-from-checkpoint a no-op.

Source code in civitas/plugins/state.py
async def set(self, agent_name: str, state: dict[str, Any]) -> None:
    """Record ``state`` as the in-memory checkpoint for ``agent_name``.

    A shallow copy is stored rather than the caller's dict. If the same
    object were kept, later mutations by the caller would silently rewrite
    the checkpoint and restoring from it would change nothing.
    """
    snapshot = dict(state)
    self._data[agent_name] = snapshot

delete(agent_name) async

Remove state for an agent from memory.

Source code in civitas/plugins/state.py
async def delete(self, agent_name: str) -> None:
    """Drop any in-memory state held for ``agent_name``.

    Deleting an agent that has no stored state is not an error.
    """
    # The discarded default keeps a missing key from raising KeyError.
    _ = self._data.pop(agent_name, None)

list_agents() async

Return all agent names with persisted state, sorted.

Source code in civitas/plugins/state.py
async def list_agents(self) -> list[str]:
    """List the names of all agents that have stored state, in sorted order."""
    # Iterating a dict yields its keys, so no explicit .keys() call is needed.
    names = sorted(self._data)
    return names

close() async

No-op — in-memory store holds no external resources.

Source code in civitas/plugins/state.py
async def close(self) -> None:
    """Do nothing: the in-memory store holds no external resources to release."""
    return None

civitas.plugins.sqlite_store.SQLiteStateStore(db_path='agency_state.db')

SQLite-backed StateStore implementing the StateStore protocol.

State is scoped per-agent. Stateless agents (those that never call checkpoint()) incur zero overhead — no rows are written.

All I/O runs in a thread executor so SQLite operations do not block the asyncio event loop. close() is the authoritative cleanup path and is called by Runtime.stop(). __del__ is a safety net only.

Source code in civitas/plugins/sqlite_store.py
def __init__(self, db_path: str = "agency_state.db") -> None:
    """Open the SQLite database at ``db_path`` and ensure the state table exists.

    Args:
        db_path: Database location handed to ``sqlite3.connect``.
    """
    self._db_path = db_path
    # check_same_thread=False lifts sqlite3's restriction that a connection
    # may only be used from the thread that created it.
    self._conn = sqlite3.connect(db_path, check_same_thread=False)

    # One row per agent: the agent name is the key, the state is stored as text.
    schema_ddl = (
        "CREATE TABLE IF NOT EXISTS agent_state "
        "(agent_name TEXT PRIMARY KEY, state TEXT NOT NULL)"
    )
    self._conn.execute(schema_ddl)
    self._conn.commit()

get(agent_name) async

Load agent state from SQLite (non-blocking).

Source code in civitas/plugins/sqlite_store.py
async def get(self, agent_name: str) -> dict[str, Any] | None:
    """Load an agent's state without blocking the event loop.

    The blocking read is delegated to ``self._sync_get``, run on the running
    loop's default executor; this coroutine returns whatever it returns.
    """
    event_loop = asyncio.get_running_loop()
    pending = event_loop.run_in_executor(None, self._sync_get, agent_name)
    return await pending

set(agent_name, state) async

Save agent state to SQLite (upsert, non-blocking).

Source code in civitas/plugins/sqlite_store.py
async def set(self, agent_name: str, state: dict[str, Any]) -> None:
    """Save an agent's state (upsert) without blocking the event loop.

    ``state`` is serialised to JSON in the calling coroutine; only the write
    itself, ``self._sync_set``, runs on the loop's default executor.
    """
    serialized = json.dumps(state)
    event_loop = asyncio.get_running_loop()
    pending = event_loop.run_in_executor(None, self._sync_set, agent_name, serialized)
    await pending

delete(agent_name) async

Remove agent state from SQLite (non-blocking).

Source code in civitas/plugins/sqlite_store.py
async def delete(self, agent_name: str) -> None:
    """Remove an agent's state without blocking the event loop.

    The blocking delete is delegated to ``self._sync_delete``, run on the
    running loop's default executor.
    """
    event_loop = asyncio.get_running_loop()
    pending = event_loop.run_in_executor(None, self._sync_delete, agent_name)
    await pending

close() async

Close the database connection. Authoritative cleanup path.

Source code in civitas/plugins/sqlite_store.py
async def close(self) -> None:
    """Close the underlying SQLite connection.

    This is the authoritative cleanup path for the store.
    """
    connection = self._conn
    connection.close()

Plugin Loading

civitas.plugins.loader.load_plugins_from_config(config)

Load all plugins from a YAML configuration dict.

Expected YAML structure

plugins:
  models:
    - type: anthropic
      config:
        api_key: "sk-..."
        default_model: "claude-sonnet-4-20250514"
  exporters:
    - type: console
    - type: otel
      config:
        endpoint: "http://localhost:4317"
  state:
    type: in_memory

Returns a dict with keys: model_providers, exporters, state_store.

Source code in civitas/plugins/loader.py
def load_plugins_from_config(config: dict[str, Any]) -> dict[str, Any]:
    """Load all plugins from a YAML configuration dict.

    Expected YAML structure:
        plugins:
          models:
            - type: anthropic
              config:
                api_key: "sk-..."
                default_model: "claude-sonnet-4-20250514"
          exporters:
            - type: console
            - type: otel
              config:
                endpoint: "http://localhost:4317"
          state:
            type: in_memory

    Returns a dict with keys: model_providers, exporters, state_store.
    """
    plugins_cfg = config.get("plugins", {})
    result: dict[str, Any] = {
        "model_providers": [],
        "exporters": [],
        "state_store": None,
    }

    # Models
    for model_cfg in plugins_cfg.get("models", []):
        name = model_cfg.get("type")
        if not name:
            raise PluginError(
                "model", "<missing>", "Plugin config entry is missing a 'type' field."
            )
        plugin_config = model_cfg.get("config", {})
        provider = load_plugin("model", name, plugin_config)
        result["model_providers"].append(provider)

    # Exporters
    for exp_cfg in plugins_cfg.get("exporters", []):
        name = exp_cfg.get("type")
        if not name:
            raise PluginError(
                "exporter", "<missing>", "Plugin config entry is missing a 'type' field."
            )
        plugin_config = exp_cfg.get("config", {})
        exporter = load_plugin("exporter", name, plugin_config)
        result["exporters"].append(exporter)

    # State store
    state_cfg = plugins_cfg.get("state")
    if state_cfg is not None:
        name = state_cfg.get("type", "in_memory")
        plugin_config = state_cfg.get("config", {})
        result["state_store"] = load_plugin("state", name, plugin_config)

    return result