diff --git a/packages/nemo_evaluator_sdk/examples/run_agent_eval/example_metrics.py b/packages/nemo_evaluator_sdk/examples/run_agent_eval/example_metrics.py index 9e6118dedf..47f3d2ec6f 100644 --- a/packages/nemo_evaluator_sdk/examples/run_agent_eval/example_metrics.py +++ b/packages/nemo_evaluator_sdk/examples/run_agent_eval/example_metrics.py @@ -3,19 +3,22 @@ """Reference metrics-over-evidence for this example (not SDK API). -These show how to score from the SDK's filesystem evidence handle instead of a -stamped verifier reward: +These show how to score from the SDK's evidence handles instead of a stamped +verifier reward: * :class:`TestsPassMetric` runs a command against ``final_state`` filesystem evidence (in a throwaway overlay) and scores on exit 0. * :class:`NoTestCheatingMetric` diffs ``initial_state`` against ``final_state`` and fails if the agent touched protected (e.g. test) paths. +* :class:`InefficientRetryLoopMetric` reads the normalized ``trace`` and fails + when the same tool call repeats past a threshold. """ from __future__ import annotations from collections.abc import Sequence +from nemo_evaluator_sdk.agent_eval.trials import EVIDENCE_FINAL_STATE, EVIDENCE_INITIAL_STATE, EVIDENCE_TRACE from nemo_evaluator_sdk.metrics.protocol import MetricInput, MetricOutput, MetricOutputSpec, MetricResult @@ -26,7 +29,7 @@ def __init__( self, command: Sequence[str], *, - evidence_name: str = "final_state", + evidence_name: str = EVIDENCE_FINAL_STATE, cwd: str = ".", timeout_s: float = 300.0, ) -> None: @@ -60,8 +63,8 @@ def __init__( *, protected: Sequence[str] = ("tests/",), change_types: Sequence[str] = ("added", "modified", "deleted"), - initial_name: str = "initial_state", - final_name: str = "final_state", + initial_name: str = EVIDENCE_INITIAL_STATE, + final_name: str = EVIDENCE_FINAL_STATE, ) -> None: self._protected = tuple(protected) self._change_types = set(change_types) @@ -87,3 +90,38 @@ async def compute_scores(self, input: MetricInput) -> MetricResult: ] clean = not violations return MetricResult(outputs=[MetricOutput(name="no_test_cheating", value=clean)]) + + +class InefficientRetryLoopMetric: + """Score ``False`` when the same tool call repeats more than ``threshold`` times.""" + + def __init__(self, *, threshold: int = 2, evidence_name: str = EVIDENCE_TRACE) -> None: + self._threshold = threshold + self._evidence_name = evidence_name + + @property + def type(self) -> str: + return "inefficient_retry_loop" + + def output_spec(self) -> list[MetricOutputSpec]: + return [ + MetricOutputSpec.boolean("efficient_tool_use"), + MetricOutputSpec.discrete_score("max_repeated_tool_calls"), + ] + + async def compute_scores(self, input: MetricInput) -> MetricResult: + max_repeats = 0 + evidence = input.candidate.evidence + if evidence is not None and evidence.get(self._evidence_name) is not None: + calls = await (await evidence.trace(self._evidence_name)).tool_calls() + counts: dict[str, int] = {} + for call in calls: + key = f"{call.function_name}:{sorted((call.arguments or {}).items())}" + counts[key] = counts.get(key, 0) + 1 + max_repeats = max(counts.values(), default=0) + return MetricResult( + outputs=[ + MetricOutput(name="efficient_tool_use", value=max_repeats <= self._threshold), + MetricOutput(name="max_repeated_tool_calls", value=max_repeats), + ] + ) diff --git a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/evaluator.py b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/evaluator.py index b6654fb230..15139f440e 100644 --- a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/evaluator.py +++ b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/evaluator.py @@ -44,7 +44,7 @@ from nemo_evaluator_sdk.metrics.protocol import Metric, validate_metric_result from nemo_evaluator_sdk.metrics.utils import metric_type_name from nemo_evaluator_sdk.values import Agent, Model, RunConfig, RunConfigOnline, RunConfigOnlineModel -from nemo_evaluator_sdk.values.evidence import CandidateEvidence, EvidenceDescriptor +from nemo_evaluator_sdk.values.evidence import CandidateEvidence, EvidenceDescriptor, normalize_trace_descriptor from openai import AsyncOpenAI log = getLogger(__name__) @@ -327,7 +327,9 @@ def _trial_from_sample(task: AgentEvalTask, target: Model | Agent, sample: dict[ # trial stays scorable instead of being dropped as empty output. output_text = _reasoning_content_fallback(sample.get("response")) if "trajectory" in sample: - trace = EvidenceDescriptor(kind="trace", format="json", data=sample["trajectory"]) + # Normalize to ATIF before the trial is persisted so the stored shape is + # source-agnostic (sources in, ATIF out); TraceHandle then reads it uniformly. + trace = normalize_trace_descriptor(EvidenceDescriptor(kind="trace", format="json", data=sample["trajectory"])) else: trace = EvidenceDescriptor(kind="sdk_online_generation", data={"task_id": task.id, "target": target.name}) diff --git a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/trials.py b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/trials.py index 187b431782..2cae14dbd8 100644 --- a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/trials.py +++ b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/agent_eval/trials.py @@ -14,7 +14,7 @@ from nemo_evaluator_sdk.agent_eval.tasks import AgentEvalRunConfig, AgentEvalTask from nemo_evaluator_sdk.values import Agent, Model -from nemo_evaluator_sdk.values.evidence import CandidateEvidence, EvidenceDescriptor +from nemo_evaluator_sdk.values.evidence import CandidateEvidence, EvidenceDescriptor, normalize_trace_descriptor from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator # Well-known evidence keys produced by ``standard_evidence_descriptors``. Harness @@ -158,10 +158,14 @@ def standard_evidence_descriptors( if trace_path is not None: trace_name = Path(trace_path).name.lower() is_atif = trace_name.startswith("atif") or ".atif." in trace_name - descriptors[EVIDENCE_TRACE] = EvidenceDescriptor( - kind="trace", - format="atif" if is_atif else "json", - ref=str(trace_path), + # Normalize the source trace into a sibling ATIF file before persistence so the + # stored descriptor is ATIF regardless of producer (no-op if already ATIF/missing). + descriptors[EVIDENCE_TRACE] = normalize_trace_descriptor( + EvidenceDescriptor( + kind="trace", + format="atif" if is_atif else "json", + ref=str(trace_path), + ) ) logs_metadata = {"primary_log": primary_log} if primary_log else {} diff --git a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/__init__.py b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/__init__.py index 9c4e11eb7e..2bee423d95 100644 --- a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/__init__.py +++ b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/__init__.py @@ -4,6 +4,15 @@ """Public value types for evaluator SDK runtime.""" from nemo_evaluator_sdk.values.agents import Agent +from nemo_evaluator_sdk.values.atif import ( + FinalMetrics, + Metrics, + Observation, + ObservationResult, + Step, + ToolCall, + Trajectory, +) from nemo_evaluator_sdk.values.common import SecretRef, SupportedJobTypes from nemo_evaluator_sdk.values.dataset_schemas import ( FieldMapping, @@ -17,6 +26,11 @@ FilesystemDiff, FilesystemEntry, LocalFilesystemEvidence, + LogHandle, + TraceHandle, + WellKnownEvidenceKey, + normalize_candidate_evidence, + normalize_trace_descriptor, ) from nemo_evaluator_sdk.values.metrics import ( BLEU, @@ -109,6 +123,18 @@ "ContinuousScore", "FilesystemDiff", "FilesystemEntry", + "FinalMetrics", + "LogHandle", + "Metrics", + "Observation", + "ObservationResult", + "Step", + "ToolCall", + "Trajectory", + "TraceHandle", + "WellKnownEvidenceKey", + "normalize_candidate_evidence", + "normalize_trace_descriptor", "DatasetRow", "DatasetRows", "DefaultAggregateFieldName", diff --git a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/atif.py b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/atif.py new file mode 100644 index 0000000000..4387b565b5 --- /dev/null +++ b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/atif.py @@ -0,0 +1,639 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Agent Trajectory Interchange Format (ATIF) models. + +Vendored, byte-for-byte from the Harbor reference implementation +(``harbor.models.trajectories``, Harbor v0.15.0) so the eval SDK persists traces +in the canonical ATIF schema without taking a runtime dependency on the Harbor +framework (which pulls in litellm/datasets/fastapi/...). The authoritative spec is +RFC 0001 "Agent Trajectory Interchange Format (ATIF)" (schema_version ATIF-v1.7). + +Keep this module in lockstep with the upstream models: when Harbor bumps the ATIF +schema, re-vendor rather than editing the field set locally. The only change from +upstream is collapsing the per-file modules into this single file and ordering the +classes by dependency. +""" + +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, Field, field_validator, model_validator + + +class ImageSource(BaseModel): + """Image source specification for images stored as files or at remote URLs.""" + + media_type: Literal["image/jpeg", "image/png", "image/gif", "image/webp"] = Field( + default=..., + description="MIME type of the image", + ) + path: str = Field( + default=..., + description="Location of the image. Can be a relative or absolute file path, or a URL.", + ) + + model_config = {"extra": "forbid"} + + +class ContentPart(BaseModel): + """A single content part within a multimodal message. + + Used when a message or observation contains mixed content types (text and images). + For text-only content, a plain string can still be used instead of a ContentPart array. + """ + + type: Literal["text", "image"] = Field( + default=..., + description="The type of content", + ) + text: str | None = Field( + default=None, + description="Text content. Required when type='text'.", + ) + source: ImageSource | None = Field( + default=None, + description="Image source (file reference). Required when type='image'.", + ) + + model_config = {"extra": "forbid"} + + @model_validator(mode="after") + def validate_content_type(self) -> "ContentPart": + """Validate that the correct fields are present for each content type.""" + if self.type == "text": + if self.text is None: + raise ValueError("'text' field is required when type='text'") + if self.source is not None: + raise ValueError("'source' field is not allowed when type='text'") + elif self.type == "image": + if self.source is None: + raise ValueError("'source' field is required when type='image'") + if self.text is not None: + raise ValueError("'text' field is not allowed when type='image'") + return self + + +class SubagentTrajectoryRef(BaseModel): + """Reference to a delegated subagent trajectory. + + A subagent reference is resolved by one of two mechanisms: + + 1. **Embedded form** — set `trajectory_id` to match the + `Trajectory.trajectory_id` of an entry in the parent's + `subagent_trajectories` array. + 2. **File-ref form** — set `trajectory_path` to the location + (file path, S3 URL, etc.) of an external trajectory file. + + These two mechanisms are the only resolution keys. `session_id`, when + present on the ref, is **informational only**: it records the run + identity of the delegated subagent for debug / correlation / cross- + trajectory search purposes, and MUST NOT be used as a matching key + (it is run-scoped and MAY collide across siblings — see + `Trajectory.session_id`). A ref therefore MUST set at least one of + `trajectory_id` or `trajectory_path`; `session_id` alone is not a + resolvable reference. + """ + + trajectory_id: str | None = Field( + default=None, + description=( + "Canonical identifier of the delegated subagent trajectory. " + "Matches `Trajectory.trajectory_id` of an entry in the parent's " + "`subagent_trajectories` array and is the resolution key for " + "embedded references. Added in ATIF-v1.7 to provide a document-" + "unique matching key without overloading `session_id`." + ), + ) + session_id: str | None = Field( + default=None, + description=( + "Run identity of the delegated subagent trajectory. Informational " + "only: recorded so consumers can correlate this ref back to the " + "subagent's run for debug / search / display purposes. Run-scoped " + "(see `Trajectory.session_id`) and therefore NOT a valid " + "resolution key — consumers MUST resolve via `trajectory_id` " + "(embedded) or `trajectory_path` (external file)." + ), + ) + trajectory_path: str | None = Field( + default=None, + description=( + "Location of the complete subagent trajectory as an external " + "file (file path, S3 URL, database reference, etc.). Resolution " + "key for file-ref references. When both `trajectory_id` and " + "`trajectory_path` are set, consumers MAY choose either; " + "typically `trajectory_id` is preferred when the embedded " + "trajectory is available in-memory." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom metadata about the subagent execution", + ) + + model_config = {"extra": "forbid"} + + @model_validator(mode="after") + def validate_is_resolvable(self) -> "SubagentTrajectoryRef": + """A ref must be resolvable: set `trajectory_id` or `trajectory_path`. + + `session_id` alone is not sufficient because it is run-scoped and + MAY collide across siblings (see `Trajectory.session_id`), so it + cannot unambiguously identify which subagent trajectory a ref + points at. + """ + if self.trajectory_id is None and self.trajectory_path is None: + raise ValueError( + "SubagentTrajectoryRef must be resolvable: set either " + "`trajectory_id` (for embedded references) or " + "`trajectory_path` (for external-file references). " + "`session_id` alone is not a resolution key — it is " + "run-scoped and may collide across siblings." + ) + return self + + +class ToolCall(BaseModel): + """A tool call within a step.""" + + tool_call_id: str = Field( + default=..., + description="Unique identifier for this specific tool call", + ) + function_name: str = Field( + default=..., + description="The name of the function or tool being invoked", + ) + arguments: dict[str, Any] = Field( + default=..., + description="Arguments passed to the function (can be empty dict)", + ) + extra: dict[str, Any] | None = Field( + default=None, + description=("Custom tool-call-level metadata (e.g., timeout, retry count, tool version). Added in ATIF-v1.7."), + ) + + model_config = {"extra": "forbid"} + + +class ObservationResult(BaseModel): + """A single result within an observation.""" + + source_call_id: str | None = Field( + default=None, + description=( + "The `tool_call_id` from the _tool_calls_ array in _StepObject_ that this " + "result corresponds to. If null or omitted, the result comes from an " + "action that doesn't use the standard tool calling format (e.g., agent " + "actions without tool calls or system-initiated operations)." + ), + ) + content: str | list[ContentPart] | None = Field( + default=None, + description=( + "The output or result from the tool execution. String for text-only " + "content, or array of ContentPart for multimodal content (added in ATIF-v1.6)." + ), + ) + subagent_trajectory_ref: list[SubagentTrajectoryRef] | None = Field( + default=None, + description="Array of references to delegated subagent trajectories", + ) + extra: dict[str, Any] | None = Field( + default=None, + description=( + "Custom observation-result-level metadata (e.g., confidence score, " + "retrieval score, source document ID). Added in ATIF-v1.7." + ), + ) + + model_config = {"extra": "forbid"} + + +class Observation(BaseModel): + """Environment feedback/result after actions or system events.""" + + results: list[ObservationResult] = Field( + default=..., + description="Array of result objects from tool calls or actions", + ) + + model_config = {"extra": "forbid"} + + +class Metrics(BaseModel): + """LLM operational and confidence data.""" + + prompt_tokens: int | None = Field( + default=None, + description="Total input tokens including cached and non-cached", + ) + completion_tokens: int | None = Field( + default=None, + description="Total tokens generated by the LLM response", + ) + cached_tokens: int | None = Field( + default=None, + description="Subset of prompt_tokens that were cache hits", + ) + cost_usd: float | None = Field( + default=None, + description="Monetary cost of the API call in USD", + ) + prompt_token_ids: list[int] | None = Field( + default=None, + description="Token IDs for prompt (input) tokens sent to the LLM, including chat history (if applicable)", + ) + completion_token_ids: list[int] | None = Field( + default=None, + description="Token IDs for completion (response) tokens, enabling RL training without retokenization drift", + ) + logprobs: list[float] | None = Field( + default=None, + description="Log probability assigned to each generated token", + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Other metrics", + ) + + model_config = {"extra": "forbid"} + + +class FinalMetrics(BaseModel): + """Aggregate statistics for the entire trajectory.""" + + total_prompt_tokens: int | None = Field( + default=None, + description="Sum of all prompt tokens across all steps, including cached tokens", + ) + total_completion_tokens: int | None = Field( + default=None, + description="Sum of all completion tokens across all steps", + ) + total_cached_tokens: int | None = Field( + default=None, + description="Sum of all cached tokens across all steps", + ) + total_cost_usd: float | None = Field( + default=None, + description="Total real monetary cost for the entire trajectory, including cost for subagents, if any", + ) + total_steps: int | None = Field( + default=None, + ge=0, + description=( + "Total number of steps. If not equivalent to the number of steps in the " + "trajectory, must be documented in the root-level notes field." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom aggregate metrics", + ) + + model_config = {"extra": "forbid"} + + +class Agent(BaseModel): + """Agent configuration.""" + + name: str = Field( + default=..., + description="The name of the agent system", + ) + version: str = Field( + default=..., + description="The version identifier of the agent system", + ) + model_name: str | None = Field( + default=None, + description="Default LLM model used for this trajectory", + ) + tool_definitions: list[dict[str, Any]] | None = Field( + default=None, + description=( + "Array of tool/function definitions available to the agent. " + "Each element follows OpenAI's function calling schema." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom agent configuration details", + ) + + model_config = {"extra": "forbid"} + + +class Step(BaseModel): + """A single step in the trajectory.""" + + step_id: int = Field( + default=..., + ge=1, + description="Ordinal index of the turn (starting from 1)", + ) + timestamp: str | None = Field( + default=None, + description="ISO 8601 timestamp indicating when this step occurred", + ) + source: Literal["system", "user", "agent"] = Field( + default=..., + description="The originator of this step", + ) + model_name: str | None = Field( + default=None, + description=( + "The specific LLM model used for this turn. Omission implies the model " + "defined in the root-level agent config." + ), + ) + reasoning_effort: str | float | None = Field( + default=None, + description="Qualitative or quantitative measure of effort", + ) + message: str | list[ContentPart] = Field( + default=..., + description=( + "The dialogue message. String for text-only content, or array of " + "ContentPart for multimodal content (added in ATIF-v1.6)." + ), + ) + reasoning_content: str | None = Field( + default=None, + description="The agent's explicit internal reasoning", + ) + tool_calls: list[ToolCall] | None = Field( + default=None, + description="Array of structured objects for the agent's actions", + ) + observation: Observation | None = Field( + default=None, + description="Environment feedback/result after actions or system events", + ) + metrics: Metrics | None = Field( + default=None, + description="LLM operational and confidence data for this step", + ) + is_copied_context: bool | None = Field( + default=None, + description=( + "Indicates whether this step was copied from a previous trajectory " + "for context (e.g., during continuation after summarization). " + "Steps marked as copied context should not be included in training data " + "as they represent previously-trained interactions. " + "Added in ATIF-v1.5." + ), + ) + llm_call_count: int | None = Field( + default=None, + ge=0, + description=( + "Number of LLM inferences this step represents. When >1, metrics are " + "aggregated across multiple LLM calls. When 1, the step represents exactly " + "one inference. When 0 on a `source: 'agent'` step, the step represents a " + "deterministic (non-LLM) dispatch; `metrics` and `reasoning_content` MUST " + "be absent in that case. When null, the producer did not track this " + "(backward-compatible default). Added in ATIF-v1.7." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom step-level metadata", + ) + + model_config = {"extra": "forbid"} + + @field_validator("timestamp") + @classmethod + def validate_timestamp(cls, v: str | None) -> str | None: + """Validate that timestamp is a valid ISO 8601 string.""" + if v is not None: + try: + datetime.fromisoformat(v.replace("Z", "+00:00")) + except ValueError as e: + raise ValueError(f"Invalid ISO 8601 timestamp: {e}") from e + return v + + @model_validator(mode="after") + def validate_agent_only_fields(self) -> "Step": + """Validate that certain fields are only present for agent steps.""" + if self.source != "agent": + agent_only_fields = [ + "model_name", + "reasoning_effort", + "reasoning_content", + "tool_calls", + "metrics", + ] + for field in agent_only_fields: + if getattr(self, field) is not None: + raise ValueError( + f"Field '{field}' is only applicable when source is 'agent', but source is '{self.source}'" + ) + return self + + @model_validator(mode="after") + def validate_llm_call_count_zero_fields(self) -> "Step": + """Enforce ATIF v1.7 no-LLM orchestration rule. + + When ``llm_call_count == 0`` on a ``source: "agent"`` step, the step + represents a deterministic (non-LLM) dispatch. LLM-specific fields + (``metrics``, ``reasoning_content``) MUST be absent on such steps. + """ + if self.llm_call_count == 0 and self.source == "agent": + llm_only_fields = ["metrics", "reasoning_content"] + for field in llm_only_fields: + if getattr(self, field) is not None: + raise ValueError( + f"Field '{field}' must be absent when llm_call_count is 0 " + f"(deterministic dispatch on a 'source: agent' step)" + ) + return self + + +class Trajectory(BaseModel): + """Agent Trajectory in ATIF (Agent Trajectory Interchange Format).""" + + schema_version: Literal[ + "ATIF-v1.0", + "ATIF-v1.1", + "ATIF-v1.2", + "ATIF-v1.3", + "ATIF-v1.4", + "ATIF-v1.5", + "ATIF-v1.6", + "ATIF-v1.7", + ] = Field( + default="ATIF-v1.7", + description="String defining ATIF compatibility", + ) + session_id: str | None = Field( + default=None, + description=( + "Identifier for the agent run this trajectory belongs to. Scoped " + "to the run, NOT to an individual trajectory document: multiple " + "Trajectory objects MAY share the same `session_id` when they " + "represent the same logical run (e.g., a parent trajectory and " + "its embedded subagents, or a trajectory and its continuation " + "segments linked via `continued_trajectory_ref`). Therefore " + "`session_id`s within a parent's `subagent_trajectories` array " + "are NOT required to be unique. Use `trajectory_id` when a " + "per-trajectory-document unique identifier is required (e.g., " + "for `SubagentTrajectoryRef` resolution). Optional since " + "ATIF-v1.7; producers SHOULD set this on root trajectories for " + "run-level traceability, and MAY omit it on embedded subagents " + "that inherit the parent's run identity." + ), + ) + trajectory_id: str | None = Field( + default=None, + description=( + "Canonical per-trajectory-document identifier, distinct from " + "`session_id`. Unlike `session_id` (which is run-scoped and MAY " + "be shared), `trajectory_id` uniquely identifies THIS trajectory " + "object. Used to resolve `SubagentTrajectoryRef` entries against " + "the root's `subagent_trajectories` array without overloading " + "`session_id`'s run-scoped semantics. Optional on standalone " + "trajectories, but REQUIRED on any trajectory embedded in a " + "parent's `subagent_trajectories` array. `trajectory_id`s within " + "a single parent's `subagent_trajectories` array MUST be unique. " + "Added in ATIF-v1.7." + ), + ) + agent: Agent = Field( + default=..., + description="Object specifying the agent configuration", + ) + steps: list[Step] = Field( + default=..., + min_length=1, + description="Array of step objects representing the complete interaction history", + ) + notes: str | None = Field( + default=None, + description="Custom information, design notes, or explanations", + ) + final_metrics: FinalMetrics | None = Field( + default=None, + description="Summary metrics for the entire trajectory", + ) + continued_trajectory_ref: str | None = Field( + default=None, + description="Reference to the continuation trajectory file if this trajectory is continued in another file", + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom root-level metadata", + ) + subagent_trajectories: list["Trajectory"] | None = Field( + default=None, + description=( + "Array of embedded subagent trajectories. Each element is a complete, " + "independently-valid ATIF Trajectory with its own schema_version, " + "agent, and step_id sequence starting at 1. Enables single-file " + "storage of multi-agent workflows: when a " + "SubagentTrajectoryRef.trajectory_path is null, consumers resolve " + "the reference by matching SubagentTrajectoryRef.trajectory_id " + "against Trajectory.trajectory_id of entries in this array. " + "Uniqueness rules: every embedded subagent MUST set `trajectory_id`, " + "and `trajectory_id`s within this array MUST be unique. " + "`session_id`, by contrast, is run-scoped and MAY collide across " + "siblings (or match the parent) when all trajectories belong to " + "the same logical run; embedded subagents MAY also omit " + "`session_id` entirely to inherit the parent's run identity. " + "Added in ATIF-v1.7." + ), + ) + + model_config = {"extra": "forbid"} + + def to_json_dict(self, exclude_none: bool = True) -> dict[str, Any]: + """Export trajectory to a dictionary suitable for JSON serialization. + + Args: + exclude_none: If True, exclude fields with None values from output. + + Returns: + Dictionary representation of the trajectory. + """ + return self.model_dump(exclude_none=exclude_none, mode="json") + + @model_validator(mode="after") + def validate_step_ids(self) -> "Trajectory": + """Validate that step_ids are sequential starting from 1.""" + for i, step in enumerate(self.steps): + expected_step_id = i + 1 + if step.step_id != expected_step_id: + raise ValueError( + f"steps[{i}].step_id: expected {expected_step_id} (sequential from 1), got {step.step_id}" + ) + return self + + @model_validator(mode="after") + def validate_embedded_subagent_trajectory_ids(self) -> "Trajectory": + """Every embedded subagent must carry a unique, non-null `trajectory_id`. + + Embedded subagents are resolved by matching + `SubagentTrajectoryRef.trajectory_id` against + `Trajectory.trajectory_id`, so the identifier must be present and + unique within a parent's `subagent_trajectories` array. Note that + no such constraint is placed on `session_id` — siblings MAY share + a `session_id` (or omit it entirely to inherit the parent's) when + they represent the same logical agent run. + """ + if not self.subagent_trajectories: + return self + seen: set[str] = set() + for i, sub in enumerate(self.subagent_trajectories): + if sub.trajectory_id is None: + raise ValueError( + f"subagent_trajectories[{i}].trajectory_id is required for embedded subagents " + f"(agent.name={sub.agent.name!r}, session_id={sub.session_id!r})" + ) + if sub.trajectory_id in seen: + raise ValueError( + f"subagent_trajectories[{i}].trajectory_id {sub.trajectory_id!r} is not unique " + f"within subagent_trajectories" + ) + seen.add(sub.trajectory_id) + return self + + @model_validator(mode="after") + def validate_tool_call_references(self) -> "Trajectory": + """Validate that observation source_call_ids reference valid tool_call_ids.""" + for step in self.steps: + if step.observation is None: + continue + + tool_call_ids = set() + if step.tool_calls: + tool_call_ids = {tc.tool_call_id for tc in step.tool_calls} + + for result in step.observation.results: + if result.source_call_id is not None and result.source_call_id not in tool_call_ids: + raise ValueError( + f"Observation result references source_call_id " + f"'{result.source_call_id}' which is not found in " + f"step {step.step_id}'s tool_calls" + ) + return self + + def has_multimodal_content(self) -> bool: + """Check if any step contains multimodal content (images). + + Returns: + True if any step message or observation contains image content parts. + """ + for step in self.steps: + if isinstance(step.message, list): + for part in step.message: + if part.type == "image": + return True + if step.observation: + for result in step.observation.results: + if isinstance(result.content, list): + for part in result.content: + if part.type == "image": + return True + return False diff --git a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/evidence.py b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/evidence.py index 244d0dcfcc..e2072b380d 100644 --- a/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/evidence.py +++ b/packages/nemo_evaluator_sdk/src/nemo_evaluator_sdk/values/evidence.py @@ -8,6 +8,7 @@ import asyncio import difflib import hashlib +import json import os import shutil import signal @@ -18,6 +19,23 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator +from nemo_evaluator_sdk.values.atif import ( + Agent, + FinalMetrics, + Metrics, + Observation, + ObservationResult, + Step, + ToolCall, + Trajectory, +) + +# Well-known evidence keys produced by the standard trial-evidence builder. The +# string values are the single source of truth (mirrored by the ``EVIDENCE_*`` +# constants in ``agent_eval.trials``); this alias types accessors/builders that +# expect a documented key while still allowing arbitrary extension keys. +WellKnownEvidenceKey = Literal["initial_state", "trace", "logs", "final_state", "verifier_logs"] + class FilesystemEntry(BaseModel): """One path that differs between two filesystem snapshots.""" @@ -320,6 +338,307 @@ def _requires_ref_or_data(self) -> EvidenceDescriptor: return self +# --------------------------------------------------------------------------- # +# ATIF normalization: map producer traces into the canonical ATIF Trajectory. +# The persisted/normalized format is the vendored ATIF schema (see values/atif.py, +# RFC 0001, schema_version ATIF-v1.7). Source traces (NeMo Agent Toolkit +# trajectories, OTel/OpenInference spans) are mapped into a Trajectory so trace +# metrics read one canonical shape regardless of producer. +# --------------------------------------------------------------------------- # +_DEFAULT_AGENT = Agent(name="unknown", version="0") +_VALID_SOURCES = {"system", "user", "agent"} + + +def _coerce_source(value: Any) -> Literal["system", "user", "agent"]: + """Map a producer step source onto an ATIF source, defaulting unknowns to ``agent``.""" + return value if value in _VALID_SOURCES else "agent" + + +def normalize_nat_trajectory(payload: dict[str, Any]) -> Trajectory: + """Map a NeMo Agent Toolkit trajectory (loose ``steps[]``) into an ATIF Trajectory. + + Producer steps may omit ATIF envelope fields (``schema_version``/``agent``) and + per-step ``step_id``; these are synthesized. Tool calls get a synthetic + ``tool_call_id`` when the producer supplies none, and an observation result's + ``source_call_id`` is kept only when it correlates to a tool call in the same + step (otherwise dropped) so the ATIF reference validator passes. + """ + steps: list[Step] = [] + for raw in payload.get("steps", []): + if not isinstance(raw, dict): + continue + source = _coerce_source(raw.get("source")) + tool_calls = _nat_tool_calls(raw, len(steps)) if source == "agent" else None + known_ids = {call.tool_call_id for call in tool_calls} if tool_calls else set() + message = raw.get("message") + steps.append( + Step( + step_id=len(steps) + 1, + source=source, + message=message if isinstance(message, str) else "", + reasoning_content=(raw.get("reasoning_content") if source == "agent" else None), + tool_calls=tool_calls or None, + observation=_nat_observation(raw, known_ids), + ) + ) + return Trajectory( + schema_version="ATIF-v1.7", + agent=_nat_agent(payload), + steps=steps or [Step(step_id=1, source="system", message="")], + final_metrics=_nat_final_metrics(payload), + ) + + +def _nat_tool_calls(raw: dict[str, Any], step_index: int) -> list[ToolCall]: + calls: list[ToolCall] = [] + for call_index, call in enumerate(raw.get("tool_calls", []) or []): + if not isinstance(call, dict): + continue + args = call.get("arguments") + calls.append( + ToolCall( + tool_call_id=call.get("tool_call_id") or f"call_{step_index}_{call_index}", + function_name=call.get("function_name") or "unknown", + arguments=args if isinstance(args, dict) else {}, + ) + ) + return calls + + +def _nat_observation(raw: dict[str, Any], known_ids: set[str]) -> Observation | None: + obs = raw.get("observation") + if not isinstance(obs, dict): + return None + results: list[ObservationResult] = [] + for result in obs.get("results", []) or []: + if not isinstance(result, dict): + continue + source_call_id = result.get("source_call_id") + content = result.get("content") + results.append( + ObservationResult( + source_call_id=source_call_id if source_call_id in known_ids else None, + content=str(content) if content is not None else None, + ) + ) + return Observation(results=results) if results else None + + +def _nat_agent(payload: dict[str, Any]) -> Agent: + agent = payload.get("agent") + if isinstance(agent, dict) and agent.get("name") and agent.get("version"): + return Agent(name=str(agent["name"]), version=str(agent["version"]), model_name=agent.get("model_name")) + return _DEFAULT_AGENT + + +def _nat_final_metrics(payload: dict[str, Any]) -> FinalMetrics | None: + metrics = payload.get("final_metrics") + if not isinstance(metrics, dict): + return None + prompt = metrics.get("prompt_tokens") or metrics.get("input_tokens") + completion = metrics.get("completion_tokens") or metrics.get("output_tokens") + cached = metrics.get("cached_tokens") + if prompt is None and completion is None and cached is None: + return None + return FinalMetrics(total_prompt_tokens=prompt, total_completion_tokens=completion, total_cached_tokens=cached) + + +def normalize_otel_spans(spans: list[dict[str, Any]]) -> Trajectory: + """Normalize OpenTelemetry GenAI spans into an ATIF Trajectory (one agent step per span).""" + return _normalize_spans(spans, _OTEL_KEYS) + + +def normalize_openinference_spans(spans: list[dict[str, Any]]) -> Trajectory: + """Normalize OpenInference spans into an ATIF Trajectory (one agent step per span).""" + return _normalize_spans(spans, _OPENINFERENCE_KEYS) + + +# (kind_attr, tool_kinds, tool_name_attr, prompt_attr, completion_attr) +_OTEL_KEYS = ( + "gen_ai.operation.name", + {"execute_tool", "tool"}, + "gen_ai.tool.name", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", +) +_OPENINFERENCE_KEYS = ( + "openinference.span.kind", + {"TOOL"}, + "tool.name", + "llm.token_count.prompt", + "llm.token_count.completion", +) + + +def _normalize_spans(spans: list[dict[str, Any]], keys: tuple[str, set[str], str, str, str]) -> Trajectory: + kind_attr, tool_kinds, name_attr, prompt_attr, completion_attr = keys + steps: list[Step] = [] + for span in spans: + attributes = span.get("attributes", {}) if isinstance(span, dict) else {} + kind = str(attributes.get(kind_attr, "")).lower() + is_tool = any(marker.lower() in kind for marker in tool_kinds) or name_attr in attributes + name = attributes.get(name_attr) or (span.get("name") if isinstance(span, dict) else None) + prompt, completion = attributes.get(prompt_attr), attributes.get(completion_attr) + metrics = Metrics(prompt_tokens=prompt, completion_tokens=completion) if (prompt or completion) else None + tool_calls = ( + [ToolCall(tool_call_id=f"call_{len(steps)}", function_name=str(name or "unknown"), arguments={})] + if is_tool + else None + ) + steps.append( + Step( + step_id=len(steps) + 1, + source="agent", + message="" if is_tool else str(name or ""), + tool_calls=tool_calls, + metrics=metrics, + ) + ) + return Trajectory( + schema_version="ATIF-v1.7", + agent=_DEFAULT_AGENT, + steps=steps or [Step(step_id=1, source="system", message="")], + final_metrics=_aggregate_step_metrics(steps), + ) + + +def _aggregate_step_metrics(steps: list[Step]) -> FinalMetrics | None: + prompt = sum((step.metrics.prompt_tokens or 0) for step in steps if step.metrics is not None) + completion = sum((step.metrics.completion_tokens or 0) for step in steps if step.metrics is not None) + if not prompt and not completion: + return None + return FinalMetrics(total_prompt_tokens=prompt or None, total_completion_tokens=completion or None) + + +def normalize_trace(payload: Any, *, source_format: str | None = None) -> Trajectory: + """Normalize a raw trace payload into a canonical ATIF :class:`Trajectory`. + + ``source_format`` (from the evidence descriptor) routes to a span normalizer + when set to ``otel``/``openinference``; otherwise the payload shape decides: + an already-ATIF document (``schema_version``/``agent``), a NeMo Agent Toolkit + trajectory (``steps``), or a bare span list. Unknown shapes are wrapped in a + minimal valid Trajectory so the persisted artifact is always conformant ATIF. + """ + if isinstance(payload, Trajectory): + return payload + fmt = (source_format or "").lower() + if fmt in {"otel", "opentelemetry"} and isinstance(payload, list): + return normalize_otel_spans(payload) + if fmt == "openinference" and isinstance(payload, list): + return normalize_openinference_spans(payload) + if isinstance(payload, dict): + if "schema_version" in payload or "agent" in payload: + return Trajectory.model_validate(payload) + if "steps" in payload: + return normalize_nat_trajectory(payload) + return Trajectory( + schema_version="ATIF-v1.7", agent=_DEFAULT_AGENT, steps=[Step(step_id=1, source="system", message="")] + ) + + +def _read_trace_payload(descriptor: EvidenceDescriptor) -> Any: + """Load a trace descriptor's raw payload from inline ``data`` or a local ``ref``.""" + if descriptor.data is not None: + return descriptor.data + if descriptor.ref is None: + raise ValueError("trace evidence descriptor requires ref or data") + return json.loads(_local_filesystem_ref(descriptor.ref).read_text(encoding="utf-8")) + + +def normalize_trace_descriptor(descriptor: EvidenceDescriptor) -> EvidenceDescriptor: + """Return ``descriptor`` with its trace payload normalized to ATIF (idempotent). + + Run before a trial is persisted so the stored shape is ATIF regardless of the + ingestion source (inline JSON trajectory, OTEL/OpenInference spans, raw events); + ``format`` becomes ``"atif"``. Non-trace descriptors, already-ATIF traces, and + descriptors with no resolvable payload are returned unchanged. Storage modality + is preserved: inline ``data`` stays inline; a file ``ref`` is normalized into a + sibling ``*.atif.json`` that the returned descriptor points at. + """ + if descriptor.kind != "trace" or descriptor.format == "atif": + return descriptor + if descriptor.ref is not None: + source = _local_filesystem_ref(descriptor.ref) + if not source.is_file(): + return descriptor + trace = normalize_trace(_read_trace_payload(descriptor), source_format=descriptor.format) + atif_path = source.with_name(f"{source.stem}.atif.json") + atif_path.write_text(json.dumps(trace.to_json_dict()), encoding="utf-8") + return descriptor.model_copy(update={"format": "atif", "ref": str(atif_path), "data": None}) + if descriptor.data is None: + return descriptor + trace = normalize_trace(descriptor.data, source_format=descriptor.format) + return descriptor.model_copy(update={"format": "atif", "data": trace.to_json_dict()}) + + +def normalize_candidate_evidence(evidence: CandidateEvidence) -> CandidateEvidence: + """Return a copy of ``evidence`` with every ``kind="trace"`` descriptor ATIF-normalized.""" + descriptors = {name: normalize_trace_descriptor(descriptor) for name, descriptor in evidence.descriptors.items()} + return evidence.model_copy(update={"descriptors": descriptors}) + + +class TraceHandle: + """Lazily normalized read handle over a trace evidence descriptor. + + Exposes the canonical ATIF :class:`~nemo_evaluator_sdk.values.atif.Trajectory` + and convenience views over its ``steps[]``. + """ + + def __init__(self, descriptor: EvidenceDescriptor) -> None: + self._descriptor = descriptor + self._trajectory: Trajectory | None = None + + async def trace(self) -> Trajectory: + """Return the normalized ATIF trajectory, reading and parsing on first access.""" + if self._trajectory is None: + payload = await asyncio.to_thread(self._load_payload) + self._trajectory = normalize_trace(payload, source_format=self._descriptor.format) + return self._trajectory + + def _load_payload(self) -> Any: + return _read_trace_payload(self._descriptor) + + async def steps(self) -> list[Step]: + """Return the ATIF steps in order.""" + return (await self.trace()).steps + + async def tool_calls(self) -> list[ToolCall]: + """Return all tool calls flattened across agent steps, in order.""" + calls: list[ToolCall] = [] + for step in await self.steps(): + calls.extend(step.tool_calls or []) + return calls + + async def token_usage(self) -> FinalMetrics: + """Return aggregate token usage (trajectory ``final_metrics``, else summed per step).""" + trajectory = await self.trace() + if trajectory.final_metrics is not None: + return trajectory.final_metrics + prompt = sum((step.metrics.prompt_tokens or 0) for step in trajectory.steps if step.metrics is not None) + completion = sum((step.metrics.completion_tokens or 0) for step in trajectory.steps if step.metrics is not None) + return FinalMetrics(total_prompt_tokens=prompt or None, total_completion_tokens=completion or None) + + +class LogHandle: + """Read handle over a log-bundle directory.""" + + def __init__(self, root: str | Path) -> None: + self._fs = LocalFilesystemEvidence(root) + + async def list_files(self) -> list[str]: + """Return relative paths of log files in the bundle.""" + return await self._fs.list_files("**/*") + + async def read_text(self, name: str) -> str: + """Read one log file's full text.""" + return await self._fs.read_text(name) + + async def tail(self, name: str, lines: int = 50) -> str: + """Return the last ``lines`` lines of a log file.""" + text = await self._fs.read_text(name) + return "\n".join(text.splitlines()[-lines:]) + + class CandidateEvidence(BaseModel): """Named evidence descriptors attached to an AgentEvalAttempt.""" @@ -334,6 +653,8 @@ class CandidateEvidence(BaseModel): description="Free-form metadata associated with the evidence collection.", ) _filesystem_cache: dict[str, LocalFilesystemEvidence] = PrivateAttr(default_factory=dict) + _trace_cache: dict[str, TraceHandle] = PrivateAttr(default_factory=dict) + _log_cache: dict[str, LogHandle] = PrivateAttr(default_factory=dict) @model_validator(mode="before") @classmethod @@ -378,6 +699,31 @@ async def filesystem(self, name: str) -> LocalFilesystemEvidence: self._filesystem_cache[name] = handle return handle + async def trace(self, name: str = "trace") -> TraceHandle: + """Return a cached normalized-trace handle for a named trace descriptor. + + Async for consistency with :meth:`filesystem`/:meth:`logs` and to leave room + for ref staging/validation; the trace itself is read lazily on first access. + """ + cached = self._trace_cache.get(name) + if cached is not None: + return cached + handle = TraceHandle(self.require(name)) + self._trace_cache[name] = handle + return handle + + async def logs(self, name: str = "logs") -> LogHandle: + """Return a cached log-bundle handle for a named logs descriptor.""" + cached = self._log_cache.get(name) + if cached is not None: + return cached + descriptor = self.require(name, kind="logs") + if descriptor.ref is None: + raise ValueError(f"logs evidence descriptor {name!r} requires a local ref") + handle = LogHandle(_local_filesystem_ref(descriptor.ref)) + self._log_cache[name] = handle + return handle + def _local_filesystem_ref(ref: str) -> Path: """Resolve a local filesystem ref to a Path. diff --git a/packages/nemo_evaluator_sdk/tests/agent_eval/test_evidence.py b/packages/nemo_evaluator_sdk/tests/agent_eval/test_evidence.py index 7de29cf409..59d8a50bdf 100644 --- a/packages/nemo_evaluator_sdk/tests/agent_eval/test_evidence.py +++ b/packages/nemo_evaluator_sdk/tests/agent_eval/test_evidence.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio +import json +import os from pathlib import Path import pytest @@ -10,6 +12,7 @@ CandidateEvidence, EvidenceDescriptor, LocalFilesystemEvidence, + normalize_trace_descriptor, ) @@ -151,14 +154,19 @@ async def test_verifier_timeout_kills_the_whole_process_tree(tmp_path: Path) -> root.mkdir() handle = LocalFilesystemEvidence(root) - # A grandchild backgrounded by the verifier would write the marker after 1s if it - # survived; killing the whole process group on timeout stops it first. - marker = tmp_path / "survived.txt" - result = await handle.run_verifier(["sh", "-c", f"(sleep 1; touch '{marker}') & sleep 5"], timeout_s=0.3) + # The verifier backgrounds a long-lived child and records its PID. Killing only the + # direct shell would orphan that child; killing the whole process group reaps it. + # (We assert the child PID is gone rather than watching for a follow-on side effect, + # which would race the kill: reaping the child can let the shell run its next command + # in the window before the shell itself is signalled.) + pidfile = tmp_path / "child.pid" + result = await handle.run_verifier(["sh", "-c", f"sleep 30 & echo $! > '{pidfile}'; wait"], timeout_s=0.3) assert result.timed_out - await asyncio.sleep(1.5) - assert not marker.exists() + child_pid = int(pidfile.read_text().strip()) + await asyncio.sleep(0.2) # let the OS finish reaping the killed group + with pytest.raises(ProcessLookupError): + os.kill(child_pid, 0) @pytest.mark.asyncio @@ -177,3 +185,107 @@ async def test_unified_diff_reports_text_patch_and_skips_binary(tmp_path: Path) assert "-b" in patch and "+c" in patch and patch.startswith("--- a/f.txt") assert await before.unified_diff(after, "img.bin") == "" # binary: no textual patch assert await before.unified_diff(before, "f.txt") == "" # identical: empty + + +@pytest.mark.asyncio +async def test_trace_handle_normalizes_nat_otel_and_openinference(tmp_path: Path) -> None: + nat_trajectory = { + "steps": [ + {"step_id": 1, "source": "user", "message": "do it"}, + { + "step_id": 2, + "source": "agent", + "message": "calling tool", + "tool_calls": [{"function_name": "search", "arguments": {"q": "x"}}], + "observation": {"results": [{"content": "result text"}]}, + }, + ], + "final_metrics": {"prompt_tokens": 10, "completion_tokens": 5}, + } + trace_path = tmp_path / "trajectory.json" + trace_path.write_text(json.dumps(nat_trajectory), encoding="utf-8") + + evidence = CandidateEvidence( + descriptors={"trace": EvidenceDescriptor(kind="trace", ref=str(trace_path), format="json")} + ) + handle = await evidence.trace("trace") + assert handle is await evidence.trace("trace") # cached + trajectory = await handle.trace() + assert trajectory.schema_version == "ATIF-v1.7" + assert [step.source for step in trajectory.steps] == ["user", "agent"] + assert (await handle.tool_calls())[0].function_name == "search" + assert (await handle.token_usage()).total_prompt_tokens == 10 + + otel = CandidateEvidence( + descriptors={ + "trace": EvidenceDescriptor( + kind="trace", + format="otel", + data=[ + {"attributes": {"gen_ai.operation.name": "execute_tool", "gen_ai.tool.name": "shell"}}, + {"attributes": {"gen_ai.usage.input_tokens": 7, "gen_ai.usage.output_tokens": 3}}, + ], + ) + } + ) + assert (await (await otel.trace("trace")).tool_calls())[0].function_name == "shell" + assert (await (await otel.trace("trace")).token_usage()).total_prompt_tokens == 7 + + openinference = CandidateEvidence( + descriptors={ + "trace": EvidenceDescriptor( + kind="trace", + format="openinference", + data=[{"attributes": {"openinference.span.kind": "TOOL", "tool.name": "calc"}}], + ) + } + ) + assert (await (await openinference.trace("trace")).tool_calls())[0].function_name == "calc" + + +@pytest.mark.asyncio +async def test_logs_handle_reads_and_tails(tmp_path: Path) -> None: + log_dir = tmp_path / "logs" + log_dir.mkdir() + (log_dir / "agent.log").write_text("line1\nline2\nline3\n", encoding="utf-8") + + evidence = CandidateEvidence(descriptors={"logs": EvidenceDescriptor(kind="logs", format="dir", ref=str(log_dir))}) + handle = await evidence.logs("logs") + assert handle is await evidence.logs("logs") # cached + assert await handle.list_files() == ["agent.log"] + assert await handle.read_text("agent.log") == "line1\nline2\nline3\n" + assert await handle.tail("agent.log", 2) == "line2\nline3" + + +@pytest.mark.asyncio +async def test_normalize_trace_descriptor_persists_atif_round_trip(tmp_path: Path) -> None: + # Inline NAT trajectory — the shape the live path (evaluator._trial_from_sample) attaches. + inline = EvidenceDescriptor(kind="trace", format="json", data={"steps": [{"source": "user", "message": "hi"}]}) + norm = normalize_trace_descriptor(inline) + assert norm.format == "atif" + assert norm.data["schema_version"] == "ATIF-v1.7" and norm.data["steps"][0]["source"] == "user" + assert normalize_trace_descriptor(norm) is norm # idempotent: already-ATIF passes through + steps = await (await CandidateEvidence(descriptors={"trace": norm}).trace("trace")).steps() + assert steps[0].source == "user" and steps[0].message == "hi" + + # OTEL spans (inline list) normalize to ATIF too. + otel = EvidenceDescriptor( + kind="trace", + format="otel", + data=[{"attributes": {"gen_ai.operation.name": "execute_tool", "gen_ai.tool.name": "shell"}}], + ) + otel_norm = normalize_trace_descriptor(otel) + assert otel_norm.format == "atif" and otel_norm.data["steps"][0]["tool_calls"][0]["function_name"] == "shell" + + # File ref: source is read, a sibling *.atif.json is written, and ref re-points to it. + src = tmp_path / "trajectory.json" + src.write_text( + json.dumps( + {"steps": [{"source": "agent", "tool_calls": [{"function_name": "search", "arguments": {"q": "x"}}]}]} + ), + encoding="utf-8", + ) + file_norm = normalize_trace_descriptor(EvidenceDescriptor(kind="trace", format="json", ref=str(src))) + assert file_norm.format == "atif" and file_norm.ref.endswith(".atif.json") and Path(file_norm.ref).is_file() + tool_calls = await (await CandidateEvidence(descriptors={"trace": file_norm}).trace("trace")).tool_calls() + assert tool_calls[0].function_name == "search" diff --git a/packages/nemo_evaluator_sdk/tests/agent_eval/test_example_metrics.py b/packages/nemo_evaluator_sdk/tests/agent_eval/test_example_metrics.py index 9f25fabdd5..3d28b94f95 100644 --- a/packages/nemo_evaluator_sdk/tests/agent_eval/test_example_metrics.py +++ b/packages/nemo_evaluator_sdk/tests/agent_eval/test_example_metrics.py @@ -4,6 +4,7 @@ """Exercise the example's reference metrics-over-evidence.""" import importlib.util +import json from pathlib import Path import pytest @@ -55,3 +56,30 @@ async def test_tests_pass_and_no_test_cheating(tmp_path: Path) -> None: ) cheated = await example_metrics.NoTestCheatingMetric().compute_scores(_input_with_evidence(evidence_cheated)) assert cheated.outputs[0].value is False + + +@pytest.mark.asyncio +async def test_inefficient_retry_loop(tmp_path: Path) -> None: + def trajectory(repeats: int) -> dict: + calls = [{"function_name": "search", "arguments": {"q": "same"}} for _ in range(repeats)] + return {"steps": [{"step_id": 1, "source": "agent", "message": "", "tool_calls": calls}]} + + looping = tmp_path / "loop.json" + looping.write_text(json.dumps(trajectory(5)), encoding="utf-8") + clean = tmp_path / "clean.json" + clean.write_text(json.dumps(trajectory(1)), encoding="utf-8") + + metric = example_metrics.InefficientRetryLoopMetric(threshold=2) + + loop_result = await metric.compute_scores( + _input_with_evidence( + CandidateEvidence(descriptors={"trace": EvidenceDescriptor(kind="trace", ref=str(looping))}) + ) + ) + assert loop_result.outputs[0].value is False + assert loop_result.outputs[1].value == 5 + + clean_result = await metric.compute_scores( + _input_with_evidence(CandidateEvidence(descriptors={"trace": EvidenceDescriptor(kind="trace", ref=str(clean))})) + ) + assert clean_result.outputs[0].value is True diff --git a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/evaluator.py b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/evaluator.py index c0c5099f1a..be833b3991 100644 --- a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/evaluator.py +++ b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/evaluator.py @@ -44,7 +44,7 @@ from nemo_platform.beta.evaluator.metrics.protocol import Metric, validate_metric_result from nemo_platform.beta.evaluator.metrics.utils import metric_type_name from nemo_platform.beta.evaluator.values import Agent, Model, RunConfig, RunConfigOnline, RunConfigOnlineModel -from nemo_platform.beta.evaluator.values.evidence import CandidateEvidence, EvidenceDescriptor +from nemo_platform.beta.evaluator.values.evidence import CandidateEvidence, EvidenceDescriptor, normalize_trace_descriptor from openai import AsyncOpenAI log = getLogger(__name__) @@ -327,7 +327,9 @@ def _trial_from_sample(task: AgentEvalTask, target: Model | Agent, sample: dict[ # trial stays scorable instead of being dropped as empty output. output_text = _reasoning_content_fallback(sample.get("response")) if "trajectory" in sample: - trace = EvidenceDescriptor(kind="trace", format="json", data=sample["trajectory"]) + # Normalize to ATIF before the trial is persisted so the stored shape is + # source-agnostic (sources in, ATIF out); TraceHandle then reads it uniformly. + trace = normalize_trace_descriptor(EvidenceDescriptor(kind="trace", format="json", data=sample["trajectory"])) else: trace = EvidenceDescriptor(kind="sdk_online_generation", data={"task_id": task.id, "target": target.name}) diff --git a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/trials.py b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/trials.py index 9aa05a1584..9ecdc6fd46 100644 --- a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/trials.py +++ b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/agent_eval/trials.py @@ -14,7 +14,7 @@ from nemo_platform.beta.evaluator.agent_eval.tasks import AgentEvalRunConfig, AgentEvalTask from nemo_platform.beta.evaluator.values import Agent, Model -from nemo_platform.beta.evaluator.values.evidence import CandidateEvidence, EvidenceDescriptor +from nemo_platform.beta.evaluator.values.evidence import CandidateEvidence, EvidenceDescriptor, normalize_trace_descriptor from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator # Well-known evidence keys produced by ``standard_evidence_descriptors``. Harness @@ -158,10 +158,14 @@ def standard_evidence_descriptors( if trace_path is not None: trace_name = Path(trace_path).name.lower() is_atif = trace_name.startswith("atif") or ".atif." in trace_name - descriptors[EVIDENCE_TRACE] = EvidenceDescriptor( - kind="trace", - format="atif" if is_atif else "json", - ref=str(trace_path), + # Normalize the source trace into a sibling ATIF file before persistence so the + # stored descriptor is ATIF regardless of producer (no-op if already ATIF/missing). + descriptors[EVIDENCE_TRACE] = normalize_trace_descriptor( + EvidenceDescriptor( + kind="trace", + format="atif" if is_atif else "json", + ref=str(trace_path), + ) ) logs_metadata = {"primary_log": primary_log} if primary_log else {} diff --git a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/__init__.py b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/__init__.py index 328bcabb0c..68248e9d0e 100644 --- a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/__init__.py +++ b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/__init__.py @@ -4,6 +4,15 @@ """Public value types for evaluator SDK runtime.""" from nemo_platform.beta.evaluator.values.agents import Agent +from nemo_platform.beta.evaluator.values.atif import ( + FinalMetrics, + Metrics, + Observation, + ObservationResult, + Step, + ToolCall, + Trajectory, +) from nemo_platform.beta.evaluator.values.common import SecretRef, SupportedJobTypes from nemo_platform.beta.evaluator.values.dataset_schemas import ( FieldMapping, @@ -17,6 +26,11 @@ FilesystemDiff, FilesystemEntry, LocalFilesystemEvidence, + LogHandle, + TraceHandle, + WellKnownEvidenceKey, + normalize_candidate_evidence, + normalize_trace_descriptor, ) from nemo_platform.beta.evaluator.values.metrics import ( BLEU, @@ -109,6 +123,18 @@ "ContinuousScore", "FilesystemDiff", "FilesystemEntry", + "FinalMetrics", + "LogHandle", + "Metrics", + "Observation", + "ObservationResult", + "Step", + "ToolCall", + "Trajectory", + "TraceHandle", + "WellKnownEvidenceKey", + "normalize_candidate_evidence", + "normalize_trace_descriptor", "DatasetRow", "DatasetRows", "DefaultAggregateFieldName", diff --git a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/atif.py b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/atif.py new file mode 100644 index 0000000000..4387b565b5 --- /dev/null +++ b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/atif.py @@ -0,0 +1,639 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Agent Trajectory Interchange Format (ATIF) models. + +Vendored, byte-for-byte from the Harbor reference implementation +(``harbor.models.trajectories``, Harbor v0.15.0) so the eval SDK persists traces +in the canonical ATIF schema without taking a runtime dependency on the Harbor +framework (which pulls in litellm/datasets/fastapi/...). The authoritative spec is +RFC 0001 "Agent Trajectory Interchange Format (ATIF)" (schema_version ATIF-v1.7). + +Keep this module in lockstep with the upstream models: when Harbor bumps the ATIF +schema, re-vendor rather than editing the field set locally. The only change from +upstream is collapsing the per-file modules into this single file and ordering the +classes by dependency. +""" + +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, Field, field_validator, model_validator + + +class ImageSource(BaseModel): + """Image source specification for images stored as files or at remote URLs.""" + + media_type: Literal["image/jpeg", "image/png", "image/gif", "image/webp"] = Field( + default=..., + description="MIME type of the image", + ) + path: str = Field( + default=..., + description="Location of the image. Can be a relative or absolute file path, or a URL.", + ) + + model_config = {"extra": "forbid"} + + +class ContentPart(BaseModel): + """A single content part within a multimodal message. + + Used when a message or observation contains mixed content types (text and images). + For text-only content, a plain string can still be used instead of a ContentPart array. + """ + + type: Literal["text", "image"] = Field( + default=..., + description="The type of content", + ) + text: str | None = Field( + default=None, + description="Text content. Required when type='text'.", + ) + source: ImageSource | None = Field( + default=None, + description="Image source (file reference). Required when type='image'.", + ) + + model_config = {"extra": "forbid"} + + @model_validator(mode="after") + def validate_content_type(self) -> "ContentPart": + """Validate that the correct fields are present for each content type.""" + if self.type == "text": + if self.text is None: + raise ValueError("'text' field is required when type='text'") + if self.source is not None: + raise ValueError("'source' field is not allowed when type='text'") + elif self.type == "image": + if self.source is None: + raise ValueError("'source' field is required when type='image'") + if self.text is not None: + raise ValueError("'text' field is not allowed when type='image'") + return self + + +class SubagentTrajectoryRef(BaseModel): + """Reference to a delegated subagent trajectory. + + A subagent reference is resolved by one of two mechanisms: + + 1. **Embedded form** — set `trajectory_id` to match the + `Trajectory.trajectory_id` of an entry in the parent's + `subagent_trajectories` array. + 2. **File-ref form** — set `trajectory_path` to the location + (file path, S3 URL, etc.) of an external trajectory file. + + These two mechanisms are the only resolution keys. `session_id`, when + present on the ref, is **informational only**: it records the run + identity of the delegated subagent for debug / correlation / cross- + trajectory search purposes, and MUST NOT be used as a matching key + (it is run-scoped and MAY collide across siblings — see + `Trajectory.session_id`). A ref therefore MUST set at least one of + `trajectory_id` or `trajectory_path`; `session_id` alone is not a + resolvable reference. + """ + + trajectory_id: str | None = Field( + default=None, + description=( + "Canonical identifier of the delegated subagent trajectory. " + "Matches `Trajectory.trajectory_id` of an entry in the parent's " + "`subagent_trajectories` array and is the resolution key for " + "embedded references. Added in ATIF-v1.7 to provide a document-" + "unique matching key without overloading `session_id`." + ), + ) + session_id: str | None = Field( + default=None, + description=( + "Run identity of the delegated subagent trajectory. Informational " + "only: recorded so consumers can correlate this ref back to the " + "subagent's run for debug / search / display purposes. Run-scoped " + "(see `Trajectory.session_id`) and therefore NOT a valid " + "resolution key — consumers MUST resolve via `trajectory_id` " + "(embedded) or `trajectory_path` (external file)." + ), + ) + trajectory_path: str | None = Field( + default=None, + description=( + "Location of the complete subagent trajectory as an external " + "file (file path, S3 URL, database reference, etc.). Resolution " + "key for file-ref references. When both `trajectory_id` and " + "`trajectory_path` are set, consumers MAY choose either; " + "typically `trajectory_id` is preferred when the embedded " + "trajectory is available in-memory." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom metadata about the subagent execution", + ) + + model_config = {"extra": "forbid"} + + @model_validator(mode="after") + def validate_is_resolvable(self) -> "SubagentTrajectoryRef": + """A ref must be resolvable: set `trajectory_id` or `trajectory_path`. + + `session_id` alone is not sufficient because it is run-scoped and + MAY collide across siblings (see `Trajectory.session_id`), so it + cannot unambiguously identify which subagent trajectory a ref + points at. + """ + if self.trajectory_id is None and self.trajectory_path is None: + raise ValueError( + "SubagentTrajectoryRef must be resolvable: set either " + "`trajectory_id` (for embedded references) or " + "`trajectory_path` (for external-file references). " + "`session_id` alone is not a resolution key — it is " + "run-scoped and may collide across siblings." + ) + return self + + +class ToolCall(BaseModel): + """A tool call within a step.""" + + tool_call_id: str = Field( + default=..., + description="Unique identifier for this specific tool call", + ) + function_name: str = Field( + default=..., + description="The name of the function or tool being invoked", + ) + arguments: dict[str, Any] = Field( + default=..., + description="Arguments passed to the function (can be empty dict)", + ) + extra: dict[str, Any] | None = Field( + default=None, + description=("Custom tool-call-level metadata (e.g., timeout, retry count, tool version). Added in ATIF-v1.7."), + ) + + model_config = {"extra": "forbid"} + + +class ObservationResult(BaseModel): + """A single result within an observation.""" + + source_call_id: str | None = Field( + default=None, + description=( + "The `tool_call_id` from the _tool_calls_ array in _StepObject_ that this " + "result corresponds to. If null or omitted, the result comes from an " + "action that doesn't use the standard tool calling format (e.g., agent " + "actions without tool calls or system-initiated operations)." + ), + ) + content: str | list[ContentPart] | None = Field( + default=None, + description=( + "The output or result from the tool execution. String for text-only " + "content, or array of ContentPart for multimodal content (added in ATIF-v1.6)." + ), + ) + subagent_trajectory_ref: list[SubagentTrajectoryRef] | None = Field( + default=None, + description="Array of references to delegated subagent trajectories", + ) + extra: dict[str, Any] | None = Field( + default=None, + description=( + "Custom observation-result-level metadata (e.g., confidence score, " + "retrieval score, source document ID). Added in ATIF-v1.7." + ), + ) + + model_config = {"extra": "forbid"} + + +class Observation(BaseModel): + """Environment feedback/result after actions or system events.""" + + results: list[ObservationResult] = Field( + default=..., + description="Array of result objects from tool calls or actions", + ) + + model_config = {"extra": "forbid"} + + +class Metrics(BaseModel): + """LLM operational and confidence data.""" + + prompt_tokens: int | None = Field( + default=None, + description="Total input tokens including cached and non-cached", + ) + completion_tokens: int | None = Field( + default=None, + description="Total tokens generated by the LLM response", + ) + cached_tokens: int | None = Field( + default=None, + description="Subset of prompt_tokens that were cache hits", + ) + cost_usd: float | None = Field( + default=None, + description="Monetary cost of the API call in USD", + ) + prompt_token_ids: list[int] | None = Field( + default=None, + description="Token IDs for prompt (input) tokens sent to the LLM, including chat history (if applicable)", + ) + completion_token_ids: list[int] | None = Field( + default=None, + description="Token IDs for completion (response) tokens, enabling RL training without retokenization drift", + ) + logprobs: list[float] | None = Field( + default=None, + description="Log probability assigned to each generated token", + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Other metrics", + ) + + model_config = {"extra": "forbid"} + + +class FinalMetrics(BaseModel): + """Aggregate statistics for the entire trajectory.""" + + total_prompt_tokens: int | None = Field( + default=None, + description="Sum of all prompt tokens across all steps, including cached tokens", + ) + total_completion_tokens: int | None = Field( + default=None, + description="Sum of all completion tokens across all steps", + ) + total_cached_tokens: int | None = Field( + default=None, + description="Sum of all cached tokens across all steps", + ) + total_cost_usd: float | None = Field( + default=None, + description="Total real monetary cost for the entire trajectory, including cost for subagents, if any", + ) + total_steps: int | None = Field( + default=None, + ge=0, + description=( + "Total number of steps. If not equivalent to the number of steps in the " + "trajectory, must be documented in the root-level notes field." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom aggregate metrics", + ) + + model_config = {"extra": "forbid"} + + +class Agent(BaseModel): + """Agent configuration.""" + + name: str = Field( + default=..., + description="The name of the agent system", + ) + version: str = Field( + default=..., + description="The version identifier of the agent system", + ) + model_name: str | None = Field( + default=None, + description="Default LLM model used for this trajectory", + ) + tool_definitions: list[dict[str, Any]] | None = Field( + default=None, + description=( + "Array of tool/function definitions available to the agent. " + "Each element follows OpenAI's function calling schema." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom agent configuration details", + ) + + model_config = {"extra": "forbid"} + + +class Step(BaseModel): + """A single step in the trajectory.""" + + step_id: int = Field( + default=..., + ge=1, + description="Ordinal index of the turn (starting from 1)", + ) + timestamp: str | None = Field( + default=None, + description="ISO 8601 timestamp indicating when this step occurred", + ) + source: Literal["system", "user", "agent"] = Field( + default=..., + description="The originator of this step", + ) + model_name: str | None = Field( + default=None, + description=( + "The specific LLM model used for this turn. Omission implies the model " + "defined in the root-level agent config." + ), + ) + reasoning_effort: str | float | None = Field( + default=None, + description="Qualitative or quantitative measure of effort", + ) + message: str | list[ContentPart] = Field( + default=..., + description=( + "The dialogue message. String for text-only content, or array of " + "ContentPart for multimodal content (added in ATIF-v1.6)." + ), + ) + reasoning_content: str | None = Field( + default=None, + description="The agent's explicit internal reasoning", + ) + tool_calls: list[ToolCall] | None = Field( + default=None, + description="Array of structured objects for the agent's actions", + ) + observation: Observation | None = Field( + default=None, + description="Environment feedback/result after actions or system events", + ) + metrics: Metrics | None = Field( + default=None, + description="LLM operational and confidence data for this step", + ) + is_copied_context: bool | None = Field( + default=None, + description=( + "Indicates whether this step was copied from a previous trajectory " + "for context (e.g., during continuation after summarization). " + "Steps marked as copied context should not be included in training data " + "as they represent previously-trained interactions. " + "Added in ATIF-v1.5." + ), + ) + llm_call_count: int | None = Field( + default=None, + ge=0, + description=( + "Number of LLM inferences this step represents. When >1, metrics are " + "aggregated across multiple LLM calls. When 1, the step represents exactly " + "one inference. When 0 on a `source: 'agent'` step, the step represents a " + "deterministic (non-LLM) dispatch; `metrics` and `reasoning_content` MUST " + "be absent in that case. When null, the producer did not track this " + "(backward-compatible default). Added in ATIF-v1.7." + ), + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom step-level metadata", + ) + + model_config = {"extra": "forbid"} + + @field_validator("timestamp") + @classmethod + def validate_timestamp(cls, v: str | None) -> str | None: + """Validate that timestamp is a valid ISO 8601 string.""" + if v is not None: + try: + datetime.fromisoformat(v.replace("Z", "+00:00")) + except ValueError as e: + raise ValueError(f"Invalid ISO 8601 timestamp: {e}") from e + return v + + @model_validator(mode="after") + def validate_agent_only_fields(self) -> "Step": + """Validate that certain fields are only present for agent steps.""" + if self.source != "agent": + agent_only_fields = [ + "model_name", + "reasoning_effort", + "reasoning_content", + "tool_calls", + "metrics", + ] + for field in agent_only_fields: + if getattr(self, field) is not None: + raise ValueError( + f"Field '{field}' is only applicable when source is 'agent', but source is '{self.source}'" + ) + return self + + @model_validator(mode="after") + def validate_llm_call_count_zero_fields(self) -> "Step": + """Enforce ATIF v1.7 no-LLM orchestration rule. + + When ``llm_call_count == 0`` on a ``source: "agent"`` step, the step + represents a deterministic (non-LLM) dispatch. LLM-specific fields + (``metrics``, ``reasoning_content``) MUST be absent on such steps. + """ + if self.llm_call_count == 0 and self.source == "agent": + llm_only_fields = ["metrics", "reasoning_content"] + for field in llm_only_fields: + if getattr(self, field) is not None: + raise ValueError( + f"Field '{field}' must be absent when llm_call_count is 0 " + f"(deterministic dispatch on a 'source: agent' step)" + ) + return self + + +class Trajectory(BaseModel): + """Agent Trajectory in ATIF (Agent Trajectory Interchange Format).""" + + schema_version: Literal[ + "ATIF-v1.0", + "ATIF-v1.1", + "ATIF-v1.2", + "ATIF-v1.3", + "ATIF-v1.4", + "ATIF-v1.5", + "ATIF-v1.6", + "ATIF-v1.7", + ] = Field( + default="ATIF-v1.7", + description="String defining ATIF compatibility", + ) + session_id: str | None = Field( + default=None, + description=( + "Identifier for the agent run this trajectory belongs to. Scoped " + "to the run, NOT to an individual trajectory document: multiple " + "Trajectory objects MAY share the same `session_id` when they " + "represent the same logical run (e.g., a parent trajectory and " + "its embedded subagents, or a trajectory and its continuation " + "segments linked via `continued_trajectory_ref`). Therefore " + "`session_id`s within a parent's `subagent_trajectories` array " + "are NOT required to be unique. Use `trajectory_id` when a " + "per-trajectory-document unique identifier is required (e.g., " + "for `SubagentTrajectoryRef` resolution). Optional since " + "ATIF-v1.7; producers SHOULD set this on root trajectories for " + "run-level traceability, and MAY omit it on embedded subagents " + "that inherit the parent's run identity." + ), + ) + trajectory_id: str | None = Field( + default=None, + description=( + "Canonical per-trajectory-document identifier, distinct from " + "`session_id`. Unlike `session_id` (which is run-scoped and MAY " + "be shared), `trajectory_id` uniquely identifies THIS trajectory " + "object. Used to resolve `SubagentTrajectoryRef` entries against " + "the root's `subagent_trajectories` array without overloading " + "`session_id`'s run-scoped semantics. Optional on standalone " + "trajectories, but REQUIRED on any trajectory embedded in a " + "parent's `subagent_trajectories` array. `trajectory_id`s within " + "a single parent's `subagent_trajectories` array MUST be unique. " + "Added in ATIF-v1.7." + ), + ) + agent: Agent = Field( + default=..., + description="Object specifying the agent configuration", + ) + steps: list[Step] = Field( + default=..., + min_length=1, + description="Array of step objects representing the complete interaction history", + ) + notes: str | None = Field( + default=None, + description="Custom information, design notes, or explanations", + ) + final_metrics: FinalMetrics | None = Field( + default=None, + description="Summary metrics for the entire trajectory", + ) + continued_trajectory_ref: str | None = Field( + default=None, + description="Reference to the continuation trajectory file if this trajectory is continued in another file", + ) + extra: dict[str, Any] | None = Field( + default=None, + description="Custom root-level metadata", + ) + subagent_trajectories: list["Trajectory"] | None = Field( + default=None, + description=( + "Array of embedded subagent trajectories. Each element is a complete, " + "independently-valid ATIF Trajectory with its own schema_version, " + "agent, and step_id sequence starting at 1. Enables single-file " + "storage of multi-agent workflows: when a " + "SubagentTrajectoryRef.trajectory_path is null, consumers resolve " + "the reference by matching SubagentTrajectoryRef.trajectory_id " + "against Trajectory.trajectory_id of entries in this array. " + "Uniqueness rules: every embedded subagent MUST set `trajectory_id`, " + "and `trajectory_id`s within this array MUST be unique. " + "`session_id`, by contrast, is run-scoped and MAY collide across " + "siblings (or match the parent) when all trajectories belong to " + "the same logical run; embedded subagents MAY also omit " + "`session_id` entirely to inherit the parent's run identity. " + "Added in ATIF-v1.7." + ), + ) + + model_config = {"extra": "forbid"} + + def to_json_dict(self, exclude_none: bool = True) -> dict[str, Any]: + """Export trajectory to a dictionary suitable for JSON serialization. + + Args: + exclude_none: If True, exclude fields with None values from output. + + Returns: + Dictionary representation of the trajectory. + """ + return self.model_dump(exclude_none=exclude_none, mode="json") + + @model_validator(mode="after") + def validate_step_ids(self) -> "Trajectory": + """Validate that step_ids are sequential starting from 1.""" + for i, step in enumerate(self.steps): + expected_step_id = i + 1 + if step.step_id != expected_step_id: + raise ValueError( + f"steps[{i}].step_id: expected {expected_step_id} (sequential from 1), got {step.step_id}" + ) + return self + + @model_validator(mode="after") + def validate_embedded_subagent_trajectory_ids(self) -> "Trajectory": + """Every embedded subagent must carry a unique, non-null `trajectory_id`. + + Embedded subagents are resolved by matching + `SubagentTrajectoryRef.trajectory_id` against + `Trajectory.trajectory_id`, so the identifier must be present and + unique within a parent's `subagent_trajectories` array. Note that + no such constraint is placed on `session_id` — siblings MAY share + a `session_id` (or omit it entirely to inherit the parent's) when + they represent the same logical agent run. + """ + if not self.subagent_trajectories: + return self + seen: set[str] = set() + for i, sub in enumerate(self.subagent_trajectories): + if sub.trajectory_id is None: + raise ValueError( + f"subagent_trajectories[{i}].trajectory_id is required for embedded subagents " + f"(agent.name={sub.agent.name!r}, session_id={sub.session_id!r})" + ) + if sub.trajectory_id in seen: + raise ValueError( + f"subagent_trajectories[{i}].trajectory_id {sub.trajectory_id!r} is not unique " + f"within subagent_trajectories" + ) + seen.add(sub.trajectory_id) + return self + + @model_validator(mode="after") + def validate_tool_call_references(self) -> "Trajectory": + """Validate that observation source_call_ids reference valid tool_call_ids.""" + for step in self.steps: + if step.observation is None: + continue + + tool_call_ids = set() + if step.tool_calls: + tool_call_ids = {tc.tool_call_id for tc in step.tool_calls} + + for result in step.observation.results: + if result.source_call_id is not None and result.source_call_id not in tool_call_ids: + raise ValueError( + f"Observation result references source_call_id " + f"'{result.source_call_id}' which is not found in " + f"step {step.step_id}'s tool_calls" + ) + return self + + def has_multimodal_content(self) -> bool: + """Check if any step contains multimodal content (images). + + Returns: + True if any step message or observation contains image content parts. + """ + for step in self.steps: + if isinstance(step.message, list): + for part in step.message: + if part.type == "image": + return True + if step.observation: + for result in step.observation.results: + if isinstance(result.content, list): + for part in result.content: + if part.type == "image": + return True + return False diff --git a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/evidence.py b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/evidence.py index 244d0dcfcc..686170fc89 100644 --- a/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/evidence.py +++ b/sdk/python/nemo-platform/src/nemo_platform/beta/evaluator/values/evidence.py @@ -8,6 +8,7 @@ import asyncio import difflib import hashlib +import json import os import shutil import signal @@ -18,6 +19,23 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator +from nemo_platform.beta.evaluator.values.atif import ( + Agent, + FinalMetrics, + Metrics, + Observation, + ObservationResult, + Step, + ToolCall, + Trajectory, +) + +# Well-known evidence keys produced by the standard trial-evidence builder. The +# string values are the single source of truth (mirrored by the ``EVIDENCE_*`` +# constants in ``agent_eval.trials``); this alias types accessors/builders that +# expect a documented key while still allowing arbitrary extension keys. +WellKnownEvidenceKey = Literal["initial_state", "trace", "logs", "final_state", "verifier_logs"] + class FilesystemEntry(BaseModel): """One path that differs between two filesystem snapshots.""" @@ -320,6 +338,307 @@ def _requires_ref_or_data(self) -> EvidenceDescriptor: return self +# --------------------------------------------------------------------------- # +# ATIF normalization: map producer traces into the canonical ATIF Trajectory. +# The persisted/normalized format is the vendored ATIF schema (see values/atif.py, +# RFC 0001, schema_version ATIF-v1.7). Source traces (NeMo Agent Toolkit +# trajectories, OTel/OpenInference spans) are mapped into a Trajectory so trace +# metrics read one canonical shape regardless of producer. +# --------------------------------------------------------------------------- # +_DEFAULT_AGENT = Agent(name="unknown", version="0") +_VALID_SOURCES = {"system", "user", "agent"} + + +def _coerce_source(value: Any) -> Literal["system", "user", "agent"]: + """Map a producer step source onto an ATIF source, defaulting unknowns to ``agent``.""" + return value if value in _VALID_SOURCES else "agent" + + +def normalize_nat_trajectory(payload: dict[str, Any]) -> Trajectory: + """Map a NeMo Agent Toolkit trajectory (loose ``steps[]``) into an ATIF Trajectory. + + Producer steps may omit ATIF envelope fields (``schema_version``/``agent``) and + per-step ``step_id``; these are synthesized. Tool calls get a synthetic + ``tool_call_id`` when the producer supplies none, and an observation result's + ``source_call_id`` is kept only when it correlates to a tool call in the same + step (otherwise dropped) so the ATIF reference validator passes. + """ + steps: list[Step] = [] + for raw in payload.get("steps", []): + if not isinstance(raw, dict): + continue + source = _coerce_source(raw.get("source")) + tool_calls = _nat_tool_calls(raw, len(steps)) if source == "agent" else None + known_ids = {call.tool_call_id for call in tool_calls} if tool_calls else set() + message = raw.get("message") + steps.append( + Step( + step_id=len(steps) + 1, + source=source, + message=message if isinstance(message, str) else "", + reasoning_content=(raw.get("reasoning_content") if source == "agent" else None), + tool_calls=tool_calls or None, + observation=_nat_observation(raw, known_ids), + ) + ) + return Trajectory( + schema_version="ATIF-v1.7", + agent=_nat_agent(payload), + steps=steps or [Step(step_id=1, source="system", message="")], + final_metrics=_nat_final_metrics(payload), + ) + + +def _nat_tool_calls(raw: dict[str, Any], step_index: int) -> list[ToolCall]: + calls: list[ToolCall] = [] + for call_index, call in enumerate(raw.get("tool_calls", []) or []): + if not isinstance(call, dict): + continue + args = call.get("arguments") + calls.append( + ToolCall( + tool_call_id=call.get("tool_call_id") or f"call_{step_index}_{call_index}", + function_name=call.get("function_name") or "unknown", + arguments=args if isinstance(args, dict) else {}, + ) + ) + return calls + + +def _nat_observation(raw: dict[str, Any], known_ids: set[str]) -> Observation | None: + obs = raw.get("observation") + if not isinstance(obs, dict): + return None + results: list[ObservationResult] = [] + for result in obs.get("results", []) or []: + if not isinstance(result, dict): + continue + source_call_id = result.get("source_call_id") + content = result.get("content") + results.append( + ObservationResult( + source_call_id=source_call_id if source_call_id in known_ids else None, + content=str(content) if content is not None else None, + ) + ) + return Observation(results=results) if results else None + + +def _nat_agent(payload: dict[str, Any]) -> Agent: + agent = payload.get("agent") + if isinstance(agent, dict) and agent.get("name") and agent.get("version"): + return Agent(name=str(agent["name"]), version=str(agent["version"]), model_name=agent.get("model_name")) + return _DEFAULT_AGENT + + +def _nat_final_metrics(payload: dict[str, Any]) -> FinalMetrics | None: + metrics = payload.get("final_metrics") + if not isinstance(metrics, dict): + return None + prompt = metrics.get("prompt_tokens") or metrics.get("input_tokens") + completion = metrics.get("completion_tokens") or metrics.get("output_tokens") + cached = metrics.get("cached_tokens") + if prompt is None and completion is None and cached is None: + return None + return FinalMetrics(total_prompt_tokens=prompt, total_completion_tokens=completion, total_cached_tokens=cached) + + +def normalize_otel_spans(spans: list[dict[str, Any]]) -> Trajectory: + """Normalize OpenTelemetry GenAI spans into an ATIF Trajectory (one agent step per span).""" + return _normalize_spans(spans, _OTEL_KEYS) + + +def normalize_openinference_spans(spans: list[dict[str, Any]]) -> Trajectory: + """Normalize OpenInference spans into an ATIF Trajectory (one agent step per span).""" + return _normalize_spans(spans, _OPENINFERENCE_KEYS) + + +# (kind_attr, tool_kinds, tool_name_attr, prompt_attr, completion_attr) +_OTEL_KEYS = ( + "gen_ai.operation.name", + {"execute_tool", "tool"}, + "gen_ai.tool.name", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", +) +_OPENINFERENCE_KEYS = ( + "openinference.span.kind", + {"TOOL"}, + "tool.name", + "llm.token_count.prompt", + "llm.token_count.completion", +) + + +def _normalize_spans(spans: list[dict[str, Any]], keys: tuple[str, set[str], str, str, str]) -> Trajectory: + kind_attr, tool_kinds, name_attr, prompt_attr, completion_attr = keys + steps: list[Step] = [] + for span in spans: + attributes = span.get("attributes", {}) if isinstance(span, dict) else {} + kind = str(attributes.get(kind_attr, "")).lower() + is_tool = any(marker.lower() in kind for marker in tool_kinds) or name_attr in attributes + name = attributes.get(name_attr) or (span.get("name") if isinstance(span, dict) else None) + prompt, completion = attributes.get(prompt_attr), attributes.get(completion_attr) + metrics = Metrics(prompt_tokens=prompt, completion_tokens=completion) if (prompt or completion) else None + tool_calls = ( + [ToolCall(tool_call_id=f"call_{len(steps)}", function_name=str(name or "unknown"), arguments={})] + if is_tool + else None + ) + steps.append( + Step( + step_id=len(steps) + 1, + source="agent", + message="" if is_tool else str(name or ""), + tool_calls=tool_calls, + metrics=metrics, + ) + ) + return Trajectory( + schema_version="ATIF-v1.7", + agent=_DEFAULT_AGENT, + steps=steps or [Step(step_id=1, source="system", message="")], + final_metrics=_aggregate_step_metrics(steps), + ) + + +def _aggregate_step_metrics(steps: list[Step]) -> FinalMetrics | None: + prompt = sum((step.metrics.prompt_tokens or 0) for step in steps if step.metrics is not None) + completion = sum((step.metrics.completion_tokens or 0) for step in steps if step.metrics is not None) + if not prompt and not completion: + return None + return FinalMetrics(total_prompt_tokens=prompt or None, total_completion_tokens=completion or None) + + +def normalize_trace(payload: Any, *, source_format: str | None = None) -> Trajectory: + """Normalize a raw trace payload into a canonical ATIF :class:`Trajectory`. + + ``source_format`` (from the evidence descriptor) routes to a span normalizer + when set to ``otel``/``openinference``; otherwise the payload shape decides: + an already-ATIF document (``schema_version``/``agent``), a NeMo Agent Toolkit + trajectory (``steps``), or a bare span list. Unknown shapes are wrapped in a + minimal valid Trajectory so the persisted artifact is always conformant ATIF. + """ + if isinstance(payload, Trajectory): + return payload + fmt = (source_format or "").lower() + if fmt in {"otel", "opentelemetry"} and isinstance(payload, list): + return normalize_otel_spans(payload) + if fmt == "openinference" and isinstance(payload, list): + return normalize_openinference_spans(payload) + if isinstance(payload, dict): + if "schema_version" in payload or "agent" in payload: + return Trajectory.model_validate(payload) + if "steps" in payload: + return normalize_nat_trajectory(payload) + return Trajectory( + schema_version="ATIF-v1.7", agent=_DEFAULT_AGENT, steps=[Step(step_id=1, source="system", message="")] + ) + + +def _read_trace_payload(descriptor: EvidenceDescriptor) -> Any: + """Load a trace descriptor's raw payload from inline ``data`` or a local ``ref``.""" + if descriptor.data is not None: + return descriptor.data + if descriptor.ref is None: + raise ValueError("trace evidence descriptor requires ref or data") + return json.loads(_local_filesystem_ref(descriptor.ref).read_text(encoding="utf-8")) + + +def normalize_trace_descriptor(descriptor: EvidenceDescriptor) -> EvidenceDescriptor: + """Return ``descriptor`` with its trace payload normalized to ATIF (idempotent). + + Run before a trial is persisted so the stored shape is ATIF regardless of the + ingestion source (inline JSON trajectory, OTEL/OpenInference spans, raw events); + ``format`` becomes ``"atif"``. Non-trace descriptors, already-ATIF traces, and + descriptors with no resolvable payload are returned unchanged. Storage modality + is preserved: inline ``data`` stays inline; a file ``ref`` is normalized into a + sibling ``*.atif.json`` that the returned descriptor points at. + """ + if descriptor.kind != "trace" or descriptor.format == "atif": + return descriptor + if descriptor.ref is not None: + source = _local_filesystem_ref(descriptor.ref) + if not source.is_file(): + return descriptor + trace = normalize_trace(_read_trace_payload(descriptor), source_format=descriptor.format) + atif_path = source.with_name(f"{source.stem}.atif.json") + atif_path.write_text(json.dumps(trace.to_json_dict()), encoding="utf-8") + return descriptor.model_copy(update={"format": "atif", "ref": str(atif_path), "data": None}) + if descriptor.data is None: + return descriptor + trace = normalize_trace(descriptor.data, source_format=descriptor.format) + return descriptor.model_copy(update={"format": "atif", "data": trace.to_json_dict()}) + + +def normalize_candidate_evidence(evidence: CandidateEvidence) -> CandidateEvidence: + """Return a copy of ``evidence`` with every ``kind="trace"`` descriptor ATIF-normalized.""" + descriptors = {name: normalize_trace_descriptor(descriptor) for name, descriptor in evidence.descriptors.items()} + return evidence.model_copy(update={"descriptors": descriptors}) + + +class TraceHandle: + """Lazily normalized read handle over a trace evidence descriptor. + + Exposes the canonical ATIF :class:`~nemo_platform.beta.evaluator.values.atif.Trajectory` + and convenience views over its ``steps[]``. + """ + + def __init__(self, descriptor: EvidenceDescriptor) -> None: + self._descriptor = descriptor + self._trajectory: Trajectory | None = None + + async def trace(self) -> Trajectory: + """Return the normalized ATIF trajectory, reading and parsing on first access.""" + if self._trajectory is None: + payload = await asyncio.to_thread(self._load_payload) + self._trajectory = normalize_trace(payload, source_format=self._descriptor.format) + return self._trajectory + + def _load_payload(self) -> Any: + return _read_trace_payload(self._descriptor) + + async def steps(self) -> list[Step]: + """Return the ATIF steps in order.""" + return (await self.trace()).steps + + async def tool_calls(self) -> list[ToolCall]: + """Return all tool calls flattened across agent steps, in order.""" + calls: list[ToolCall] = [] + for step in await self.steps(): + calls.extend(step.tool_calls or []) + return calls + + async def token_usage(self) -> FinalMetrics: + """Return aggregate token usage (trajectory ``final_metrics``, else summed per step).""" + trajectory = await self.trace() + if trajectory.final_metrics is not None: + return trajectory.final_metrics + prompt = sum((step.metrics.prompt_tokens or 0) for step in trajectory.steps if step.metrics is not None) + completion = sum((step.metrics.completion_tokens or 0) for step in trajectory.steps if step.metrics is not None) + return FinalMetrics(total_prompt_tokens=prompt or None, total_completion_tokens=completion or None) + + +class LogHandle: + """Read handle over a log-bundle directory.""" + + def __init__(self, root: str | Path) -> None: + self._fs = LocalFilesystemEvidence(root) + + async def list_files(self) -> list[str]: + """Return relative paths of log files in the bundle.""" + return await self._fs.list_files("**/*") + + async def read_text(self, name: str) -> str: + """Read one log file's full text.""" + return await self._fs.read_text(name) + + async def tail(self, name: str, lines: int = 50) -> str: + """Return the last ``lines`` lines of a log file.""" + text = await self._fs.read_text(name) + return "\n".join(text.splitlines()[-lines:]) + + class CandidateEvidence(BaseModel): """Named evidence descriptors attached to an AgentEvalAttempt.""" @@ -334,6 +653,8 @@ class CandidateEvidence(BaseModel): description="Free-form metadata associated with the evidence collection.", ) _filesystem_cache: dict[str, LocalFilesystemEvidence] = PrivateAttr(default_factory=dict) + _trace_cache: dict[str, TraceHandle] = PrivateAttr(default_factory=dict) + _log_cache: dict[str, LogHandle] = PrivateAttr(default_factory=dict) @model_validator(mode="before") @classmethod @@ -378,6 +699,31 @@ async def filesystem(self, name: str) -> LocalFilesystemEvidence: self._filesystem_cache[name] = handle return handle + async def trace(self, name: str = "trace") -> TraceHandle: + """Return a cached normalized-trace handle for a named trace descriptor. + + Async for consistency with :meth:`filesystem`/:meth:`logs` and to leave room + for ref staging/validation; the trace itself is read lazily on first access. + """ + cached = self._trace_cache.get(name) + if cached is not None: + return cached + handle = TraceHandle(self.require(name)) + self._trace_cache[name] = handle + return handle + + async def logs(self, name: str = "logs") -> LogHandle: + """Return a cached log-bundle handle for a named logs descriptor.""" + cached = self._log_cache.get(name) + if cached is not None: + return cached + descriptor = self.require(name, kind="logs") + if descriptor.ref is None: + raise ValueError(f"logs evidence descriptor {name!r} requires a local ref") + handle = LogHandle(_local_filesystem_ref(descriptor.ref)) + self._log_cache[name] = handle + return handle + def _local_filesystem_ref(ref: str) -> Path: """Resolve a local filesystem ref to a Path.