Source code for open_atp.provers.numina

"""NuminaProver: a configured variant of :class:`AgentProver`.

Numina is, structurally, "Claude Code + a specific skills/prompts/search toolkit,
run in a multi-round loop in a sandbox." So rather than re-implement it, we extend
``AgentProver`` pinned to the ``claude_code`` harness with Numina's vendored assets
(``vendor/numina/skills`` + ``vendor/numina/prompts``), and add the two genuinely
different behaviours:

* a **round-continuation loop** -- re-invoke the agent while it reports it hit a
  limit rather than completing (Numina's ``END_REASON == LIMIT``); and
* the **statement tracker** -- guard against the agent deleting or weakening the
  theorems it was asked to prove (ported from ``scripts/statement_tracker.py``).

Numina's helper skills call Leandex / Gemini / GPT, so its config carries those API
keys to forward into the sandbox.

Re-implementation note (vs upstream ``scripts/runner.py``): each round here is a
fresh ``claude -p`` invocation over the *same* (bind-mounted, persistent) workdir,
not a ``claude -c continue`` against a live session. The DockerBackend launches a
new ``--rm`` container per round, so the Claude session does not survive between
rounds -- which means every round is effectively a "session reset", and the agent
resumes from the partial proof state left on disk. ``max_consecutive_limits`` is
still tracked (and surfaced in metadata) for parity/observability.
"""

from __future__ import annotations

import json
import re
import shutil
from pathlib import Path
from typing import Any, Literal

from open_atp.backends.base import ComputeBackend, ComputeSession
from open_atp.harness import (
    Harness,
    HarnessRunResult,
    compute_cost_usd,
)
from open_atp.harness._catalog import resolve_skill
from open_atp.harness._numina import _DEFAULT_HELPER_ENV_KEYS, NuminaHarness
from open_atp.harness._paths import _vendor_numina_dir
from open_atp.lean import LeanProject, ProofTask
from open_atp.provers.agent_prover import AgentProver
from open_atp.provers.base import ProofResult, compose_prompt
from open_atp.provers.numina_tracker import StatementTracker

# Entries never worth copying into the agent workdir (mirrors AgentProver):
# anything named ``.lake`` or ``.git``, plus ``*.tar.gz`` archives. Passed as the
# ``ignore=`` callable to ``shutil.copytree`` when ``_generate`` stages the project.
_IGNORE = shutil.ignore_patterns(".lake", ".git", "*.tar.gz")

# Numina's coordinator prompt (main_entry.md) does not, by itself, emit the
# END_REASON marker the round loop keys off; append an explicit protocol so the
# loop can tell "done" from "out of budget". (We still fall back to Claude's result
# subtype when the marker is missing -- see ``_end_reason``.)
#
# The text is concatenated verbatim after main_entry.md by ``_coordinator_prompt``;
# the leading blank lines and ``---`` rule separate it from the coordinator text.
# The markers it advertises are parsed by ``_REASON_RE`` (which additionally
# accepts ``SELECTED_TARGET_COMPLETE``, a value this protocol does not advertise).
# This string is sent to the agent as-is: treat any edit as a behaviour change.
_END_REASON_PROTOCOL = """

---
SESSION CONTROL PROTOCOL (open-atp round loop)

When you finish this turn, the VERY LAST line of your final message MUST be
exactly one of the following, on its own line, with no surrounding markdown,
backticks, or trailing text:

  END_REASON:COMPLETE   -- every target theorem compiles and is sorry-free
  END_REASON:LIMIT      -- you made progress but ran out of turns/budget

If you emit END_REASON:LIMIT you will be re-invoked to continue from the proof
state currently on disk, so leave the project in the best state you can.
"""


def _coordinator_prompt() -> str:
    """Build Numina's prover prompt: coordinator scaffold, then round protocol.

    Reads the vendored coordinator prompt (``prompts/main_entry.md`` under the
    directory returned by ``_vendor_numina_dir()``) and appends
    ``_END_REASON_PROTOCOL`` so the round loop can distinguish "done" from "out of
    budget". This is the *prover* prompt only -- a task's optional ``user_prompt``
    is layered on top by :func:`~open_atp.provers.base.compose_prompt`.

    Returns
    -------
    str
        The coordinator prompt text immediately followed by the protocol text.
    """
    prompts_dir = _vendor_numina_dir() / "prompts"
    scaffold = (prompts_dir / "main_entry.md").read_text()
    return f"{scaffold}{_END_REASON_PROTOCOL}"


# Match END_REASON:<reason> on a line of its own (case-insensitive), per upstream.
# ``(?m)`` makes ``^``/``$`` anchor at every line boundary, so the marker may sit on
# any line of a multi-line message; surrounding whitespace is tolerated. Group 1
# captures the reason word exactly as written (callers upper-case it).
_REASON_RE = re.compile(
    r"(?m)^\s*END_REASON:(LIMIT|COMPLETE|SELECTED_TARGET_COMPLETE)\s*$", re.I
)

# Whole-word ``sorry`` (case-sensitive). Used by ``_tracked_files`` to pick the
# ``.lean`` files that still contain a ``sorry`` when a task names no targets.
_SORRY_RE = re.compile(r"\bsorry\b")

# Helper-LLM usage ledger: ``discussion_partner.py`` appends one JSON record per
# Gemini/GPT call here (workdir-relative, under ``.claude/`` next to ``cli.log``).
# ``prove()`` reads it after the run, prices the tokens, and bills the result into
# the run's ``cost_usd`` so discussion-partner spend is not lost. The path is the
# host-side mirror of the default in ``discussion_partner._record_usage``.
_HELPER_USAGE_FILE = Path(".claude") / "helper_usage.jsonl"


class NuminaProver(AgentProver):
    """Run the Numina coordinator/subagent scaffold as an :class:`AgentProver`.

    A specialization of :class:`AgentProver` wired to Numina's vendored scaffold
    (coordinator prompt + skills + subagent prompts, staged by
    :meth:`_stage_numina_assets`); generation and the shared
    :class:`~open_atp.verify.Verifier` work exactly as in the base agent prover.

    The harness is fixed to an internal ``NuminaHarness`` (Claude Code with no
    plugins, since Numina ships its own scaffold) and is *not* configurable --
    Numina is claude-CLI driven, and :meth:`_stage_numina_assets` mounts the
    vendored scaffold straight into the known ``.claude/`` locations.

    Parameters
    ----------
    backend : ComputeBackend
        The sandbox the agent runs in. Generation reuses it via a live session and
        verification runs in that same hot sandbox.
    skills : list[str], optional
        Extra named/path skills to mount alongside Numina's vendored scaffold.
        Default empty -- Numina's coordinator skill is staged from
        ``vendor/numina/skills``, not this list.
    max_rounds : int
        Maximum number of coordinator rounds before the run stops. Default ``20``.
    max_consecutive_limits : int
        Reset (start a fresh session) after this many consecutive LIMIT rounds.
        Default ``2``.
    helper_env_keys : tuple[str, ...]
        Helper-skill credentials forwarded into the sandbox when present in the
        host env; skills degrade/skip when their key is absent. Default
        :data:`_DEFAULT_HELPER_ENV_KEYS`.
    guard_statements : bool
        Whether to snapshot the target theorems and reject runs that weaken or
        delete them. Default ``True``.
    on_statement_change : {"error", "warn"}
        Behavior on a weakened/deleted target theorem: ``error`` stops the run and
        restores the originals; ``warn`` restores and continues. Default ``error``
        (rejects; safe).
    timeout_s : int
        Wall-clock budget for the generation run, in seconds. Default ``1800``.
    env : dict[str, str], optional
        Extra literal environment variables forwarded into the agent sandbox
        (Numina pins its harness, so its env knobs live here). Default empty.

    Attributes
    ----------
    prover_prompt : str
        The prover's own prompt -- Numina's coordinator scaffold plus the round
        protocol -- handed to the agent, before any user prompt.

    Examples
    --------
    Construct the prover directly:

    >>> from open_atp.backends.docker import DockerBackend
    >>> from open_atp.provers.numina import NuminaProver
    >>> backend = DockerBackend()
    >>> prover = NuminaProver(backend=backend)
    >>> prover.max_rounds
    20

    Or build the same prover from the standard catalog by name, taking its
    baked-in defaults (see :func:`~open_atp.config.standard_prover`):

    >>> from open_atp import standard_prover
    >>> prover = standard_prover("numina", backend=DockerBackend())
    >>> prover.name
    'numina'

    Complete a task's ``sorry``\\s with
    :meth:`~open_atp.provers.base.AutomatedProver.prove`, here on a bundled
    example (this runs the Numina scaffold in Docker and bills it):

    >>> import tempfile
    >>> from open_atp.examples import EXAMPLE, example_task
    >>> task = example_task(EXAMPLE.INTER_UNION_DISTRIB)
    >>> result = prover.prove(task, tempfile.mkdtemp())  # doctest: +SKIP
    >>> result.success  # doctest: +SKIP
    True
    """

    name = "numina"

    def __init__(
        self,
        *,
        backend: ComputeBackend,
        skills: list[str] | None = None,
        max_rounds: int = 20,
        max_consecutive_limits: int = 2,
        helper_env_keys: tuple[str, ...] = _DEFAULT_HELPER_ENV_KEYS,
        guard_statements: bool = True,
        on_statement_change: Literal["error", "warn"] = "error",
        timeout_s: int = 1800,
        env: dict[str, str] | None = None,
    ) -> None:
        """Configure the prover; see the class docstring for each parameter."""
        # The harness is pinned: Numina is claude_code-driven and not configurable.
        # NuminaHarness ships no plugins (Numina ships its own scaffold) and forwards
        # the helper-skill keys best-effort plus any literal env extras. Default skills
        # are empty -- the coordinator skill is staged from the vendored scaffold.
        super().__init__(
            backend=backend,
            harness=NuminaHarness(helper_env_keys=helper_env_keys, env=env),
            skills=skills if skills is not None else [],
            timeout_s=timeout_s,
        )
        #: Maximum number of coordinator rounds before the run stops.
        self.max_rounds = max_rounds
        #: Reset (start a fresh session) after this many consecutive LIMIT rounds.
        self.max_consecutive_limits = max_consecutive_limits
        #: Helper-skill credentials forwarded into the sandbox when present.
        self.helper_env_keys = tuple(helper_env_keys)
        #: Whether to snapshot target theorems and reject weakened/deleted ones.
        self.guard_statements = guard_statements
        #: Behavior on a weakened/deleted target theorem (``error`` | ``warn``).
        self.on_statement_change = on_statement_change

    @property
    def prover_prompt(self) -> str:
        """Numina's coordinator scaffold + round protocol, before any user prompt."""
        return _coordinator_prompt()

    def _generate(
        self, task: ProofTask, wd: Path, logs_dir: Path, result: ProofResult
    ) -> None:
        """Stage the project, run the round loop, then fill and verify ``result``.

        Parameters
        ----------
        task : ProofTask
            Supplies the project root to copy, the optional ``user_prompt`` and
            the optional ``targets`` the statement guard protects.
        wd : Path
            Workdir the project is copied into and the agent edits in place.
        logs_dir : Path
            Where the streamed transcript (``stdout.txt``) and other logs land.
        result : ProofResult
            Mutated in place: completed files, cost, metadata and verification.
        """
        # 1. Stage the project so the workdir is a complete project to edit in place.
        shutil.copytree(task.project.root, wd, dirs_exist_ok=True, ignore=_IGNORE)

        # 2. Snapshot original .lean contents for the post-run diff.
        original = {
            p.relative_to(wd).as_posix(): p.read_text()
            for p in wd.rglob("*.lean")
            if ".lake" not in p.parts
        }

        # 3. Stage the workdir: the harness launch script, Numina's vendored scaffold
        #    (coordinator skill + subagent prompts), any config skills, then the
        #    coordinator prompt (+ the task's optional user prompt).
        harness = self.harness
        harness.stage_wd(wd)
        self._stage_numina_assets(wd)
        harness.stage_skills(wd, [resolve_skill(s) for s in self.skills])
        harness.write_prompt(wd, compose_prompt(self.prover_prompt, task.user_prompt))
        stdout_path = logs_dir / "stdout.txt"

        # 4. Statement-change guard: snapshot the target theorems before the run.
        tracked = self._tracked_files(task, wd)
        tracker: StatementTracker | None = None
        if self.guard_statements and tracked:
            tracker = StatementTracker(tracked)

        # 5. Round-continuation loop, with the final check, in one persistent sandbox:
        #    generation and verification share the backend, so we pay neither a
        #    per-round nor a separate verify spin-up.
        _, mounts = self._auth(harness)
        with self.verifier.backend.session(
            wd, mounts=mounts, timeout_s=self.timeout_s
        ) as session:
            loop = self._run_rounds(wd, harness, stdout_path, tracker, session=session)
            # Final pull so the host workdir (helper-usage ledger, completed files)
            # is current before pricing + diffing.
            self._download_wd(wd, session=session)
            self._finalize(result, wd, logs_dir, harness, loop, original)
            result.verification = self.verifier.verify(LeanProject(wd), session=session)

    def _stage_numina_assets(self, wd: Path) -> None:
        """Copy Numina's vendored scaffold into the workdir's ``.claude/`` locations.

        Numina is one root-mounted coordinator skill (top-level ``SKILL.md`` +
        ``cli/`` helpers) plus a subagent-prompt tree. Because the harness is pinned
        to Claude, the destinations are known: the skill's *contents* land at
        ``.claude/skills`` (so the top-level ``SKILL.md`` is at
        ``.claude/skills/SKILL.md``), and the whole prompt tree at
        ``.claude/prompts`` (where the coordinator prompt tells the agent to read
        its subagent prompts from ``.claude/prompts/subagent_prompts/``).
        """
        root = _vendor_numina_dir()
        claude = wd / ".claude"
        shutil.copytree(root / "skills", claude / "skills", dirs_exist_ok=True)
        shutil.copytree(root / "prompts", claude / "prompts", dirs_exist_ok=True)

    def _finalize(
        self,
        result: ProofResult,
        wd: Path,
        logs_dir: Path,
        harness: Harness,
        loop: dict[str, Any],
        original: dict[str, str],
    ) -> None:
        """Price helper-LLM usage, diff the workdir, and fill the result."""
        # Price the helper-LLM (discussion_partner) usage the agent racked up inside
        # the sandbox and fold it into the run's total cost.
        helper = self._helper_cost(wd)

        # Diff the workdir's .lean files against the staged originals.
        completed: dict[str, str] = {}
        for path in sorted(wd.rglob("*.lean")):
            if ".lake" in path.parts:
                continue
            rel = path.relative_to(wd).as_posix()
            content = path.read_text()
            if original.get(rel) != content:
                completed[rel] = content

        # Relocate harness logs (no-op for claude_code) and write captured stderr; the
        # streamed stdout was already teed live into ``logs_dir/stdout.txt``.
        self._download_logs(harness, wd, logs_dir, loop["stderr"])

        result.completed_files = completed
        result.cost_usd = loop["total_cost_usd"] + helper["cost_usd"]
        result.metadata = {
            "harness": harness.name,
            "model": self.harness.model,
            "effort": self.harness.effort,
            "input_tokens": loop["input_tokens"],
            "output_tokens": loop["output_tokens"],
            # cost_usd above bundles agent + helper; keep the split visible.
            "agent_cost_usd": loop["total_cost_usd"],
            "helper_cost_usd": helper["cost_usd"],
            "helper_tokens": {
                "input": helper["input_tokens"],
                "output": helper["output_tokens"],
            },
            "helper_breakdown": helper["breakdown"],
            "helper_unpriced_models": helper["unpriced_models"],
            "rounds": loop["rounds"],
            "end_reason": loop["end_reason"],
            "session_resets": loop["session_resets"],
            "guard_statements": self.guard_statements,
            "statement_changed": loop["statement_changed"],
            "statement_changes": loop["statement_changes"],
            "round_history": loop["round_history"],
        }

    # -- round loop -----------------------------------------------------------

    def _run_rounds(
        self,
        workdir: Path,
        harness: Harness,
        stdout_path: Path,
        tracker: StatementTracker | None,
        session: ComputeSession,
    ) -> dict[str, Any]:
        """Drive the agent for up to ``max_rounds``, continuing while it reports LIMIT.

        Each round's stdout is teed live into ``stdout_path`` (appended, so the file
        is the full multi-round transcript). Returns an accumulator dict (token
        totals, cost, per-round history, final end reason, statement-guard outcome).

        Every round execs in the one persistent ``session``. We ``sync_out`` after
        each round so the host-side statement tracker reads the round's result, and
        ``sync_in`` after a restore so the sandbox picks it up for the next round
        (both no-ops on bind-mounted Docker).
        """
        stderrs: list[str] = []
        round_history: list[dict[str, Any]] = []
        total_cost = 0.0
        input_tokens = 0
        output_tokens = 0
        consecutive_limits = 0
        session_resets = 0
        end_reason: str | None = None
        statement_changed = False
        statement_changes: list[str] = []

        for round_num in range(1, self.max_rounds + 1):
            # A fresh session is started whenever we have hit too many consecutive
            # LIMITs (and, in this backend, every round -- see module docstring).
            if consecutive_limits >= self.max_consecutive_limits:
                consecutive_limits = 0
                session_resets += 1

            round_lines, round_stderr = self._run_agent(
                workdir, harness, stdout_path, session=session
            )
            if round_stderr:
                stderrs.append(round_stderr)
            # Pull the round's edits to the host so the statement tracker (and the
            # next round's snapshot) see them (no-op on bind-mounted Docker).
            session.sync_out()

            parsed = harness.parse_result(round_lines)
            input_tokens += parsed.input_tokens
            output_tokens += parsed.output_tokens
            # Prefer the cost the harness reported; otherwise price the tokens,
            # counting an unpriced model as 0.0.
            round_cost = parsed.cost_usd
            if round_cost is None:
                round_cost = (
                    compute_cost_usd(
                        self.harness.model,
                        parsed.input_tokens,
                        parsed.output_tokens,
                    )
                    or 0.0
                )
            total_cost += round_cost

            end_reason = self._end_reason(parsed)
            record: dict[str, Any] = {
                "round": round_num,
                "end_reason": end_reason,
                "subtype": parsed.subtype,
                "input_tokens": parsed.input_tokens,
                "output_tokens": parsed.output_tokens,
                "cost_usd": round_cost,
            }

            # Statement-change guard.
            if tracker is not None:
                ok, changes = tracker.check_initial_statements()
                if not ok:
                    statement_changed = True
                    these = [str(c) for c in changes]
                    statement_changes.extend(these)
                    record["statement_changes"] = these
                    tracker.restore_initial_statements(changes)
                    # Push the restored statements back so the sandbox reflects them
                    # for the next round (no-op on bind-mounted Docker).
                    session.sync_in()
                    if self.on_statement_change == "error":
                        end_reason = "STATEMENT_CHANGED"
                        record["end_reason"] = end_reason
                        round_history.append(record)
                        break

            round_history.append(record)

            if end_reason == "COMPLETE":
                break
            if end_reason == "LIMIT":
                consecutive_limits += 1
            else:
                # None / SELECTED_TARGET_COMPLETE: keep going from a clean streak.
                consecutive_limits = 0

        return {
            "stderr": "\n".join(stderrs),
            "round_history": round_history,
            "rounds": len(round_history),
            "total_cost_usd": total_cost,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "session_resets": session_resets,
            "end_reason": end_reason,
            "statement_changed": statement_changed,
            "statement_changes": statement_changes,
        }

    @staticmethod
    def _end_reason(parsed: HarnessRunResult) -> str | None:
        """Resolve a round's end reason from the agent's final result.

        Prefers the explicit ``END_REASON:<reason>`` marker; falls back to Claude's
        result subtype (``success`` -> COMPLETE, ``error_max_turns`` -> LIMIT).

        When the message carries several marker lines, the *last* one wins: the
        session-control protocol defines the marker as the very last line of the
        final message, so an earlier occurrence (e.g. the agent quoting the
        protocol or narrating an intermediate state) must not override it.

        Returns
        -------
        str or None
            The upper-cased reason, or ``None`` when neither the marker nor a
            recognised subtype is present.
        """
        if parsed.result_text:
            markers = _REASON_RE.findall(parsed.result_text)
            if markers:
                return markers[-1].upper()
        if parsed.subtype == "success":
            return "COMPLETE"
        if parsed.subtype == "error_max_turns":
            return "LIMIT"
        return None

    # -- helpers --------------------------------------------------------------

    @staticmethod
    def _token_count(value: Any) -> int:
        """Coerce a ledger token field to ``int``; missing/unparsable counts as 0."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            # Junk such as "n/a", a nested list, NaN or Infinity: the ledger is
            # best-effort accounting, so do not let it abort the run.
            return 0

    def _helper_cost(self, workdir: Path) -> dict[str, Any]:
        """Price the discussion-partner usage ledger into USD.

        Reads the JSONL ledger ``discussion_partner.py`` appended inside the sandbox
        (one record per Gemini/GPT call, accumulated across rounds in the persistent
        workdir) and converts the tokens via :func:`compute_cost_usd`. Records whose
        model is absent from the price table contribute ``0.0`` but their tokens are
        still summed and the model name flagged in ``unpriced_models`` -- so an
        unpriced helper is visible, not silently free.

        The ledger is parsed best-effort: blank lines, lines that are not valid
        JSON, and JSON values that are not objects are skipped, and a token field
        that cannot be read as an integer counts as ``0``. A damaged ledger
        therefore never aborts result finalization.

        Returns total cost, token totals, a per-``backend:model`` breakdown, and the
        unpriced-model list.
        """
        path = workdir / _HELPER_USAGE_FILE
        total = 0.0
        input_tokens = 0
        output_tokens = 0
        unpriced: list[str] = []
        breakdown: dict[str, dict[str, Any]] = {}

        if path.is_file():
            for line in path.read_text().splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(rec, dict):
                    # Valid JSON but not a record (e.g. a bare list or number).
                    continue
                model = str(rec.get("model", ""))
                it = self._token_count(rec.get("input_tokens"))
                ot = self._token_count(rec.get("output_tokens"))
                input_tokens += it
                output_tokens += ot
                cost = compute_cost_usd(model, it, ot)
                if cost is None:
                    if model not in unpriced:
                        unpriced.append(model)
                    cost = 0.0
                total += cost
                key = f"{rec.get('backend', '?')}:{model}"
                agg = breakdown.setdefault(
                    key,
                    {
                        "calls": 0,
                        "input_tokens": 0,
                        "output_tokens": 0,
                        "cost_usd": 0.0,
                    },
                )
                agg["calls"] += 1
                agg["input_tokens"] += it
                agg["output_tokens"] += ot
                agg["cost_usd"] += cost

        return {
            "cost_usd": total,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "unpriced_models": unpriced,
            "breakdown": breakdown,
        }

    def _tracked_files(self, task: ProofTask, workdir: Path) -> list[Path]:
        """The workdir ``.lean`` files whose statements the guard should protect.

        With explicit ``task.targets``, those paths (relative to ``workdir``) that
        exist as files; otherwise every ``.lean`` file outside ``.lake`` that still
        contains a whole-word ``sorry``.
        """
        if task.targets:
            return [f for f in (workdir / t for t in task.targets) if f.is_file()]
        return [
            p
            for p in sorted(workdir.rglob("*.lean"))
            if ".lake" not in p.parts and _SORRY_RE.search(p.read_text())
        ]