Source code for open_atp.provers.agent_prover

"""AgentProver: a coding agent + lean-lsp-mcp running in a sandbox.

An ``AgentProver`` composes:

* a :class:`~open_atp.harness.Harness` (claude_code / codex / opencode) -- the
  *agent* concern: launch script, auth forwarding, output parsing; and
* a :class:`~open_atp.backends.base.ComputeBackend` -- the *compute* concern:
  where the agent runs, with Lean+Mathlib and the lean-lsp MCP server.

``prove`` stages the project into the workdir, lets the agent fill the sorrys in
place, then diffs the ``.lean`` files against the staged originals to report what
changed. The shared :class:`~open_atp.verify.Verifier` (owned by the base
``run``) does the final compile/sorry/axiom check.
"""

from __future__ import annotations

import logging
import os
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import TextIO

from open_atp.backends.base import (
    CommandHandle,
    CommandResult,
    ComputeBackend,
    ComputeSession,
)
from open_atp.harness import HARNESSES, Harness, bundle_for_config, compute_cost_usd
from open_atp.lean import LeanProject, ProofTask
from open_atp.provers.base import AutomatedProver, AutomatedProverConfig
from open_atp.verify import ProofResult

log = logging.getLogger(__name__)

_DEFAULT_PROMPT = """\
The working directory is a complete Lean 4 lake project. One or more `.lean`
files contain `sorry` (or `admit`) placeholders standing in for proofs that have
not been written yet. Replace every such placeholder with a real proof so the
project compiles cleanly and depends on no axioms beyond Lean's standard set.

Hard rules:
- Do not weaken, rename, restate, or delete any theorem, lemma, `def`,
  `structure`, or signature. Only fill in proof bodies (the part after `:=` /
  `by` that is currently `sorry`). Changing a statement to make it easier to
  prove is failure, not success.
- No new axioms and no `sorry`/`admit`/`native_decide`-on-false escapes. The
  finished proof must type-check honestly. The only acceptable axioms are Lean's
  standard `propext`, `Classical.choice`, and `Quot.sound`.
- Stay inside this working directory; do not read or write files outside it.
- Do not edit `lakefile.toml`/`lakefile.lean`, `lean-toolchain`, or
  `lake-manifest.json` — they pin the toolchain and dependencies and must match
  the verification environment.

Workflow:
1. Find the work: search for `sorry` across the `.lean` source files (e.g.
   `rg -n '\\bsorry\\b'`). Read each file containing one to understand the
   statement, the hypotheses, and the relevant imports.
2. Confirm the lean-lsp MCP server is live before relying on it: call
   `mcp__lean-lsp__lean_diagnostic_messages` on a file you have not yet edited.
   `success:true, items:[]` means it compiles cleanly; real errors come back as
   `items`. `success:false, items:[]` usually means imports aren't built yet —
   run `lake build` for the relevant modules first.
3. Write a proof for one `sorry` at a time. Mathlib is available; prefer library
   lemmas, `simp`, `omega`, `linarith`, `exact?`/`apply?` suggestions, and
   `aesop` over long bespoke arguments.
4. After each edit, re-check that file with
   `mcp__lean-lsp__lean_diagnostic_messages` and iterate until it is clean.
5. When a file looks done, verify it has no stubbed proofs with
   `mcp__lean-lsp__lean_verify` — the reported axioms must NOT contain `sorryAx`.
6. Repeat until no `.lean` file contains a `sorry` and the whole project builds
   (`lake build`).

Tips:
- Use the lean-lsp tools (`mcp__lean-lsp__*`) as your primary feedback loop; they
  are far faster than a full `lake build` per change. Use `lake build` to
  materialize oleans for imports and as the final whole-project check.
- If a goal looks false or unprovable from the given hypotheses, re-read the
  statement: you likely misread a binder or a coercion. Do not "fix" it by
  changing the statement — finish the proof as stated.
- Non-trivial proofs routinely take many rounds of compile-error fixing. Keep
  iterating against the diagnostics rather than guessing."""
# END _DEFAULT_PROMPT (docs literalinclude end marker -- keep adjacent)

# Directories never worth copying into the agent workdir.
_IGNORE = shutil.ignore_patterns(".lake", ".git", "*.tar.gz")


[docs] @dataclass class AgentProverConfig(AutomatedProverConfig): """Configuration for :class:`AgentProver`. Extends :class:`~open_atp.provers.base.AutomatedProverConfig` (``timeout_s``, ``env``) with the agent-harness knobs. Attributes ---------- harness : str Which agent CLI to drive: one of ``claude_code`` | ``opencode`` | ``codex`` | ``vibe``. Default ``claude_code``. model : str Model id passed to the harness. Default ``claude-opus-4-8``. effort : str Reasoning-effort level passed to harnesses that support it. Default ``high``. assets : str Vendored skill/prompt/MCP asset bundle to mount into the workdir. Default ``default``. skills : list[str], optional Per-run override of the bundle's skills, each a name (resolved from a vendored catalog) or a full path. ``None`` keeps the bundle's own skills; an empty list mounts none. Applies to every harness. plugins : list[str], optional Per-run override of the bundle's plugins, same resolution as ``skills``. Claude-only; ignored by the other harnesses. extra_env : dict[str, str] Additional environment variables forwarded into the agent sandbox. Default empty. agent : str Vibe-only: which vibe agent profile to drive (``lean`` is Leanstral; ``lean-standin`` the non-Labs, model-templated stand-in). Ignored by the other harnesses. Default ``lean``. max_turns : int, optional Vibe-only programmatic-run guard passed straight to ``vibe -p``. ``None`` leaves it unset. Ignored by the other harnesses. max_price : float, optional Vibe-only programmatic-run guard passed straight to ``vibe -p``. ``None`` leaves it unset. Ignored by the other harnesses. max_iterations : int, optional AxProver-only cap on ax-prover's proposer->builder->reviewer loop. ``None`` keeps ax-prover's default (50). Ignored by the other harnesses. """ harness: str = "claude_code" model: str = "claude-opus-4-8" effort: str = "high" assets: str = "default" skills: list[str] | None = None plugins: list[str] | None = None extra_env: dict[str, str] = field(default_factory=dict) agent: str = "lean" max_turns: int | None = None max_price: float | None = None max_iterations: int | None = None
[docs] class AgentProver(AutomatedProver): """Generate proofs by driving an agent CLI harness in a compute backend. Composes an :doc:`agent harness </api/harness>` (the *agent* concern) with a :class:`~open_atp.backends.base.ComputeBackend` (the *compute* concern): the harness edits the staged ``.lean`` files in place, then the shared :class:`~open_atp.verify.Verifier` does the final compile/sorry/axiom check. ``codex``, ``opencode``, ``axprover``, and ``vibe`` are this prover on a different harness. Examples -------- Build the config and construct the prover directly (the agent backend defaults to the verify backend): >>> from open_atp.backends.docker import DockerBackend, DockerConfig >>> from open_atp.provers.agent_prover import AgentProver, AgentProverConfig >>> backend = DockerBackend(DockerConfig()) >>> config = AgentProverConfig( ... harness="claude_code", ... model="claude-opus-4-8", ... effort="high", ... ) >>> prover = AgentProver(config, verification_backend=backend) >>> prover.config.harness 'claude_code' """ name = "agent" config: AgentProverConfig def __init__( self, config: AgentProverConfig, verification_backend: ComputeBackend, agent_backend: ComputeBackend | None = None, ) -> None: super().__init__(config, verification_backend) # Generation may run in a different backend than verification (e.g. Modal for # the agent, local Docker for the cheap final check) -- but defaults to shared. self.agent_backend = agent_backend or verification_backend def _generate( self, task: ProofTask, wd: Path, logs_dir: Path, result: ProofResult ) -> None: # 1. Stage the project so the workdir is a complete project to edit in place. shutil.copytree(task.project.root, wd, dirs_exist_ok=True, ignore=_IGNORE) # 2. Snapshot the original .lean contents for the post-run diff. original = { p.relative_to(wd).as_posix(): p.read_text() for p in wd.rglob("*.lean") if ".lake" not in p.parts } # 3. Configure the workdir for the chosen harness + asset bundle. bundle = bundle_for_config(self.config) harness = HARNESSES[self.config.harness].from_config(self.config, assets=bundle) # Prompt precedence: explicit task instructions > bundle default > generic. prompt = task.instructions or bundle.default_prompt() or _DEFAULT_PROMPT harness.configure_wd(wd, prompt) stdout_path = logs_dir / "stdout.txt" # 4. Run the agent. When generation and verification share a backend, keep the # sandbox alive and run the final check in it -- no second spin-up. if self.agent_backend is self.verifier.backend: # Mounts (credential dirs) must be pinned when the session sandbox is # created; per-command env is forwarded by _run_agent's exec. _, mounts = self._auth(harness) with self.agent_backend.session( wd, mounts=mounts, timeout_s=self.config.timeout_s ) as session: lines, stderr = self._run_agent( wd, harness, stdout_path, session=session ) self._download_wd(wd, session=session) # Parse (which reads harness usage files still in wd) before # _download_logs relocates them out. self._fill_result(result, harness, wd, original, lines) self._download_logs(harness, wd, logs_dir, stderr) result.verification = self.verifier.verify( LeanProject(wd), session=session ) return lines, stderr = self._run_agent(wd, harness, stdout_path) self._download_wd(wd) self._fill_result(result, harness, wd, original, lines) self._download_logs(harness, wd, logs_dir, stderr) def _download_wd(self, wd: Path, session: ComputeSession | None = None) -> None: """Bring the agent's edits onto the host at ``wd``. A tar pull for a live Modal session; a no-op for bind-mounted Docker (where the agent wrote ``wd`` directly) and for the one-shot path (whose backend syncs out on context exit). """ if session is not None: session.sync_out() def _download_logs( self, harness: Harness, wd: Path, logs_dir: Path, stderr: str = "" ) -> None: """Populate ``logs_dir`` with the run record beside the live ``stdout.txt``. The streamed stdout is already teed into ``logs_dir/stdout.txt`` as it arrives; here we relocate any harness-specific rich logs out of the workdir (no-op for the CLI harnesses) and write the captured ``stderr``. """ harness.collect_logs(wd, logs_dir) if stderr: (logs_dir / "stderr.txt").write_text(stderr) def _fill_result( self, result: ProofResult, harness: Harness, wd: Path, original: dict[str, str], lines: list[str], ) -> None: """Token totals -> cost, then diff the workdir against the staged originals.""" parsed = harness.parse(lines) cost = parsed.cost_usd if cost is None: cost = compute_cost_usd( self.config.model, parsed.input_tokens, parsed.output_tokens ) completed: dict[str, str] = {} for path in sorted(wd.rglob("*.lean")): if ".lake" in path.parts: continue rel = path.relative_to(wd).as_posix() content = path.read_text() if original.get(rel) != content: completed[rel] = content result.completed_files = completed result.cost_usd = cost result.metadata = { "harness": harness.name, "model": self.config.model, "effort": self.config.effort, "input_tokens": parsed.input_tokens, "output_tokens": parsed.output_tokens, "stop_reason": parsed.stop_reason, } def _auth(self, harness: Harness) -> tuple[dict[str, str], list[tuple[str, str]]]: """Resolve the harness's :class:`AuthSpec` into backend env + mounts.""" spec = harness.auth_spec() env: dict[str, str] = {} for key in spec.env: value = os.environ.get(key) if value is not None: env[key] = value env.update(harness.static_env()) env.update(self.config.env) env.update(self.config.extra_env) home = self.agent_backend.container_home mounts = [(str(src), f"{home}/{dest}") for src, dest in spec.home_dirs] return env, mounts def _run_agent( self, workdir: Path, harness: Harness, stdout_path: Path, session: ComputeSession | None = None, ) -> tuple[list[str], str]: """Resolve auth, launch the agent in the backend, and tee its stdout. Each streamed event line is written to ``stdout_path`` (opened in append mode, so a multi-round caller accumulates one transcript) as it arrives -- the live run log -- and collected to return for token/cost parsing. Returns ``(stdout_lines, stderr)``: the lines plus the run's captured stderr (Modal's ``modal_stderr.txt`` / the one-shot run's stderr), which the prover writes to ``logs/stderr.txt``. Isolated (it owns credential resolution + the backend call) so tests can stand in a fake run -- write a solved file, return a captured stream -- without Docker or credentials. ``session`` given (the backend-reuse path): exec the agent in that live sandbox, which stays up for the verifier afterwards. ``None``: the one-shot path, a fresh sandbox via ``start`` that tears down on exit. """ env, mounts = self._auth(harness) lines: list[str] = [] def drain(handle: CommandHandle, sink: TextIO) -> None: for line in handle.stream(): sink.write(line + "\n") sink.flush() lines.append(line) with stdout_path.open("a", encoding="utf-8") as sink: if session is not None: # Mounts were pinned at session creation; only per-command env here. handle = session.exec(harness.command, env=env) drain(handle, sink) result = handle.wait() self._log_agent_result(harness, result, lines) return lines, result.stderr with self.agent_backend.start( workdir, harness.command, env=env, mounts=mounts, timeout_s=self.config.timeout_s, ) as handle: drain(handle, sink) result = handle.wait() self._log_agent_result(harness, result, lines) return lines, result.stderr def _log_agent_result( self, harness: Harness, result: CommandResult, lines: list[str] ) -> None: """Surface a silent/failed agent run -- its stderr is otherwise discarded. The agent's stderr (captured by the backend) is the only clue when an agent emits no parseable stdout (e.g. a launch/auth failure that leaves 0 tokens and no edits). Without this a silent run is invisible short of re-running the whole sandbox job, so log a warning whenever the agent exits non-zero or produces no output at all. """ if result.exit_code != 0 or not lines: stderr = result.stderr.strip() log.warning( "agent harness %r exited %s with %d stdout line(s)%s", harness.name, result.exit_code, len(lines), f"; stderr:\n{stderr}" if stderr else " and no stderr", )