# Source code for understudy.check

"""Check: validate a trace against scene expectations."""

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from .batch import BatchExecutor
from .metrics import MetricRegistry, MetricResult
from .models import Expectations
from .trace import Trace


@dataclass
class CheckResult:
    """Outcome of validating a trace against scene expectations.

    Aggregates individual boolean check items plus named metric results.
    The result as a whole passes only when every check passes and no
    scored metric (one whose ``passed`` is not ``None``) fails.
    """

    checks: list["CheckItem"] = field(default_factory=list)
    metrics: dict[str, MetricResult] = field(default_factory=dict)

    @property
    def passed(self) -> bool:
        """True when all checks pass and every scored metric passes."""
        # Metrics with passed=None are informational only and are skipped.
        scored = [m.passed for m in self.metrics.values() if m.passed is not None]
        return all(item.passed for item in self.checks) and all(scored)

    @property
    def failed_checks(self) -> list["CheckItem"]:
        """The subset of checks that did not pass."""
        return [item for item in self.checks if not item.passed]

    @property
    def failed_metrics(self) -> list[MetricResult]:
        """The subset of metrics that were scored and failed."""
        return [m for m in self.metrics.values() if m.passed is False]

    def metric(self, name: str) -> MetricResult | None:
        """Look up a metric result by name, or None if absent."""
        return self.metrics.get(name)

    def summary(self) -> str:
        """Render a human-readable, line-per-item report."""
        report: list[str] = []
        for item in self.checks:
            symbol = "✓" if item.passed else "✗"
            report.append(f" {symbol} {item.label}: {item.detail}")
        for name, m in self.metrics.items():
            if m.passed is None:
                # Unscored metric: show its raw value with a bullet.
                report.append(f" • {name}: {m.value}")
            else:
                symbol = "✓" if m.passed else "✗"
                report.append(f" {symbol} {name}: {m.detail}")
        return "\n".join(report)

    def __repr__(self) -> str:
        total = len(self.checks)
        passed_count = sum(1 for item in self.checks if item.passed)
        if self.metrics:
            return f"CheckResult({passed_count}/{total} checks, {len(self.metrics)} metrics)"
        return f"CheckResult({passed_count}/{total} passed)"
@dataclass
class CheckItem:
    """A single check result."""

    # Category of the check (e.g. "required_tool", "expected_resolution").
    label: str
    # Whether this individual check succeeded.
    passed: bool
    # Human-readable explanation of the outcome.
    detail: str
def _membership_check(
    label: str, name: str, members: set[str], *, forbidden: bool, verb: str
) -> CheckItem:
    """Build a CheckItem for *name*'s presence in *members*.

    Args:
        label: Check category to record on the item.
        name: Tool or agent name being tested.
        members: The set of names actually seen in the trace.
        forbidden: When True, presence is a violation (pass = absent).
        verb: Past-tense action word for the detail text ("called"/"invoked").
    """
    present = name in members
    if forbidden:
        # Violation wording is upper-cased to stand out in summaries.
        detail = f"{name} {verb.upper() + ' (violation)' if present else 'not ' + verb}"
        return CheckItem(label=label, passed=not present, detail=detail)
    detail = f"{name} {verb if present else 'NOT ' + verb}"
    return CheckItem(label=label, passed=present, detail=detail)


def check(trace: Trace, expectations: Expectations) -> CheckResult:
    """Validate a trace against expectations.

    Args:
        trace: The execution trace from a rehearsal.
        expectations: The expectations from a scene.

    Returns:
        A CheckResult with individual check outcomes (and computed
        metrics when the expectations request any).
    """
    result = CheckResult()
    called_tools = set(trace.call_sequence())
    invoked_agents = set(trace.agents_invoked())

    # Presence checks for tools and agents share one helper; ordering of
    # the appended items (required/forbidden tools, then agents) is kept.
    for tool in expectations.required_tools:
        result.checks.append(
            _membership_check("required_tool", tool, called_tools, forbidden=False, verb="called")
        )
    for tool in expectations.forbidden_tools:
        result.checks.append(
            _membership_check("forbidden_tool", tool, called_tools, forbidden=True, verb="called")
        )
    for agent in expectations.required_agents:
        result.checks.append(
            _membership_check("required_agent", agent, invoked_agents, forbidden=False, verb="invoked")
        )
    for agent in expectations.forbidden_agents:
        result.checks.append(
            _membership_check("forbidden_agent", agent, invoked_agents, forbidden=True, verb="invoked")
        )

    # Per-agent tool requirements: each (agent, tool) pair is one check.
    for agent, tools in expectations.required_agent_tools.items():
        for tool in tools:
            called = trace.agent_called(agent, tool)
            result.checks.append(
                CheckItem(
                    label="required_agent_tool",
                    passed=called,
                    detail=f"{agent}.{tool} {'called' if called else 'NOT called'}",
                )
            )

    # Terminal-state check, only when the scene declares an expectation.
    if expectations.expected_resolution:
        expected = expectations.expected_resolution
        actual = trace.terminal_state
        result.checks.append(
            CheckItem(
                label="expected_resolution",
                passed=actual == expected,
                detail=f"expected={expected}, actual={actual}",
            )
        )

    # Compute any metrics the expectations request.
    if expectations.metrics:
        result.metrics = MetricRegistry.compute_all(expectations.metrics, trace, expectations)
    return result
def evaluate(
    trace: Trace,
    expectations: Expectations,
    metrics: list[str] | None = None,
    judge_model: str | None = None,
    judges: dict[str, Any] | None = None,
) -> CheckResult:
    """Evaluate a trace against expectations with optional judge evaluations.

    Extends check() with support for overriding metrics and adding LLM
    judge evaluations.

    Args:
        trace: The execution trace to evaluate.
        expectations: The expectations to check against.
        metrics: Override metrics to compute (defaults to scene's metrics).
        judge_model: Optional LLM model for judge evaluations.
        judges: Optional pre-configured judge objects (dict of name -> Judge).

    Returns:
        A CheckResult with check outcomes, metrics, and judge results.
    """
    if metrics:
        # Rebuild expectations with only the metric list swapped out; the
        # caller's object is never mutated.
        expectations = Expectations(
            required_tools=expectations.required_tools,
            forbidden_tools=expectations.forbidden_tools,
            required_agents=expectations.required_agents,
            forbidden_agents=expectations.forbidden_agents,
            required_agent_tools=expectations.required_agent_tools,
            expected_resolution=expectations.expected_resolution,
            metrics=metrics,
        )
    result = check(trace, expectations)
    # Judge evaluations run only when BOTH a model and judge objects are
    # supplied; each judge is stored as a pseudo-metric named "judge_<name>".
    if judge_model and judges:
        for name, judge in judges.items():
            judge_result = judge.evaluate(trace)
            result.metrics[f"judge_{name}"] = MetricResult(
                name=f"judge_{name}",
                value={"score": judge_result.score, "agreement_rate": judge_result.agreement_rate},
                # A judge "passes" only on an exact score of 1.
                passed=judge_result.score == 1,
                detail=judge_result.reasoning or "",
            )
    return result


@dataclass
class EvaluationResult:
    """Result of evaluating a single trace."""

    # Identifier of the evaluated trace.
    trace_id: str
    # Detailed check/metric outcomes for the trace.
    check_result: CheckResult
    # Set when the evaluation itself raised; None on a clean run.
    error: str | None = None

    @property
    def passed(self) -> bool:
        # A trace passes only when evaluation completed AND all checks passed.
        return self.error is None and self.check_result.passed


@dataclass
class _EvaluationTask:
    """Internal task descriptor for batch evaluation."""

    trace_id: str
    trace: Trace
    expectations: Expectations


class _EvaluationExecutor(BatchExecutor[_EvaluationTask, EvaluationResult]):
    """Executor for running evaluations in parallel."""

    def __init__(
        self,
        metrics: list[str] | None,
        judge_model: str | None,
        result_storage: Any,
        parallel: int = 1,
    ):
        super().__init__(parallel)
        self.metrics = metrics
        self.judge_model = judge_model
        self.result_storage = result_storage

    def execute_one(self, item: _EvaluationTask) -> EvaluationResult:
        """Evaluate one task, capturing any exception in the result.

        Errors are folded into EvaluationResult.error (with traceback) so
        a single failing trace cannot abort the whole batch.
        """
        try:
            check_result = evaluate(
                trace=item.trace,
                expectations=item.expectations,
                metrics=self.metrics,
                judge_model=self.judge_model,
            )
            # Persist per-trace results when a storage backend was provided.
            if self.result_storage:
                self.result_storage.save(trace_id=item.trace_id, check_result=check_result)
            return EvaluationResult(trace_id=item.trace_id, check_result=check_result)
        except Exception as e:
            import traceback

            return EvaluationResult(
                trace_id=item.trace_id,
                check_result=CheckResult(),
                error=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
            )


def evaluate_batch(
    traces: list[Trace] | str | Path,
    expectations: Expectations | None = None,
    output: str | Path | None = None,
    judge_model: str | None = None,
    metrics: list[str] | None = None,
    parallel: int = 1,
) -> list[EvaluationResult]:
    """Evaluate multiple traces and return results.

    Args:
        traces: List of Trace objects, or path to trace file/directory.
        expectations: Expectations to evaluate against (if None, loads from trace metadata).
        output: Optional path to save evaluation results.
        judge_model: Optional LLM model for judge evaluations.
        metrics: Override metrics to compute.
        parallel: Number of parallel evaluation threads.

    Returns:
        List of EvaluationResult objects.
    """
    # Imported locally — presumably to avoid a circular import with
    # .storage at module load time (NOTE(review): confirm).
    from .storage import EvaluationStorage, TraceStorage

    tasks: list[_EvaluationTask] = []
    if isinstance(traces, (str, Path)):
        # Path input: load every stored trace (and its scene) from disk.
        path = Path(traces)
        storage = TraceStorage(path=path)
        for trace_id in storage.list_traces():
            data = storage.load(trace_id)
            trace = data["trace"]
            scene = data["scene"]
            # Explicit expectations win; otherwise fall back to the scene's own.
            exp = expectations if expectations else scene.expectations
            tasks.append(_EvaluationTask(trace_id=trace_id, trace=trace, expectations=exp))
    else:
        # In-memory traces get synthetic sequential ids.
        for i, trace in enumerate(traces):
            trace_id = f"trace_{i}"
            exp = expectations if expectations else Expectations()
            tasks.append(_EvaluationTask(trace_id=trace_id, trace=trace, expectations=exp))

    result_storage = None
    if output:
        result_storage = EvaluationStorage(path=Path(output))

    executor = _EvaluationExecutor(
        metrics=metrics,
        judge_model=judge_model,
        result_storage=result_storage,
        parallel=parallel,
    )
    return executor.run(tasks)