Source code for open_atp.backends.docker

"""Local Docker backend.

Ported from milp_flare ``harness/runner/docker.py``, generalised from "launch an
agent" to "run an arbitrary command over a workdir". Mechanics:

* ``docker run --rm --name afps-<uuid>`` with the workdir bind-mounted at
  ``/workspace/wd`` (so file mutations land on the host with no copy-out step).
* A unique container name so a run can be cancelled with ``docker kill``.
* The image bakes a warm Mathlib olean cache at ``/workspace/.lake``; every command
  is wrapped to ``cd`` into the mount and symlink ``.lake`` to it, mirroring
  milp_flare's ``entrypoint.sh``. This is the one image-layout convention the backend
  owns, keeping the verifier image-agnostic.
"""

from __future__ import annotations

import subprocess
import time
import uuid
from collections.abc import Iterator, Mapping, Sequence
from dataclasses import dataclass, field
from pathlib import Path

from open_atp.backends.base import (
    CommandHandle,
    CommandResult,
    ComputeBackend,
    ComputeSession,
    wrap_command,
)
from open_atp.images import DEFAULT_IMAGE, Image


@dataclass
class DockerCommandHandle(CommandHandle):
    popen: subprocess.Popen[str]
    container: str
    started_at: float
    _stdout_lines: list[str] = field(default_factory=list)

    def stream(self) -> Iterator[str]:
        """Yield the container's stdout lines as they arrive, retaining each."""
        assert self.popen.stdout is not None
        for line in self.popen.stdout:
            line = line.rstrip("\n")
            self._stdout_lines.append(line)
            yield line

    def cancel(self) -> None:
        """``docker kill`` the container (bind-mounted artifacts already on host)."""
        # The workdir is bind-mounted, so partial artifacts are already on the host;
        # just kill the container.
        subprocess.run(
            ["docker", "kill", self.container],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def wait(self) -> CommandResult:
        """Block for the container to exit and collect its captured output."""
        stdout, stderr = self.popen.communicate()
        # communicate() returns "" for streams already drained via stream().
        out = stdout if stdout else "\n".join(self._stdout_lines)
        return CommandResult(
            exit_code=self.popen.returncode,
            stdout=out,
            stderr=stderr or "",
            duration_s=time.time() - self.started_at,
        )


@dataclass
class DockerSessionHandle(DockerCommandHandle):
    """A command ``docker exec``'d into a live :class:`DockerSession`.

    Identical streaming/wait to the one-shot handle, but :meth:`cancel` is a no-op:
    the session owns the container's lifecycle (``close`` -> ``docker kill``), so one
    exec finishing -- or its context-manager ``__exit__`` -- must not tear the
    container down out from under the next command.
    """

    def cancel(self) -> None:
        """No-op: the owning session, not the handle, kills the container."""


[docs] class DockerBackend(ComputeBackend): """Run sandboxes as local ``docker`` containers over a bind-mounted workdir. The workdir is bind-mounted (not copied), so a command's file edits land directly on the host. Construction is offline -- it just records its knobs; the daemon is only contacted when a command runs. Parameters ---------- image : Image The sandbox image carrying Lean + Mathlib -- its tag plus the toolchain and Mathlib revision the verifier checks projects against. Default :data:`~open_atp.images.DEFAULT_IMAGE`. A mapping is coerced to an :class:`~open_atp.images.Image` (so a parsed config's nested ``image:`` block works). timeout_s : int Wall-clock cap applied to a command when its call site does not pass an explicit ``timeout_s``. Default ``1800``. env : Mapping[str, str], optional Environment variables baked into every command run in the sandbox. Default empty. workdir_mount : str Path inside the container where the workdir is bind-mounted. Default ``/workspace/wd``. baked_lake : str Image-baked warm cache to symlink the workdir's ``.lake`` to. Empty skips the symlink. Default ``/workspace/.lake``. volumes : tuple[tuple[str, str], ...] Extra ``-v host:container`` mounts (e.g. agent credential dirs). Default empty. Examples -------- >>> from open_atp.backends.docker import DockerBackend >>> from open_atp.images import DEFAULT_IMAGE >>> backend = DockerBackend(image=DEFAULT_IMAGE) >>> backend.name 'docker' >>> backend.workdir_mount '/workspace/wd' """ # The image runs as the non-root ``agent`` user (Lean's elan + Claude Code # both live under this HOME); credential mounts land here. container_home = "/home/agent" def __init__( self, *, image: Image | Mapping[str, object] = DEFAULT_IMAGE, timeout_s: int = 1800, env: Mapping[str, str] | None = None, workdir_mount: str = "/workspace/wd", baked_lake: str = "/workspace/.lake", volumes: tuple[tuple[str, str], ...] = (), ) -> None: super().__init__(image=image, timeout_s=timeout_s, env=env) #: Path inside the container where the workdir is bind-mounted. self.workdir_mount = workdir_mount #: Image-baked warm cache to symlink the workdir's ``.lake`` to. self.baked_lake = baked_lake #: Extra ``-v host:container`` mounts. self.volumes = tuple(tuple(v) for v in volumes) @property def name(self) -> str: """Short identifier for the backend: ``"docker"``.""" return "docker" def _wrap(self, command: str) -> str: """cd into the mount and wire up the warm Mathlib cache before ``command``.""" return wrap_command(self.workdir_mount, self.baked_lake, command) def _build_cmd( self, workdir: Path, container: str, env: Mapping[str, str], mounts: Sequence[tuple[str, str]], *, detach: bool = False, ) -> list[str]: """Build the ``docker run`` argv: workdir + extra mounts, env, and image. Parameters ---------- workdir : pathlib.Path Host directory bind-mounted at :attr:`workdir_mount`. container : str ``--name`` for the container, so a run can later be ``docker kill``ed. env : Mapping[str, str] Per-call environment variables, merged over the backend's :attr:`env`. mounts : Sequence[tuple[str, str]] Extra ``(host_path, container_path)`` bind mounts appended after :attr:`volumes`. detach : bool Add ``-d`` for a keep-alive session container (commands then land via ``docker exec``). Default ``False``. Returns ------- list[str] The ``docker run`` argv, up to but not including the in-container command. """ cmd = ["docker", "run", "--rm"] if detach: # Keep-alive container for a session; commands land via ``docker exec``. cmd.append("-d") cmd += ["--name", container] cmd += ["-v", f"{workdir.resolve()}:{self.workdir_mount}"] # Backend-level mounts (baked in) then per-call mounts (e.g. credential dirs). for host, dest in (*self.volumes, *mounts): cmd += ["-v", f"{host}:{dest}"] for key, value in {**self.env, **env}.items(): cmd += ["-e", f"{key}={value}"] cmd += [self.image.name] return cmd def start( self, workdir: Path, command: str, *, env: Mapping[str, str] | None = None, mounts: Sequence[tuple[str, str]] | None = None, timeout_s: int | None = None, ) -> CommandHandle: """``docker run`` ``command`` with ``workdir`` bind-mounted, returning a handle. Edits land on the host directly via the bind mount, so there is no copy-out step; the unique container name lets :meth:`DockerCommandHandle.cancel` kill it. Parameters ---------- workdir : pathlib.Path Host directory bind-mounted at :attr:`workdir_mount`; the command's edits land here directly. command : str The shell command to run inside the container. env : Mapping[str, str], optional Per-call environment variables, merged over the backend's :attr:`env`. Default empty. mounts : Sequence[tuple[str, str]], optional Extra ``(host_path, container_path)`` bind mounts (e.g. agent credential dirs). Default empty. timeout_s : int, optional Unused by Docker (the container has no built-in cap); callers enforce timeouts on the handle. Default ``None``. Returns ------- CommandHandle A live :class:`DockerCommandHandle` to stream or :meth:`~CommandHandle.wait` on. """ container = f"afps-{uuid.uuid4().hex[:12]}" argv = self._build_cmd(workdir, container, env or {}, mounts or ()) argv += ["bash", "-lc", self._wrap(command)] popen = subprocess.Popen( argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, start_new_session=True, ) return DockerCommandHandle( popen=popen, container=container, started_at=time.time() )
[docs] def session( self, workdir: Path, *, env: Mapping[str, str] | None = None, mounts: Sequence[tuple[str, str]] | None = None, timeout_s: int | None = None, ) -> ComputeSession: """Start a detached keep-alive container for repeated ``docker exec`` commands. All mounts are wired at ``docker run`` time (Docker can't add them per-exec); the container lives until :meth:`DockerSession.close`. Parameters ---------- workdir : pathlib.Path Host directory bind-mounted at :attr:`workdir_mount` for the session's life. env : Mapping[str, str], optional Environment variables pinned on the container at creation, merged over the backend's :attr:`env`. Default empty. mounts : Sequence[tuple[str, str]], optional Extra ``(host_path, container_path)`` bind mounts wired at creation (Docker can't add them per-exec). Default empty. timeout_s : int, optional Unused by Docker (the container has no built-in cap). Default ``None``. Returns ------- ComputeSession A live :class:`DockerSession` over the workdir. """ # A detached keep-alive container (the workdir bind-mounted, all mounts wired # at run time since Docker can't add them per-exec); commands land via # ``docker exec`` until close() kills it. container = f"afps-{uuid.uuid4().hex[:12]}" argv = self._build_cmd(workdir, container, env or {}, mounts or (), detach=True) argv += ["sleep", "infinity"] proc = subprocess.run( argv, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True ) if proc.returncode != 0: raise RuntimeError( f"failed to start docker session container: {proc.stderr.strip()}" ) return DockerSession(backend=self, container=container)
[docs] @dataclass class DockerSession(ComputeSession): """A live ``docker run -d`` container; exec many commands, ``docker kill`` once. The workdir is bind-mounted, so :meth:`sync_out`/:meth:`sync_in` are no-ops -- edits already live on the host. """ backend: DockerBackend container: str
[docs] def exec( self, command: str, *, env: Mapping[str, str] | None = None, timeout_s: int | None = None, ) -> CommandHandle: """``docker exec`` ``command`` into the container; close() owns teardown. Parameters ---------- command : str The shell command to run in the live container. env : Mapping[str, str], optional Per-command environment variables (``docker exec -e``). Default empty. timeout_s : int, optional Unused by Docker (the container has no built-in cap). Default ``None``. Returns ------- CommandHandle A live :class:`DockerSessionHandle` whose ``cancel`` is a no-op (the session owns teardown). """ argv = ["docker", "exec"] for key, value in (env or {}).items(): argv += ["-e", f"{key}={value}"] argv += [self.container, "bash", "-lc", self.backend._wrap(command)] popen = subprocess.Popen( argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, start_new_session=True, ) return DockerSessionHandle( popen=popen, container=self.container, started_at=time.time() )
[docs] def sync_out(self) -> None: """No-op: the bind mount means edits are already on the host."""
[docs] def sync_in(self) -> None: """No-op: the bind mount means host edits are already visible in-container."""
[docs] def close(self) -> None: """``docker kill`` the keep-alive container. Idempotent.""" # Idempotent: killing an already-gone container just errors out quietly. subprocess.run( ["docker", "kill", self.container], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, )