Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on Keep a Changelog, and this project follows Semantic Versi
- Removed the empty root `.gitkeep` placeholder.

### Fixed
- PR summaries now stay aligned with merge triage test-first actions, and full-fallback PR analysis hides repo-wide unchanged noise from merge triage scoring.
- GitHub Actions example now installs from the GitHub repository while the package is not published on PyPI.

## [0.1.1] - 2026-04-21
Expand Down
1 change: 1 addition & 0 deletions src/ai_risk_manager/pipeline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ def _stage_analysis(
test_plan,
summary=summary,
analysis_scope=scope.analysis_scope,
changed_files=scope.changed_files,
)
sinks.progress.finish(6, total_steps, "QA strategy agent", t)

Expand Down
41 changes: 41 additions & 0 deletions src/ai_risk_manager/pr_scope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

from typing import Protocol

_PR_SCOPED_RULE_IDS = {"ui_journey_smoke_failed"}
_PR_SCOPED_RULE_PREFIXES = ("pr_",)


class FindingLike(Protocol):
rule_id: str
source_ref: str
evidence_refs: list[str]


def normalize_path(path: str) -> str:
normalized = path.replace("\\", "/").strip()
while normalized.startswith("./"):
normalized = normalized[2:]
return normalized


def source_ref_path(source_ref: str) -> str:
parts = source_ref.rsplit(":", 1)
if len(parts) == 2 and parts[1].isdigit():
return parts[0]
return source_ref


def finding_matches_changed_files(finding: FindingLike, changed_files: set[str]) -> bool:
refs = [finding.source_ref, *finding.evidence_refs]
normalized_changed = {normalize_path(path) for path in changed_files}
for ref in refs:
if normalize_path(source_ref_path(ref)) in normalized_changed:
return True
return False


def is_pr_scoped_finding(finding: FindingLike, changed_files: set[str]) -> bool:
if finding.rule_id.startswith(_PR_SCOPED_RULE_PREFIXES) or finding.rule_id in _PR_SCOPED_RULE_IDS:
return True
return bool(changed_files) and finding_matches_changed_files(finding, changed_files)
99 changes: 54 additions & 45 deletions src/ai_risk_manager/reports/generator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

from collections import Counter
from collections.abc import Iterable
from pathlib import Path

from ai_risk_manager.pr_scope import is_pr_scoped_finding, normalize_path
from ai_risk_manager.schemas.types import (
Finding,
GitHubCheckPayload,
GitHubCheckConclusion,
PRSummary,
Expand Down Expand Up @@ -36,8 +39,6 @@
"agent_generated_test_missing_negative_path": "Add negative-path coverage for changed write or validation flows.",
"agent_generated_test_nondeterministic_dependency": "Stabilize flaky tests before relying on them in merge decisions.",
}
_PR_SCOPED_RULE_IDS = {"ui_journey_smoke_failed"}
_PR_SCOPED_RULE_PREFIXES = ("pr_",)


def _summary_counts(findings: FindingsReport) -> dict[str, int]:
Expand All @@ -50,42 +51,14 @@ def _summary_counts(findings: FindingsReport) -> dict[str, int]:
}


def _normalize_path(path: str) -> str:
normalized = path.replace("\\", "/").strip()
while normalized.startswith("./"):
normalized = normalized[2:]
return normalized


def _source_ref_path(source_ref: str) -> str:
parts = source_ref.rsplit(":", 1)
if len(parts) == 2 and parts[1].isdigit():
return parts[0]
return source_ref


def _finding_matches_changed_files(finding, changed_files: set[str]) -> bool:
refs = [finding.source_ref, *finding.evidence_refs]
for ref in refs:
if _normalize_path(_source_ref_path(ref)) in changed_files:
return True
return False


def _is_pr_scoped_finding(finding, changed_files: set[str]) -> bool:
if finding.rule_id.startswith(_PR_SCOPED_RULE_PREFIXES) or finding.rule_id in _PR_SCOPED_RULE_IDS:
return True
return bool(changed_files) and _finding_matches_changed_files(finding, changed_files)


def _cap_repo_wide_repeated_findings(findings, changed_files: set[str]):
if not changed_files:
return list(findings)

capped = []
repo_wide_rule_counts: dict[str, int] = {}
for finding in findings:
if not _is_pr_scoped_finding(finding, changed_files):
if not is_pr_scoped_finding(finding, changed_files):
repo_wide_rule_counts[finding.rule_id] = repo_wide_rule_counts.get(finding.rule_id, 0) + 1
if repo_wide_rule_counts[finding.rule_id] > 1:
continue
Expand All @@ -94,11 +67,11 @@ def _cap_repo_wide_repeated_findings(findings, changed_files: set[str]):


def _rank_findings(findings, *, changed_files: set[str] | None = None, prefer_pr_scope: bool = False):
normalized_changed_files = {_normalize_path(path) for path in (changed_files or set())}
normalized_changed_files = {normalize_path(path) for path in (changed_files or set())}
return sorted(
findings,
key=lambda f: (
0 if prefer_pr_scope and _is_pr_scoped_finding(f, normalized_changed_files) else 1,
0 if prefer_pr_scope and is_pr_scoped_finding(f, normalized_changed_files) else 1,
SEVERITY_INDEX.get(f.severity, len(SEVERITY_ORDER)),
CONFIDENCE_ORDER.get(f.confidence, 3),
-len(f.evidence_refs),
Expand Down Expand Up @@ -190,6 +163,22 @@ def _pr_summary_actions(result: PipelineResult, ranked_findings) -> list[PRSumma
return actions


def _merge_triage_action_findings(result: PipelineResult) -> list[Finding]:
action_finding_ids = {action.finding_id for action in result.merge_triage.actions}
return [finding for finding in result.findings.findings if finding.id in action_finding_ids]


def _dedupe_findings(findings: Iterable[Finding]) -> list[Finding]:
deduped: list[Finding] = []
seen: set[str] = set()
for finding in findings:
if finding.id in seen:
continue
deduped.append(finding)
seen.add(finding.id)
return deduped


def _format_profiles(summary: PRSummary) -> str:
if not summary.profiles:
return "none"
Expand All @@ -204,8 +193,24 @@ def _format_trust(finding) -> str | None:
return f"{trust_band} ({trust_score:.2f})"


def _report_focus_findings(result: PipelineResult) -> list[Finding]:
if result.analysis_scope == "full_fallback" and result.merge_triage.actions:
return _merge_triage_action_findings(result)
return list(result.findings.findings)


def _report_focus_test_items(result: PipelineResult):
if result.analysis_scope != "full_fallback" or not result.merge_triage.actions:
return list(result.test_plan.items)

action_finding_ids = {action.finding_id for action in result.merge_triage.actions}
return [item for item in result.test_plan.items if item.finding_id in action_finding_ids]


def render_report_md(result: PipelineResult, notes: list[str]) -> str:
counts = _summary_counts(result.findings)
focus_findings = _report_focus_findings(result)
focus_test_items = _report_focus_test_items(result)
counts = _summary_counts(FindingsReport(findings=focus_findings, generated_without_llm=result.findings.generated_without_llm))

lines: list[str] = []
lines.append("# Risk Analysis Report")
Expand Down Expand Up @@ -248,6 +253,8 @@ def render_report_md(result: PipelineResult, notes: list[str]) -> str:
f"`{len(result.deterministic_graph.nodes)} nodes`, `{len(result.deterministic_graph.edges)} edges`"
)
lines.append(f"- Suppressed findings: `{result.suppressed_count}`")
if len(focus_findings) != len(result.findings.findings):
lines.append(f"- Detailed findings listed: `{len(result.findings.findings)}`")
lines.append(f"- Run metric (precision proxy): `{result.run_metrics.precision_proxy:.2%}`")
lines.append(f"- Run metric (actionability proxy): `{result.run_metrics.actionability_proxy:.2%}`")
lines.append(f"- Run metric (verification pass rate): `{result.summary.verification_pass_rate:.2%}`")
Expand Down Expand Up @@ -278,36 +285,37 @@ def render_report_md(result: PipelineResult, notes: list[str]) -> str:
lines.append("")
lines.append("## Why This Matters for Release Risk")
lines.append("")
if not result.findings.findings:
lines.append("No high-signal release risks detected in current scope.")
if not focus_findings:
lines.append("No high-signal release risks detected in current triage scope.")
else:
top_severity = sorted(
result.findings.findings,
focus_findings,
key=lambda f: SEVERITY_INDEX.get(f.severity, len(SEVERITY_ORDER)),
)[0].severity
scope_label = "changed-file/PR-scoped" if result.analysis_scope == "full_fallback" else "active"
lines.append(
f"Detected `{len(result.findings.findings)}` active risk(s). "
f"Detected `{len(focus_findings)}` {scope_label} risk(s). "
f"Highest severity is `{top_severity}`, which can impact release confidence if ignored."
)

lines.append("")
lines.append("## Top Actions for Next Sprint")
lines.append("")
if not result.findings.findings:
if not focus_findings:
lines.append("No immediate actions required.")
else:
actions = _rank_findings(result.findings.findings)[:5]
actions = _rank_findings(focus_findings)[:5]
for finding in actions:
lines.append(f"- Action: {finding.recommendation}")
lines.append(f" Expected impact: reduce `{finding.rule_id}` risk around `{finding.source_ref}`.")

lines.append("")
lines.append("## Top Risks")
lines.append("")
if not result.findings.findings:
if not focus_findings:
lines.append("No risks detected in current scope.")
else:
top = _rank_findings(result.findings.findings)[:5]
top = _rank_findings(focus_findings)[:5]
for finding in top:
lines.append(f"### {finding.title}")
lines.append(f"- Severity: `{finding.severity}`")
Expand Down Expand Up @@ -338,10 +346,10 @@ def render_report_md(result: PipelineResult, notes: list[str]) -> str:
lines.append("")
lines.append("## Recommended Test Strategy")
lines.append("")
if not result.test_plan.items:
if not focus_test_items:
lines.append("No additional test recommendations.")
else:
for recommendation in result.test_plan.items:
for recommendation in focus_test_items:
lines.append(
f"- [{recommendation.priority}] {recommendation.recommendation} "
f"(source: `{recommendation.source_ref}`)"
Expand Down Expand Up @@ -382,7 +390,8 @@ def build_pr_summary(
for finding in top_candidates
if finding.status == "new" and SEVERITY_INDEX.get(finding.severity, len(SEVERITY_ORDER)) <= min_rank
]
normalized_changed_files = {_normalize_path(path) for path in (changed_files or set())}
top_candidates = _dedupe_findings([*top_candidates, *_merge_triage_action_findings(result)])
normalized_changed_files = {normalize_path(path) for path in (changed_files or set())}
ranked_top_candidates = _rank_findings(
top_candidates,
changed_files=normalized_changed_files,
Expand Down
36 changes: 31 additions & 5 deletions src/ai_risk_manager/triage/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dataclasses import replace

from ai_risk_manager.pr_scope import is_pr_scoped_finding, normalize_path
from ai_risk_manager.schemas.types import (
AnalysisScope,
CIMode,
Expand Down Expand Up @@ -142,6 +143,19 @@ def _risk_score(findings: list[Finding]) -> int:
return min(100, sum(top_scores))


def _triage_candidates(
findings: list[Finding],
*,
analysis_scope: AnalysisScope,
changed_files: set[str] | None,
) -> list[Finding]:
if analysis_scope != "full_fallback" or changed_files is None:
return list(findings)

normalized_changed_files = {normalize_path(path) for path in changed_files}
return [finding for finding in findings if is_pr_scoped_finding(finding, normalized_changed_files)]


def _resolve_decision(
findings: list[Finding],
*,
Expand Down Expand Up @@ -194,6 +208,7 @@ def _decision_reasons(
analysis_scope: AnalysisScope,
repository_support_state: RepositorySupportState,
summary: RunSummary,
hidden_fallback_finding_count: int = 0,
) -> list[str]:
reasons: list[str] = []
new_high_or_critical = [
Expand All @@ -203,6 +218,10 @@ def _decision_reasons(
reasons.append(f"{len(new_high_or_critical)} new high/critical release-risk finding(s) in current scope.")
if analysis_scope == "full_fallback":
reasons.append("PR impact mapping fell back to full scan, so changed-file risk attribution is weaker.")
if hidden_fallback_finding_count:
reasons.append(
f"{hidden_fallback_finding_count} repo-wide finding(s) hidden from merge triage because they do not match changed files."
)
if analysis_scope == "full" and any(finding.severity in {"critical", "high"} for finding in findings):
reasons.append("Full repository scan found high/critical release-risk signals.")
if repository_support_state != "supported":
Expand All @@ -212,7 +231,10 @@ def _decision_reasons(
if summary.verification_pass_rate < 1.0:
reasons.append(f"Verification pass rate is `{summary.verification_pass_rate:.0%}`.")
if not findings:
reasons.append("No findings survived evidence, policy, suppression, and confidence filters.")
if analysis_scope == "full_fallback" and hidden_fallback_finding_count:
reasons.append("No changed-file release-risk finding survived fallback filters.")
else:
reasons.append("No findings survived evidence, policy, suppression, and confidence filters.")
return reasons


Expand All @@ -222,15 +244,18 @@ def build_merge_triage(
*,
summary: RunSummary,
analysis_scope: AnalysisScope,
changed_files: set[str] | None = None,
) -> MergeTriage:
ranked = _rank_findings(findings.findings)
candidates = _triage_candidates(findings.findings, analysis_scope=analysis_scope, changed_files=changed_files)
hidden_fallback_finding_count = len(findings.findings) - len(candidates)
ranked = _rank_findings(candidates)
actions = _budgeted_actions(ranked, test_plan)
risk_score = _risk_score(ranked)
new_high_or_critical_count = sum(
1 for finding in findings.findings if finding.status == "new" and finding.severity in {"critical", "high"}
1 for finding in candidates if finding.status == "new" and finding.severity in {"critical", "high"}
)
decision = _resolve_decision(
findings.findings,
candidates,
analysis_scope=analysis_scope,
repository_support_state=summary.repository_support_state,
effective_ci_mode=summary.effective_ci_mode,
Expand All @@ -247,10 +272,11 @@ def build_merge_triage(
verification_pass_rate=summary.verification_pass_rate,
evidence_completeness=summary.evidence_completeness,
reasons=_decision_reasons(
findings.findings,
candidates,
analysis_scope=analysis_scope,
repository_support_state=summary.repository_support_state,
summary=summary,
hidden_fallback_finding_count=hidden_fallback_finding_count,
),
actions=actions,
generated_without_llm=findings.generated_without_llm and test_plan.generated_without_llm,
Expand Down
Loading
Loading