Source code for open_atp.provers.agent_prover

"""AgentProver: a coding agent + lean-lsp-mcp running in a sandbox.

An ``AgentProver`` composes:

* a :class:`~open_atp.harness.Harness` (claude_code / codex / opencode) -- the
  *agent* concern: launch script, auth forwarding, output parsing; and
* a :class:`~open_atp.backends.base.ComputeBackend` -- the *compute* concern:
  where the agent runs, with Lean+Mathlib and the lean-lsp MCP server.

``prove`` stages the project into the workdir, lets the agent fill the sorrys in
place, then diffs the ``.lean`` files against the staged originals to report what
changed. The shared :class:`~open_atp.verify.Verifier` (owned by the base
``run``) does the final compile/sorry/axiom check.
"""

from __future__ import annotations

import logging
import shutil
from pathlib import Path
from typing import TextIO

from open_atp.backends.base import (
    CommandHandle,
    CommandResult,
    ComputeBackend,
    ComputeSession,
)
from open_atp.harness import (
    ClaudeCodeHarness,
    Harness,
    compute_cost_usd,
)
from open_atp.harness._catalog import resolve_skill
from open_atp.lean import LeanProject, ProofTask
from open_atp.provers.base import (
    AutomatedProver,
    ProofResult,
    compose_prompt,
)

log = logging.getLogger(__name__)

PROVER_PROMPT = """\
The working directory is a complete Lean 4 lake project. One or more `.lean`
files contain `sorry` (or `admit`) placeholders standing in for proofs that have
not been written yet. Replace every such placeholder with a real proof so the
project compiles cleanly and depends on no axioms beyond Lean's standard set.

Hard rules:
- Do not weaken, rename, restate, or delete any theorem, lemma, `def`,
  `structure`, or signature. Only fill in proof bodies (the part after `:=` /
  `by` that is currently `sorry`). Changing a statement to make it easier to
  prove is failure, not success.
- No new axioms and no `sorry`/`admit`/`native_decide`-on-false escapes. The
  finished proof must type-check honestly. The only acceptable axioms are Lean's
  standard `propext`, `Classical.choice`, and `Quot.sound`.
- Stay inside this working directory; do not read or write files outside it.
- Do not edit `lakefile.toml`/`lakefile.lean`, `lean-toolchain`, or
  `lake-manifest.json` — they pin the toolchain and dependencies and must match
  the verification environment.

Workflow:
1. Find the work: search for `sorry` across the `.lean` source files (e.g.
   `rg -n '\\bsorry\\b'`). Read each file containing one to understand the
   statement, the hypotheses, and the relevant imports.
2. Confirm the lean-lsp MCP server is live before relying on it: call
   `mcp__lean-lsp__lean_diagnostic_messages` on a file you have not yet edited.
   `success:true, items:[]` means it compiles cleanly; real errors come back as
   `items`. `success:false, items:[]` usually means imports aren't built yet —
   run `lake build` for the relevant modules first.
3. Write a proof for one `sorry` at a time. Mathlib is available; prefer library
   lemmas, `simp`, `omega`, `linarith`, `exact?`/`apply?` suggestions, and
   `aesop` over long bespoke arguments.
4. After each edit, re-check that file with
   `mcp__lean-lsp__lean_diagnostic_messages` and iterate until it is clean.
5. When a file looks done, verify it has no stubbed proofs with
   `mcp__lean-lsp__lean_verify` — the reported axioms must NOT contain `sorryAx`.
6. Repeat until no `.lean` file contains a `sorry` and the whole project builds
   (`lake build`).

Tips:
- Use the lean-lsp tools (`mcp__lean-lsp__*`) as your primary feedback loop; they
  are far faster than a full `lake build` per change. Use `lake build` to
  materialize oleans for imports and as the final whole-project check.
- If a goal looks false or unprovable from the given hypotheses, re-read the
  statement: you likely misread a binder or a coercion. Do not "fix" it by
  changing the statement — finish the proof as stated.
- Non-trivial proofs routinely take many rounds of compile-error fixing. Keep
  iterating against the diagnostics rather than guessing."""
# END PROVER_PROMPT (docs literalinclude end marker -- keep adjacent)

# Directories never worth copying into the agent workdir.
_IGNORE = shutil.ignore_patterns(".lake", ".git", "*.tar.gz")


[docs] class AgentProver(AutomatedProver): """Generate proofs by driving an agent CLI harness in a compute backend. Composes an :doc:`agent harness </api/harness>` (the *agent* concern) with a :class:`~open_atp.backends.base.ComputeBackend` (the *compute* concern): the harness edits the staged ``.lean`` files in place, then the shared :class:`~open_atp.verify.Verifier` does the final compile/sorry/axiom check. ``codex``, ``opencode``, ``axprover``, and ``vibe`` are this prover on a different harness. Parameters ---------- backend : ComputeBackend The sandbox the agent runs in. Generation reuses it via a live session and verification runs in that same hot sandbox. harness : Harness, optional The harness to drive and its knobs: :class:`~open_atp.harness.ClaudeCodeHarness` (default), :class:`~open_atp.harness.CodexHarness`, :class:`~open_atp.harness.OpenCodeHarness`, :class:`~open_atp.harness.VibeHarness`, or :class:`~open_atp.harness.AxProverHarness`. Carries ``model``/``effort`` plus any harness-specific knobs. Plugins are Claude-only and live on :class:`~open_atp.harness.ClaudeCodeHarness`. skills : list[str], optional Skills to mount into the agent workdir, each a name (resolved from the vendored ``leanprover/skills`` catalog) or a full path to a ``SKILL.md`` tree. Default ``["lean-proof"]``; an empty list mounts none. Staged into every skill-supporting harness's location; ignored by ax-prover. timeout_s : int Wall-clock budget for the generation run, in seconds. Default ``1800``. Attributes ---------- prover_prompt : str The prover's own prompt handed to the agent, before any user prompt. Examples -------- Construct the prover directly, wiring up a harness and a backend: >>> from open_atp.backends.docker import DockerBackend >>> from open_atp.harness import CodexHarness >>> from open_atp.provers.agent_prover import AgentProver >>> backend = DockerBackend() >>> prover = AgentProver(harness=CodexHarness(effort="high"), backend=backend) >>> prover.harness.model 'gpt-5.5' Or build the same prover from the standard catalog by name, taking its baked-in defaults (see :func:`~open_atp.config.standard_prover`): >>> from open_atp import standard_prover >>> prover = standard_prover("agent:codex", backend=DockerBackend()) >>> prover.name, prover.harness.name ('agent', 'codex') Complete a task's ``sorry``\\s with :meth:`~open_atp.provers.base.AutomatedProver.prove`, here on a bundled example (this runs the agent in Docker and bills it): >>> import tempfile >>> from open_atp.examples import EXAMPLE, example_task >>> task = example_task(EXAMPLE.MUL_REORDER) >>> result = prover.prove(task, tempfile.mkdtemp()) # doctest: +SKIP >>> result.success # doctest: +SKIP True """ name = "agent" def __init__( self, *, backend: ComputeBackend, harness: Harness | None = None, skills: list[str] | None = None, timeout_s: int = 1800, ) -> None: super().__init__(backend=backend, timeout_s=timeout_s) #: The harness this prover drives. self.harness = harness or ClaudeCodeHarness() #: Skills to mount into the agent workdir (names or paths). self.skills = skills if skills is not None else ["lean-proof"] @property def prover_prompt(self) -> str: """The prover's own prompt handed to the agent, before any user prompt.""" return PROVER_PROMPT def _generate( self, task: ProofTask, wd: Path, logs_dir: Path, result: ProofResult ) -> None: # 1. Stage the project so the workdir is a complete project to edit in place. shutil.copytree(task.project.root, wd, dirs_exist_ok=True, ignore=_IGNORE) # 2. Snapshot the original .lean contents for the post-run diff. original = { p.relative_to(wd).as_posix(): p.read_text() for p in wd.rglob("*.lean") if ".lake" not in p.parts } # 3. Stage the workdir: the harness launch script, then the config's skills # (this prover owns the list; the harness owns where they land), then prompt # (this prover's own prompt + the task's optional user prompt). harness = self.harness harness.stage_wd(wd) harness.stage_skills(wd, [resolve_skill(s) for s in self.skills]) harness.write_prompt(wd, compose_prompt(self.prover_prompt, task.user_prompt)) stdout_path = logs_dir / "stdout.txt" # 4. Run the agent and the final check in one persistent sandbox: generation # and verification share the backend, so we keep the container hot between # them and never pay a second spin-up. Mounts (credential dirs) are pinned # when the session is created; per-command env is forwarded by _run_agent. _, mounts = self._auth(harness) with self.verifier.backend.session( wd, mounts=mounts, timeout_s=self.timeout_s ) as session: lines, stderr = self._run_agent(wd, harness, stdout_path, session) self._download_wd(wd, session) # Parse (which reads harness usage files still in wd) before # _download_logs relocates them out. self._fill_result(result, harness, wd, original, lines) self._download_logs(harness, wd, logs_dir, stderr) result.verification = self.verifier.verify(LeanProject(wd), session=session) def _download_wd(self, wd: Path, session: ComputeSession) -> None: """Bring the agent's edits onto the host at ``wd``. A tar pull for a live Modal session; a no-op for bind-mounted Docker (where the agent wrote ``wd`` directly). """ session.sync_out() def _download_logs( self, harness: Harness, wd: Path, logs_dir: Path, stderr: str = "" ) -> None: """Populate ``logs_dir`` with the run record beside the live ``stdout.txt``. The streamed stdout is already teed into ``logs_dir/stdout.txt`` as it arrives; here we relocate any harness-specific rich logs out of the workdir (no-op for the CLI harnesses) and write the captured ``stderr``. """ harness.collect_logs(wd, logs_dir) if stderr: (logs_dir / "stderr.txt").write_text(stderr) def _fill_result( self, result: ProofResult, harness: Harness, wd: Path, original: dict[str, str], lines: list[str], ) -> None: """Token totals -> cost, then diff the workdir against the staged originals.""" parsed = harness.parse_result(lines) cost = parsed.cost_usd if cost is None: cost = compute_cost_usd( self.harness.model, parsed.input_tokens, parsed.output_tokens ) completed: dict[str, str] = {} for path in sorted(wd.rglob("*.lean")): if ".lake" in path.parts: continue rel = path.relative_to(wd).as_posix() content = path.read_text() if original.get(rel) != content: completed[rel] = content result.completed_files = completed result.cost_usd = cost result.metadata = { "harness": harness.name, "model": self.harness.model, "effort": self.harness.effort, "input_tokens": parsed.input_tokens, "output_tokens": parsed.output_tokens, "stop_reason": parsed.stop_reason, } def _auth(self, harness: Harness) -> tuple[dict[str, str], list[tuple[str, str]]]: """Map the harness's resolved :class:`AgentAuth` onto backend env + mounts.""" auth = harness.agent_auth() home = self.verifier.backend.container_home mounts = [(str(src), f"{home}/{dest}") for src, dest in auth.mounts] return auth.env, mounts def _run_agent( self, workdir: Path, harness: Harness, stdout_path: Path, session: ComputeSession, ) -> tuple[list[str], str]: """Resolve auth, launch the agent in the live ``session``, and tee its stdout. Each streamed event line is written to ``stdout_path`` (opened in append mode, so a multi-round caller accumulates one transcript) as it arrives -- the live run log -- and collected to return for token/cost parsing. Returns ``(stdout_lines, stderr)``: the lines plus the run's captured stderr (e.g. Modal's ``modal_stderr.txt``), which the prover writes to ``logs/stderr.txt``. The agent execs in the persistent ``session`` -- the same hot sandbox that stays up for the verifier afterwards. Mounts (credential dirs) were pinned when the session was created; only per-command env is forwarded here. Isolated (it owns credential resolution + the backend call) so tests can stand in a fake run -- write a solved file, return a captured stream -- without Docker or credentials. """ env, _ = self._auth(harness) lines: list[str] = [] def drain(handle: CommandHandle, sink: TextIO) -> None: for line in handle.stream(): sink.write(line + "\n") sink.flush() lines.append(line) with stdout_path.open("a", encoding="utf-8") as sink: handle = session.exec(harness.command, env=env) drain(handle, sink) result = handle.wait() self._log_agent_result(harness, result, lines) return lines, result.stderr def _log_agent_result( self, harness: Harness, result: CommandResult, lines: list[str] ) -> None: """Surface a silent/failed agent run -- its stderr is otherwise discarded. The agent's stderr (captured by the backend) is the only clue when an agent emits no parseable stdout (e.g. a launch/auth failure that leaves 0 tokens and no edits). Without this a silent run is invisible short of re-running the whole sandbox job, so log a warning whenever the agent exits non-zero or produces no output at all. """ if result.exit_code != 0 or not lines: stderr = result.stderr.strip() log.warning( "agent harness %r exited %s with %d stdout line(s)%s", harness.name, result.exit_code, len(lines), f"; stderr:\n{stderr}" if stderr else " and no stderr", )