Source code for open_atp.verify

"""Verification: the output types and the shared verifier.

Every prover -- agentic, Numina, *and* Aristotle -- funnels its output through the
:class:`Verifier`. The checks are lifted from milp_flare's ``entrypoint.sh`` +
``FLARE._evaluate``:

1. Compile each target file with ``lake env lean <file>``.
2. Scan the compile log for ``sorry`` warnings.
3. Extract the axiom dependency list via ``#print axioms`` and compare against
   :data:`STANDARD_AXIOMS`.

It also enforces the input contract: the project's toolchain (and locked Mathlib
revision) must match the backend image's pins, else we reject before spending
compute.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import Path

from open_atp.backends.base import ComputeBackend, ComputeSession
from open_atp.backends.docker import DockerBackend, DockerConfig
from open_atp.backends.modal import ModalBackend, ModalConfig
from open_atp.images import DEFAULT_IMAGE, Image
from open_atp.lean import LeanProject, MathlibRevMismatch, ToolchainMismatch

# Axioms that a "clean" Mathlib proof is allowed to depend on. Anything else
# (notably ``sorryAx``) means the proof is not actually complete.
STANDARD_AXIOMS = frozenset({"propext", "Classical.choice", "Quot.sound"})

_SORRY_RE = re.compile(r"declaration uses 'sorry'|uses `sorry`")
_AXIOMS_RE = re.compile(r"depends on axioms: \[([^\]]*)\]")


[docs] @dataclass(frozen=True) class VerificationReport: """Result of compiling a candidate project in a sandbox. Produced by :class:`Verifier` and shared by every prover, including Aristotle. Attributes ---------- compiles : bool Whether the whole project built successfully. sorry_free : bool Whether the build is free of ``sorry`` (no incomplete proofs remain). axioms : tuple[str, ...] Every axiom the compiled project depends on, as reported by Lean. compile_log : str The full build log. Omitted from :meth:`to_dict`. per_file : dict[str, bool] Per-file compile status, keyed by file path relative to the project root. non_standard_axioms : tuple[str, ...] The axioms outside :data:`STANDARD_AXIOMS` -- notably ``sorryAx``, which means the proof is not actually complete. verified : bool True iff the project compiles, has no ``sorry``, and uses no axioms outside :data:`STANDARD_AXIOMS`. """ compiles: bool sorry_free: bool axioms: tuple[str, ...] = () compile_log: str = "" per_file: dict[str, bool] = field(default_factory=dict) @property def non_standard_axioms(self) -> tuple[str, ...]: return tuple(a for a in self.axioms if a not in STANDARD_AXIOMS) @property def verified(self) -> bool: """True iff the project compiles, has no sorry, and no foreign axioms.""" return self.compiles and self.sorry_free and not self.non_standard_axioms
[docs] def to_dict(self) -> dict[str, object]: """JSON-ready summary (the full ``compile_log`` is intentionally omitted).""" return { "verified": self.verified, "compiles": self.compiles, "sorry_free": self.sorry_free, "axioms": list(self.axioms), "non_standard_axioms": list(self.non_standard_axioms), "per_file": dict(self.per_file), }
[docs] @dataclass class ProofResult: """What a prover returns from :meth:`~open_atp.provers.base.AutomatedProver.prove`. The prover writes its artifacts into the caller-chosen :attr:`output_dir`, laid out as ``output_dir/{wd,logs}/``: ``wd`` is the completed lake project (the proof output) and ``logs`` is the run record (the streamed agent ``stdout.txt``, ``stderr.txt``, ``result.json``, and any harness-specific rich logs). This object just records where those live, plus the verification verdict and run metadata. Attributes ---------- prover : str Name of the prover that produced this result. verification : VerificationReport or None The shared verification of the completed project, or ``None`` when the run failed before a candidate could be verified (see :attr:`error`). output_dir : pathlib.Path The run's output directory. Holds the :attr:`wd` (proof project) and :attr:`logs_dir` (run record) subdirectories the prover populated. completed_files : dict[str, str] The completed ``.lean`` sources, keyed by file path relative to the project root. cost_usd : float, optional Estimated USD cost of the run. ``None`` when the prover does not report cost. duration_s : float, optional Wall-clock duration of the run, in seconds. metadata : dict[str, object] Harness-specific run metadata (token counts, run summaries, ...). error : str, optional Set when the prover raised before producing a result (Docker down, API error, toolchain mismatch). :attr:`verification` is ``None`` and :attr:`success` is ``False``. wd : pathlib.Path The completed working directory (``output_dir/wd``) -- a complete lake project with the completed ``.lean`` files. The proof output. logs_dir : pathlib.Path The run's logs directory (``output_dir/logs``) -- the captured agent stream (``stdout.txt``), ``stderr.txt``, ``result.json``, and any harness-specific rich record (Vibe's session log, ax-prover's per-target logs, Aristotle's events). success : bool True iff :attr:`verification` exists and is :attr:`~VerificationReport.verified`. """ prover: str verification: VerificationReport | None output_dir: Path completed_files: dict[str, str] = field(default_factory=dict) cost_usd: float | None = None duration_s: float | None = None metadata: dict[str, object] = field(default_factory=dict) error: str | None = None @property def wd(self) -> Path: """The completed proof project (``output_dir/wd``).""" return self.output_dir / "wd" @property def logs_dir(self) -> Path: """The run's logs directory (``output_dir/logs``).""" return self.output_dir / "logs" @property def success(self) -> bool: return bool(self.verification and self.verification.verified)
[docs] def to_dict(self) -> dict[str, object]: """JSON-ready view: inline files, verification, cost, and artifact paths.""" return { "prover": self.prover, "success": self.success, "error": self.error, "verification": self.verification.to_dict() if self.verification else None, "completed_files": dict(self.completed_files), "cost_usd": self.cost_usd, "duration_s": self.duration_s, "output_dir": str(self.output_dir), "wd": str(self.wd), "logs_dir": str(self.logs_dir), "metadata": dict(self.metadata), }
[docs] def docker_verifier(image: Image = DEFAULT_IMAGE) -> Verifier: """A :class:`Verifier` backed by a local Docker sandbox running ``image``.""" return Verifier(DockerBackend(DockerConfig(image=image)))
def modal_verifier(image: Image = DEFAULT_IMAGE) -> Verifier: """A :class:`Verifier` backed by a Modal Sandbox running the published ``image``. Needs Modal credentials and the image published via ``open-atp build-modal-image``. The image's ``:tag`` is dropped for the Modal name lookup. """ return Verifier(ModalBackend(ModalConfig(image=image)))
[docs] class Verifier: """Compiles projects in a :class:`ComputeBackend` and reports their status.""" def __init__(self, backend: ComputeBackend) -> None: self.backend = backend @property def image(self) -> Image: """The image the backend runs -- the compatibility contract projects match.""" return self.backend.config.image
[docs] def check_compatible(self, project: LeanProject) -> None: """Reject a project whose pins differ from the backend image's. Matches the project's :attr:`~open_atp.lean.LeanProject.lean_toolchain` against the image's :attr:`~open_atp.images.Image.lean_toolchain`, and its locked :attr:`~open_atp.lean.LeanProject.mathlib_rev` (when the project records one) against the image's :attr:`~open_atp.images.Image.mathlib_rev`. Raises :class:`~open_atp.lean.ToolchainMismatch` or :class:`~open_atp.lean.MathlibRevMismatch` on the first mismatch. Examples -------- >>> import tempfile >>> from pathlib import Path >>> from open_atp.backends.docker import DockerBackend, DockerConfig >>> from open_atp.images import Image >>> from open_atp.lean import LeanProject >>> from open_atp.verify import Verifier >>> root = Path(tempfile.mkdtemp()) >>> _ = (root / "lakefile.toml").write_text('name = "demo"\\n') >>> _ = (root / "lean-toolchain").write_text("leanprover/lean4:v4.31.0\\n") >>> project = LeanProject(root) A matching pin passes silently: >>> image = Image(lean_toolchain="leanprover/lean4:v4.31.0") >>> ok = Verifier(DockerBackend(DockerConfig(image=image))) >>> ok.check_compatible(project) A differing pin is rejected up front: >>> bad = Verifier(DockerBackend(DockerConfig(image=Image()))) >>> bad.check_compatible(project) # doctest: +ELLIPSIS Traceback (most recent call last): ... open_atp.lean.ToolchainMismatch: Project pins ... """ image = self.image if project.lean_toolchain != image.lean_toolchain: raise ToolchainMismatch( f"Project pins toolchain {project.lean_toolchain!r} but backend image " f"supports {image.lean_toolchain!r}. Re-submit against a matching " "image." ) if project.mathlib_rev is not None and project.mathlib_rev != image.mathlib_rev: raise MathlibRevMismatch( f"Project pins Mathlib {project.mathlib_rev!r} but backend image " f"supports {image.mathlib_rev!r}. Re-submit against a matching image." )
[docs] def verify( self, project: LeanProject, *, session: ComputeSession | None = None ) -> VerificationReport: """Compile ``project`` and return a :class:`VerificationReport`. Standalone (``session is None``) the compile spins up its own sandbox via ``backend.run`` -- the path Aristotle and the split-backend case take. When a caller passes a live ``session`` (the agent/verify backend-reuse path), the compile runs in that already-hot sandbox instead, avoiding a second spin-up. Examples -------- A real project compiles in the backend; a project with no ``.lean`` files short-circuits to a trivial passing report without touching the sandbox: >>> import tempfile >>> from pathlib import Path >>> from open_atp.backends.docker import DockerBackend, DockerConfig >>> from open_atp.images import Image >>> from open_atp.lean import LeanProject >>> from open_atp.verify import Verifier >>> root = Path(tempfile.mkdtemp()) >>> _ = (root / "lakefile.toml").write_text('name = "demo"\\n') >>> _ = (root / "lean-toolchain").write_text("leanprover/lean4:v4.31.0\\n") >>> image = Image(lean_toolchain="leanprover/lean4:v4.31.0") >>> verifier = Verifier(DockerBackend(DockerConfig(image=image))) >>> report = verifier.verify(LeanProject(root)) >>> report.verified True """ self.check_compatible(project) rel = [t.relative_to(project.root).as_posix() for t in project.lean_files()] if not rel: return VerificationReport(compiles=True, sorry_free=True) script = self._compile_script(rel) if session is None: result = self.backend.run(project.root, script) else: with session.exec(script) as handle: result = handle.wait() log = result.stdout + ("\n" + result.stderr if result.stderr else "") per_file = self._parse_per_file(log, rel) compiles = result.exit_code == 0 and all(per_file.values()) return VerificationReport( compiles=compiles, sorry_free=not _SORRY_RE.search(log), axioms=self._parse_axioms(log), compile_log=log, per_file=per_file, )
@staticmethod def _compile_script(rel: list[str]) -> str: """Compile each file, bracketing it with markers so we can read exit codes. Mirrors milp_flare's ``entrypoint.sh``: every file runs (``;`` not ``&&``) so one failure doesn't mask the rest, and the overall status is the OR of the per-file exit codes. """ lines = ["fail=0"] for f in rel: lines += [ f'echo "=== FILE {f} ==="', f'lake env lean "{f}" 2>&1', "rc=$?", 'echo "=== EXIT $rc ==="', '[ "$rc" -ne 0 ] && fail=1', ] lines.append("exit $fail") return "\n".join(lines) @staticmethod def _parse_per_file(log: str, rel: list[str]) -> dict[str, bool]: # Walk the FILE/EXIT markers in order; a file passes iff its exit code is 0. per_file: dict[str, bool] = {} current: str | None = None for line in log.splitlines(): if line.startswith("=== FILE "): current = line[len("=== FILE ") : -len(" ===")] elif line.startswith("=== EXIT ") and current is not None: per_file[current] = line[len("=== EXIT ") : -len(" ===")].strip() == "0" current = None # Files with no recorded marker (shouldn't happen) default to failing. for f in rel: per_file.setdefault(f, False) return per_file @staticmethod def _parse_axioms(log: str) -> tuple[str, ...]: found: list[str] = [] for m in _AXIOMS_RE.finditer(log): found.extend(a.strip() for a in m.group(1).split(",") if a.strip()) return tuple(dict.fromkeys(found)) # dedupe, preserve order