Source code for open_atp.backends.base

"""Compute backend abstraction.

A :class:`ComputeBackend` is the single load-bearing primitive of the platform:
"run a command over a working directory inside a Lean+Mathlib sandbox, and give me
back the output." It is used in two distinct roles:

* **Agentic execution** -- run a coding-agent launch script (AgentProver, NuminaProver).
* **Verification** -- run ``lake env lean ...`` over candidate files (every prover).

This generalises milp_flare's agent-specific ``Runner``/``AgentRun`` into a plain
command runner. Concrete backends (Docker, Modal) are ports of milp_flare's
``runner/docker.py`` and ``runner/modal.py``.
"""

from __future__ import annotations

import abc
from collections.abc import Iterator, Mapping, Sequence
from contextlib import AbstractContextManager
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Any, Self

from open_atp.images import DEFAULT_IMAGE, Image


@dataclass
class CommandResult:
    """What a backend reports once a command has finished running.

    Attributes
    ----------
    exit_code : int
        Exit status of the command.
    stdout : str
        Text the command wrote to standard output.
    stderr : str
        Text the command wrote to standard error.
    duration_s : float
        How long the run took, in seconds (per the ``_s`` suffix).
    """

    exit_code: int
    stdout: str
    stderr: str
    duration_s: float
@dataclass
class CommandHandle(AbstractContextManager["CommandHandle"]):
    """Handle on a running command whose output can be streamed.

    This base only declares the hooks; each concrete backend subclasses it and
    supplies the behaviour. Incremental output is what agent runs need (stdout
    is parsed for cost/progress as it arrives), while a blocking ``lake``
    invocation can skip streaming and simply call :meth:`wait`.

    Used as a context manager, the handle calls :meth:`cancel` when the
    ``with`` block is left.
    """

    def stream(self) -> Iterator[str]:
        """Iterate over stdout, one line at a time, as output is produced."""
        raise NotImplementedError

    def cancel(self) -> None:
        """Terminate the command on a best-effort basis.

        Typical implementations are ``docker kill`` or a sandbox terminate.
        """
        raise NotImplementedError

    def wait(self) -> CommandResult:
        """Drain the command until it finishes and return its final result."""
        raise NotImplementedError

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the ``with`` block always cancels, error or not; returning
        # ``None`` lets any in-flight exception propagate.
        self.cancel()
class ComputeSession(AbstractContextManager["ComputeSession"]):
    """A long-lived sandbox bound to one workdir: many execs, a single teardown.

    :meth:`ComputeBackend.start` / :meth:`ComputeBackend.run` are one-shot:
    they create a sandbox, run a *single* command, and tear the sandbox down.
    A session instead keeps the sandbox alive, so several commands -- the
    agent's generation *then* the verifier's compile -- share the same hot
    filesystem without paying for a second spin-up.

    Lifecycle invariant: teardown belongs to :meth:`close`, not to the
    handle's ``wait``. A session therefore MUST be used as a context manager,
    and :meth:`close` MUST be idempotent; otherwise an error raised between
    exec and close leaks the sandbox.

    Examples
    --------
    Open a session over a workdir and run several commands against the same
    hot sandbox -- here generation followed by the compile, with one spin-up
    (needs a live backend, so this is illustrative rather than a doctest):

    .. code-block:: python

        from open_atp.backends.docker import DockerBackend, DockerConfig
        from open_atp.images import DEFAULT_IMAGE

        backend = DockerBackend(DockerConfig(image=DEFAULT_IMAGE))
        with backend.session(workdir) as session:
            with session.exec("lake env lean Demo.lean") as handle:
                result = handle.wait()
            session.sync_out()  # pull the agent's edits back to the host
        # the sandbox is torn down on exit, even if exec raised
    """

    def exec(
        self,
        command: str,
        *,
        env: Mapping[str, str] | None = None,
        timeout_s: int | None = None,
    ) -> CommandHandle:
        """Start ``command`` inside the live sandbox.

        The returned handle does NOT tear the sandbox down; that is left to
        :meth:`close`.
        """
        raise NotImplementedError

    def sync_out(self) -> None:
        """Copy the sandbox workdir back to the host (no-op when bind-mounted)."""
        raise NotImplementedError

    def sync_in(self) -> None:
        """Copy the host workdir into the sandbox (no-op when bind-mounted)."""
        raise NotImplementedError

    def close(self) -> None:
        """Tear the sandbox down, pulling artifacts first where needed.

        Implementations must make this idempotent.
        """
        raise NotImplementedError

    def __exit__(self, *exc_info: object) -> None:
        # Always close on the way out; returning ``None`` lets any in-flight
        # exception propagate.
        self.close()
def _is_tuple_annotation(annotation: object) -> bool:
    """Return whether a dataclass field annotation denotes a ``tuple`` type."""
    if isinstance(annotation, str):
        # Lazy annotation (``from __future__ import annotations``): only the
        # source text is available, so match on its prefix.
        return annotation.startswith("tuple")
    # Evaluated annotation: bare ``tuple``, or a parameterised alias such as
    # ``tuple[str, ...]`` / ``typing.Tuple[str, ...]`` whose origin is ``tuple``.
    from typing import get_origin

    return annotation is tuple or get_origin(annotation) is tuple


@dataclass
class BackendConfig:
    """Shared backend knobs. Subclasses add their own (Docker mounts, Modal CPU, ...).

    Attributes
    ----------
    image : Image
        The sandbox image carrying Lean + Mathlib -- its tag plus the toolchain
        and Mathlib revision the verifier checks projects against. Default
        :data:`~open_atp.images.DEFAULT_IMAGE`.
    timeout_s : int
        Wall-clock cap applied to a command when its call site does not pass an
        explicit ``timeout_s``. Default ``1800``.
    env : Mapping[str, str]
        Environment variables baked into every command run in the sandbox.
        Default empty.
    """

    image: Image = DEFAULT_IMAGE
    timeout_s: int = 1800
    env: Mapping[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Mapping[str, object]) -> Self:
        """Build a config from a mapping (e.g. parsed JSON), ignoring unknown keys.

        The inverse of :func:`dataclasses.asdict`: any tuple-typed field (e.g.
        :attr:`~open_atp.backends.docker.DockerConfig.volumes`) is restored from
        the list JSON round-trips it to, including one level of nested lists.
        Tuple fields are recognised whether the annotation is a lazy string or
        an evaluated type, so subclasses need not use
        ``from __future__ import annotations``.

        Keys that are not constructor parameters are dropped: unknown names (a
        serialized superset, or a sibling backend's extra knobs) and fields
        declared with ``init=False``, which ``asdict`` emits but ``__init__``
        rejects. Subclasses inherit this unchanged -- ``cls`` resolves to the
        concrete config, so its own fields are honoured.

        Parameters
        ----------
        data : Mapping[str, object]
            Field names mapped to their (possibly JSON-decoded) values.

        Returns
        -------
        Self
            An instance of ``cls``; omitted fields keep their defaults.
        """
        # NOTE(review): non-list values (e.g. an ``image`` that was serialized
        # to a mapping) are passed through as-is -- confirm callers rebuild it.
        accepted = {f.name: f for f in fields(cls) if f.init}
        kwargs: dict[str, Any] = {}
        for key, value in data.items():
            spec = accepted.get(key)
            if spec is None:
                continue
            if isinstance(value, list) and _is_tuple_annotation(spec.type):
                # JSON has no tuple type: turn the list (and any inner lists)
                # back into tuples.
                value = tuple(
                    tuple(item) if isinstance(item, list) else item
                    for item in value
                )
            kwargs[key] = value
        return cls(**kwargs)
def wrap_command(workdir_mount: str, baked_lake: str, command: str) -> str:
    """Prefix ``command`` with a ``cd`` into the workdir and the Mathlib-cache wiring.

    This is the one image-layout convention the backends own (mirroring
    milp_flare's ``entrypoint.sh``): the workdir's ``.lake`` is symlinked to
    the image-baked olean cache so uploaded projects reuse it. The line is the
    same for Docker (bind mount) and Modal (pushed dir), which is why it lives
    here rather than in either backend.

    Parameters
    ----------
    workdir_mount : str
        Path of the workdir inside the sandbox; the shell line starts by
        ``cd``-ing into it.
    baked_lake : str
        Path of the image-baked ``.lake`` cache. When empty, the symlink step
        is left out. When set, the link is only made if that path exists, and
        a failure of the step is ignored (``|| true``).
    command : str
        The shell command to run after the preparation steps.

    Returns
    -------
    str
        A single shell line: the preparation steps, then ``; command``.

    Notes
    -----
    NOTE(review): ``command`` is joined with ``;``, so it still runs if the
    ``cd`` fails, and the paths are interpolated unquoted -- confirm both are
    intended.
    """
    # Optional step: point ``.lake`` at the baked cache when one is configured.
    link_cache = (
        f" && {{ [ -e {baked_lake} ] && ln -sfn {baked_lake} .lake || true; }}"
        if baked_lake
        else ""
    )
    return f"cd {workdir_mount}{link_cache}; {command}"
class ComputeBackend(abc.ABC):
    """Abstract runner of commands over a workdir in a Lean + Mathlib sandbox.

    Concrete backends implement :attr:`name`, :meth:`start` and
    :meth:`session`; :meth:`run` is a blocking convenience built on
    :meth:`start`.
    """

    #: Absolute ``$HOME`` inside the sandbox; per-run credential dirs (an agent's
    #: ``~/.codex``, say) are mounted under it. Overridden per backend.
    container_home: str = "/root"

    def __init__(self, config: BackendConfig) -> None:
        # Keep the shared knobs (image, default timeout, env) for later use.
        self.config = config

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Short identifier, e.g. ``"docker"`` or ``"modal"``."""

    @abc.abstractmethod
    def start(
        self,
        workdir: Path,
        command: str,
        *,
        env: Mapping[str, str] | None = None,
        mounts: Sequence[tuple[str, str]] | None = None,
        timeout_s: int | None = None,
    ) -> CommandHandle:
        """Launch ``command`` with ``workdir`` mounted/synced into the sandbox.

        The workdir is synced in before launch and synced back out on
        completion, so file mutations (completed proofs) are visible to the
        caller.

        ``mounts`` lists extra ``(host_path, container_path)`` bind mounts on
        top of the workdir. They forward agent credential dirs (e.g. Codex's
        ``~/.codex``) per run, without baking them into the backend config.
        """

    @abc.abstractmethod
    def session(
        self,
        workdir: Path,
        *,
        env: Mapping[str, str] | None = None,
        mounts: Sequence[tuple[str, str]] | None = None,
        timeout_s: int | None = None,
    ) -> ComputeSession:
        """Open a persistent sandbox over ``workdir`` for multiple :meth:`exec` calls.

        Unlike :meth:`start` (one command, then teardown), the returned
        :class:`ComputeSession` stays alive until :meth:`ComputeSession.close`,
        so the same hot sandbox can run generation *and* verification back to
        back.

        ``env``/``mounts`` given here pin the long-lived sandbox at creation
        (Docker bind mounts can only be set at ``docker run``); per-command
        credentials go to :meth:`ComputeSession.exec`'s own ``env``.
        """

    def run(
        self,
        workdir: Path,
        command: str,
        *,
        env: Mapping[str, str] | None = None,
        mounts: Sequence[tuple[str, str]] | None = None,
        timeout_s: int | None = None,
    ) -> CommandResult:
        """Convenience: :meth:`start` then block via :meth:`CommandHandle.wait`.

        The handle is used as a context manager, so its ``__exit__`` runs once
        the result has been collected or an error has been raised.
        """
        launched = self.start(
            workdir, command, env=env, mounts=mounts, timeout_s=timeout_s
        )
        with launched as handle:
            outcome = handle.wait()
        return outcome