diff --git a/Cargo.lock b/Cargo.lock index e5845df0..a92097a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,6 +31,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "aisix-a2a" +version = "0.1.0" +dependencies = [ + "aisix-core", + "async-trait", + "axum", + "futures", + "http 1.4.0", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "aisix-admin" version = "0.1.0" @@ -312,6 +329,7 @@ dependencies = [ name = "aisix-proxy" version = "0.1.0" dependencies = [ + "aisix-a2a", "aisix-cache", "aisix-core", "aisix-gateway", diff --git a/Cargo.toml b/Cargo.toml index d3dc6e60..2d226a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] resolver = "2" members = [ + "crates/aisix-a2a", "crates/aisix-core", "crates/aisix-etcd", "crates/aisix-gateway", diff --git a/crates/aisix-a2a/Cargo.toml b/crates/aisix-a2a/Cargo.toml new file mode 100644 index 00000000..5c2e952c --- /dev/null +++ b/crates/aisix-a2a/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "aisix-a2a" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +repository.workspace = true +authors.workspace = true +description = "aisix: A2A (Agent-to-Agent) gateway — upstream JSON-RPC client behind the A2aBridge trait" + +[dependencies] +aisix-core = { path = "../aisix-core" } +tokio.workspace = true +async-trait.workspace = true +futures.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tracing.workspace = true +http.workspace = true +# A2A has no official Rust SDK (the reference SDKs are Python/JS/Java/Go/.NET), +# so the JSON-RPC 2.0 + agent-card plumbing is hand-rolled directly on the +# workspace HTTP client rather than pulled from an SDK. +reqwest.workspace = true + +[dev-dependencies] +aisix-core = { path = "../aisix-core" } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +axum.workspace = true diff --git a/crates/aisix-a2a/src/bridge.rs b/crates/aisix-a2a/src/bridge.rs new file mode 100644 index 00000000..020b4ff5 --- /dev/null +++ b/crates/aisix-a2a/src/bridge.rs @@ -0,0 +1,386 @@ +//! The upstream A2A client, behind the [`A2aBridge`] trait. +//! +//! A bridge targets one upstream agent and exposes just the two operations the +//! gateway needs in this first cut: fetch the agent's card, and forward a +//! JSON-RPC request to it. Aggregating bridges behind the downstream-facing +//! `/a2a/` endpoint, agent-card URL rewriting, and wiring into the +//! shared guardrail/quota pipeline come in later steps — this layer only proves +//! a governed tunnel to one real upstream. +//! +//! The upstream credential is held here on the gateway side and is never +//! exposed to the calling client, which presents only its AISIX key. +//! +//! Wire references (verified against the A2A specification): +//! - Agent card discovery: `https://{domain}/.well-known/agent-card.json`, +//! an RFC 8615 well-known URI resolved at the domain origin. +//! +//! - `message/send` is a JSON-RPC 2.0 method whose envelope differs between the +//! A2A 0.3 and 1.0 wire formats. This bridge forwards the caller's request +//! verbatim and does not translate between versions, so the method name and +//! body shape are the caller's concern, not this layer's. +//! + +use std::sync::OnceLock; +use std::time::Duration; + +use aisix_core::{A2aAgent, A2aAuthType}; +use async_trait::async_trait; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; + +use crate::error::A2aError; + +/// Default deadline for a single upstream operation (card fetch or send). +/// reqwest has no default request timeout, so without this a hung or slow +/// upstream pins the gateway request task indefinitely. Overridable per +/// upstream via the `A2aAgent.timeout_ms` field. +pub const DEFAULT_UPSTREAM_TIMEOUT: Duration = Duration::from_secs(30); + +/// Header carrying the gateway-held key for `api_key` upstream auth. +const API_KEY_HEADER: &str = "x-api-key"; + +/// Standard RFC 8615 well-known path for an A2A agent card. +const AGENT_CARD_PATH: &str = "/.well-known/agent-card.json"; + +/// Hard cap on an upstream response body the gateway will buffer. A registered +/// agent is semi-trusted, but a compromised or misbehaving one must not be able +/// to OOM the gateway with a multi-gigabyte (or unbounded streaming) response. +/// Generous for a JSON-RPC task result; a per-agent override can be added later. +const MAX_UPSTREAM_BODY_BYTES: usize = 16 * 1024 * 1024; + +/// The shared outbound HTTP client. Built once (a `reqwest::Client` is a +/// connection-pool handle — cloning is cheap and shares the pool) so every +/// upstream call reuses connections instead of standing up a fresh pool + TLS +/// handshake per request. +/// +/// Redirects are refused: the data plane runs inside the customer's VPC, and a +/// compromised or MITM'd upstream returning `302 Location: http://169.254.169.254/…` +/// (or a loopback address) would otherwise turn the gateway into an SSRF pivot. +/// A legitimate A2A agent does not redirect its JSON-RPC endpoint or card. This +/// mirrors the MCP OAuth client, which refuses redirects for the same reason. +fn shared_client() -> reqwest::Client { + static CLIENT: OnceLock = OnceLock::new(); + CLIENT + .get_or_init(|| { + reqwest::Client::builder() + .redirect(reqwest::redirect::Policy::none()) + .build() + .expect("reqwest client (redirect-disabled) builds") + }) + .clone() +} + +/// Read an upstream response body, refusing anything larger than +/// [`MAX_UPSTREAM_BODY_BYTES`]. An honestly-declared oversized `Content-Length` +/// is rejected up front; a lying or absent length (including a never-ending +/// stream) is caught as chunks accumulate, so the buffer can never exceed the +/// cap regardless of what the upstream claims. +async fn read_capped(resp: reqwest::Response) -> Result, A2aError> { + if let Some(len) = resp.content_length() { + if len > MAX_UPSTREAM_BODY_BYTES as u64 { + return Err(A2aError::Request(format!( + "upstream response too large: {len} bytes" + ))); + } + } + let mut stream = resp.bytes_stream(); + let mut buf: Vec = Vec::new(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| A2aError::Request(e.to_string()))?; + if buf.len() + chunk.len() > MAX_UPSTREAM_BODY_BYTES { + return Err(A2aError::Request( + "upstream response exceeded size cap".to_string(), + )); + } + buf.extend_from_slice(&chunk); + } + Ok(buf) +} + +/// How the gateway authenticates to an upstream A2A agent. The credential is +/// held here on the gateway side and is never exposed to the calling client — +/// the client presents only its AISIX key. +/// +/// The `oauth2` upstream auth type is accepted on the [`A2aAgent`] resource for +/// forward compatibility but is not yet implemented in this runtime; +/// [`upstream_from_a2a_agent`] rejects it with [`A2aError::Unsupported`]. +#[derive(Clone)] +pub enum A2aAuth { + /// No upstream auth — the agent is reachable as-is. + None, + /// Send `Authorization: Bearer ` on every upstream request. The + /// token is the raw value, without the `Bearer ` prefix. + Bearer(String), + /// Send `x-api-key: ` on every upstream request. + ApiKey(String), +} + +// Hand-written so the gateway-held credential never lands in logs via `{:?}`. +// This crate is the credential holder; a derived `Debug` would print the token +// in plaintext the moment any caller logs an upstream. +impl std::fmt::Debug for A2aAuth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + A2aAuth::None => f.write_str("None"), + A2aAuth::Bearer(_) => f.write_str("Bearer(***redacted***)"), + A2aAuth::ApiKey(_) => f.write_str("ApiKey(***redacted***)"), + } + } +} + +/// Connection parameters for a single upstream A2A agent. +#[derive(Clone)] +pub struct A2aUpstream { + /// The agent's A2A service endpoint, where JSON-RPC requests are sent, e.g. + /// `https://agents.example.com/a2a`. The agent card is discovered at the + /// well-known path relative to this URL's origin. + pub url: String, + /// Upstream authentication, held gateway-side. + pub auth: A2aAuth, + /// Per-operation deadline. Defaults to [`DEFAULT_UPSTREAM_TIMEOUT`]. + pub timeout: Duration, +} + +// Manual so a `Bearer` token cannot leak through `A2aUpstream`'s `Debug` +// (delegates to the redacting `A2aAuth` impl above). +impl std::fmt::Debug for A2aUpstream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("A2aUpstream") + .field("url", &self.url) + .field("auth", &self.auth) + .field("timeout", &self.timeout) + .finish() + } +} + +/// Build an [`A2aUpstream`] from a registered [`A2aAgent`] resource. +/// +/// Returns [`A2aError::Unsupported`] for the `oauth2` auth type, which this +/// runtime does not implement yet. +pub fn upstream_from_a2a_agent(agent: &A2aAgent) -> Result { + let secret = agent.secret.clone().unwrap_or_default(); + let auth = match agent.auth_type { + A2aAuthType::None => A2aAuth::None, + A2aAuthType::Bearer => A2aAuth::Bearer(secret), + A2aAuthType::ApiKey => A2aAuth::ApiKey(secret), + A2aAuthType::OAuth2 => { + return Err(A2aError::Unsupported( + "oauth2 upstream auth is not yet implemented".to_string(), + )) + } + }; + let timeout = agent + .timeout_ms + .map(Duration::from_millis) + .unwrap_or(DEFAULT_UPSTREAM_TIMEOUT); + Ok(A2aUpstream { + url: agent.url.clone(), + auth, + timeout, + }) +} + +/// An upstream agent's card, as fetched from its well-known URI. +/// +/// Only the fields the gateway acts on are named; every other field (skills, +/// capabilities, version, security schemes, …) is preserved in [`Self::rest`] +/// so the card can be re-serialized losslessly when the `/a2a` endpoint rewrites +/// the `url` to point at the gateway. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AgentCard { + /// The agent's advertised name. + pub name: String, + /// The A2A service endpoint the agent advertises for itself. + pub url: String, + /// Every other agent-card field, preserved verbatim for lossless round-trip. + #[serde(flatten)] + pub rest: serde_json::Map, +} + +/// A governed client tunnel to a single upstream A2A agent. +#[async_trait] +pub trait A2aBridge: Send + Sync { + /// Fetch and parse the upstream agent's card from its well-known URI. + async fn fetch_agent_card(&self) -> Result; + + /// Forward a JSON-RPC 2.0 request (such as `message/send`) to the upstream + /// service endpoint and return its JSON-RPC response verbatim. + async fn send(&self, request: &serde_json::Value) -> Result; +} + +/// The default [`A2aBridge`], built on the workspace HTTP client. +#[derive(Debug)] +pub struct HttpBridge { + upstream: A2aUpstream, + client: reqwest::Client, +} + +impl HttpBridge { + /// Build a bridge for one upstream agent. Reuses the shared, redirect-free + /// HTTP client (see [`shared_client`]); the per-agent timeout is applied + /// per-request, so a shared client does not lose the per-agent deadline. + pub fn new(upstream: A2aUpstream) -> Self { + Self { + upstream, + client: shared_client(), + } + } + + /// Apply the gateway-held upstream credential to an outgoing request. + fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + match &self.upstream.auth { + A2aAuth::None => req, + A2aAuth::Bearer(token) => req.bearer_auth(token), + A2aAuth::ApiKey(key) => req.header(API_KEY_HEADER, key), + } + } + + /// Resolve the agent-card well-known URI from the service endpoint's origin + /// (RFC 8615): scheme + host + port, with the well-known path. + fn agent_card_url(&self) -> Result { + let mut url = reqwest::Url::parse(&self.upstream.url) + .map_err(|e| A2aError::Connect(format!("invalid upstream url: {e}")))?; + url.set_path(AGENT_CARD_PATH); + url.set_query(None); + Ok(url) + } +} + +#[async_trait] +impl A2aBridge for HttpBridge { + async fn fetch_agent_card(&self) -> Result { + let url = self.agent_card_url()?; + let resp = self + .apply_auth(self.client.get(url).timeout(self.upstream.timeout)) + .send() + .await + .map_err(|e| A2aError::Connect(e.to_string()))?; + if !resp.status().is_success() { + return Err(A2aError::Connect(format!( + "agent card fetch returned HTTP {}", + resp.status().as_u16() + ))); + } + let bytes = read_capped(resp).await?; + serde_json::from_slice::(&bytes) + .map_err(|e| A2aError::Request(format!("malformed agent card: {e}"))) + } + + async fn send(&self, request: &serde_json::Value) -> Result { + let resp = self + .apply_auth( + self.client + .post(&self.upstream.url) + .timeout(self.upstream.timeout) + .json(request), + ) + .send() + .await + .map_err(|e| A2aError::Connect(e.to_string()))?; + if !resp.status().is_success() { + // Surface the upstream STATUS only — never proxy the upstream error + // body verbatim to the caller, which could leak upstream internals. + return Err(A2aError::Request(format!( + "upstream returned HTTP {}", + resp.status().as_u16() + ))); + } + let bytes = read_capped(resp).await?; + serde_json::from_slice::(&bytes) + .map_err(|e| A2aError::Request(format!("malformed JSON-RPC response: {e}"))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn agent(auth_type: &str) -> A2aAgent { + serde_json::from_str(&format!( + r#"{{"display_name":"a","url":"https://x/a2a","auth_type":"{auth_type}","secret":"s"}}"# + )) + .unwrap() + } + + #[test] + fn upstream_maps_none_bearer_api_key() { + let mut none = agent("none"); + none.secret = None; + assert!(matches!( + upstream_from_a2a_agent(&none).unwrap().auth, + A2aAuth::None + )); + assert!(matches!( + upstream_from_a2a_agent(&agent("bearer")).unwrap().auth, + A2aAuth::Bearer(_) + )); + assert!(matches!( + upstream_from_a2a_agent(&agent("api_key")).unwrap().auth, + A2aAuth::ApiKey(_) + )); + } + + #[test] + fn upstream_rejects_oauth2_as_unsupported() { + let err = upstream_from_a2a_agent(&agent("oauth2")).unwrap_err(); + assert!(matches!(err, A2aError::Unsupported(_))); + } + + #[test] + fn upstream_honours_timeout_ms() { + let mut a = agent("none"); + a.timeout_ms = Some(1234); + assert_eq!( + upstream_from_a2a_agent(&a).unwrap().timeout, + Duration::from_millis(1234) + ); + assert_eq!( + upstream_from_a2a_agent(&agent("none")).unwrap().timeout, + DEFAULT_UPSTREAM_TIMEOUT + ); + } + + #[test] + fn auth_debug_redacts_credentials() { + assert_eq!( + format!("{:?}", A2aAuth::Bearer("tok".into())), + "Bearer(***redacted***)" + ); + assert_eq!( + format!("{:?}", A2aAuth::ApiKey("k".into())), + "ApiKey(***redacted***)" + ); + // A bearer token must not leak through the upstream's Debug either. + let up = A2aUpstream { + url: "https://x/a2a".into(), + auth: A2aAuth::Bearer("super-secret".into()), + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }; + assert!(!format!("{up:?}").contains("super-secret")); + } + + #[test] + fn agent_card_url_is_origin_well_known() { + let bridge = HttpBridge::new(A2aUpstream { + url: "https://agents.example.com/a2a/v1".into(), + auth: A2aAuth::None, + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }); + assert_eq!( + bridge.agent_card_url().unwrap().as_str(), + "https://agents.example.com/.well-known/agent-card.json" + ); + } + + #[test] + fn agent_card_round_trips_unknown_fields() { + let card: AgentCard = serde_json::from_str( + r#"{"name":"Contract Reviewer","url":"https://x/a2a","version":"2.1.0","skills":[{"id":"s1"}]}"#, + ) + .unwrap(); + assert_eq!(card.name, "Contract Reviewer"); + let back = serde_json::to_value(&card).unwrap(); + assert_eq!(back["version"], "2.1.0"); + assert_eq!(back["skills"][0]["id"], "s1"); + } +} diff --git a/crates/aisix-a2a/src/error.rs b/crates/aisix-a2a/src/error.rs new file mode 100644 index 00000000..3f28f7f7 --- /dev/null +++ b/crates/aisix-a2a/src/error.rs @@ -0,0 +1,27 @@ +//! Typed errors for the A2A upstream client. +//! +//! Deliberately coarse for the first cut: the gateway only needs to +//! distinguish "could not reach the upstream agent" from "the request reached +//! it but failed" so the proxy layer can pick a client-visible status. Finer +//! mapping (per-JSON-RPC-error-code) lands when the `/a2a` endpoint wires A2A +//! traffic into the shared error-translation path. + +/// Error surfaced by an [`crate::A2aBridge`]. +#[derive(Debug, thiserror::Error)] +pub enum A2aError { + /// The upstream agent could not be reached — DNS/TCP/TLS failure, timeout, + /// or a non-success HTTP status when fetching the agent card. Non-retryable + /// without operator action. + #[error("failed to reach upstream A2A agent: {0}")] + Connect(String), + + /// The upstream was reached but the request failed — a non-success HTTP + /// status on the JSON-RPC call, or a malformed response body. + #[error("upstream A2A request failed: {0}")] + Request(String), + + /// The configured upstream authentication method is not yet supported by + /// this data-plane runtime. + #[error("unsupported A2A upstream auth: {0}")] + Unsupported(String), +} diff --git a/crates/aisix-a2a/src/lib.rs b/crates/aisix-a2a/src/lib.rs new file mode 100644 index 00000000..08c9123d --- /dev/null +++ b/crates/aisix-a2a/src/lib.rs @@ -0,0 +1,28 @@ +//! aisix-a2a — the A2A (Agent-to-Agent) gateway data-plane crate. +//! +//! First step: a governed client tunnel to a single upstream A2A agent over +//! HTTP + JSON-RPC 2.0, exposed through the [`A2aBridge`] trait. The bridge +//! fetches the agent's card (RFC 8615 well-known URI) and forwards JSON-RPC +//! requests to its service endpoint, holding the upstream credential so the +//! calling client never sees it. Aggregating the downstream-facing +//! `/a2a/` endpoint, agent-card URL rewriting, and wiring into the +//! shared guardrail/quota pipeline come in later steps — this layer only proves +//! a governed tunnel to one real upstream. +//! +//! Unlike the MCP gateway, there is no official A2A Rust SDK, so the JSON-RPC +//! plumbing is hand-rolled here on the workspace HTTP client. The bridge +//! forwards requests verbatim and does NOT translate between the A2A 0.3 and +//! 1.0 wire formats — version normalization is a later step; a single agent is +//! reached in whichever version it speaks (pinned on the `A2aAgent` resource). + +#![forbid(unsafe_code)] +#![deny(rust_2018_idioms)] + +pub mod bridge; +pub mod error; + +pub use bridge::{ + upstream_from_a2a_agent, A2aAuth, A2aBridge, A2aUpstream, AgentCard, HttpBridge, + DEFAULT_UPSTREAM_TIMEOUT, +}; +pub use error::A2aError; diff --git a/crates/aisix-a2a/tests/upstream_roundtrip.rs b/crates/aisix-a2a/tests/upstream_roundtrip.rs new file mode 100644 index 00000000..39c54464 --- /dev/null +++ b/crates/aisix-a2a/tests/upstream_roundtrip.rs @@ -0,0 +1,185 @@ +//! Roundtrip against a real (locally spawned) upstream A2A agent. +//! +//! Proves the governed tunnel end to end over real HTTP — no mocked network: +//! the bridge discovers the agent card at the RFC 8615 well-known URI, forwards +//! a JSON-RPC `message/send`, and the gateway-held upstream credential reaches +//! the upstream (and only the upstream) while an unauthenticated bridge sends +//! no credential at all. + +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use aisix_a2a::{A2aAuth, A2aBridge, A2aError, A2aUpstream, HttpBridge, DEFAULT_UPSTREAM_TIMEOUT}; +use axum::http::header::LOCATION; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::IntoResponse; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use serde_json::{json, Value}; + +/// A minimal upstream A2A agent: serves its card at the well-known URI and +/// answers JSON-RPC by echoing back the request id and the credentials it saw, +/// so the test can assert what the gateway forwarded. +async fn spawn_agent() -> SocketAddr { + async fn card() -> Json { + Json(json!({ + "name": "Test Agent", + "url": "https://upstream.example.com/a2a", + "version": "1.0.0", + "skills": [{"id": "echo", "name": "Echo"}] + })) + } + + async fn rpc(headers: HeaderMap, Json(body): Json) -> Json { + let auth = headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .map(str::to_string); + let api_key = headers + .get("x-api-key") + .and_then(|v| v.to_str().ok()) + .map(str::to_string); + Json(json!({ + "jsonrpc": "2.0", + "id": body["id"].clone(), + "result": { + "kind": "task", + "id": "task-1", + "status": {"state": "completed"}, + "echoed_auth": auth, + "echoed_api_key": api_key, + } + })) + } + + let app = Router::new() + .route("/.well-known/agent-card.json", get(card)) + .route("/a2a", post(rpc)); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); + }); + addr +} + +fn message_send(id: &str) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "method": "message/send", + "params": { + "message": { + "role": "user", + "parts": [{"kind": "text", "text": "hello"}], + "messageId": "m1" + } + } + }) +} + +#[tokio::test] +async fn fetches_card_and_forwards_bearer() { + let addr = spawn_agent().await; + let bridge = HttpBridge::new(A2aUpstream { + url: format!("http://{addr}/a2a"), + auth: A2aAuth::Bearer("tok-123".into()), + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }); + + let card = bridge.fetch_agent_card().await.unwrap(); + assert_eq!(card.name, "Test Agent"); + // Unknown fields survive the round-trip (needed for later URL rewriting). + assert_eq!(card.rest["version"], "1.0.0"); + + let resp = bridge.send(&message_send("req-1")).await.unwrap(); + assert_eq!(resp["id"], "req-1", "JSON-RPC id must round-trip"); + assert_eq!(resp["result"]["id"], "task-1"); + // The gateway-held bearer reached the upstream. + assert_eq!(resp["result"]["echoed_auth"], "Bearer tok-123"); +} + +#[tokio::test] +async fn forwards_api_key_header() { + let addr = spawn_agent().await; + let bridge = HttpBridge::new(A2aUpstream { + url: format!("http://{addr}/a2a"), + auth: A2aAuth::ApiKey("k-secret".into()), + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }); + + let resp = bridge.send(&message_send("req-2")).await.unwrap(); + assert_eq!(resp["result"]["echoed_api_key"], "k-secret"); + // api_key auth must not also mint an Authorization header. + assert!(resp["result"]["echoed_auth"].is_null()); +} + +#[tokio::test] +async fn sends_no_credential_when_none() { + let addr = spawn_agent().await; + let bridge = HttpBridge::new(A2aUpstream { + url: format!("http://{addr}/a2a"), + auth: A2aAuth::None, + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }); + + let resp = bridge.send(&message_send("req-3")).await.unwrap(); + assert!(resp["result"]["echoed_auth"].is_null()); + assert!(resp["result"]["echoed_api_key"].is_null()); +} + +/// An upstream that answers `/redirect` with `302 -> /secret` and counts hits +/// on `/secret`. Lets a test prove the gateway does NOT chase the redirect — +/// otherwise a compromised agent could pivot the VPC-internal DP into an SSRF. +async fn spawn_redirect_probe() -> (SocketAddr, Arc) { + let hits = Arc::new(AtomicUsize::new(0)); + let secret_hits = hits.clone(); + let app = Router::new() + .route( + "/redirect", + post(|| async { (StatusCode::FOUND, [(LOCATION, "/secret")]).into_response() }), + ) + .route( + "/secret", + get(move || { + let h = secret_hits.clone(); + async move { + h.fetch_add(1, Ordering::SeqCst); + Json(json!({"jsonrpc": "2.0", "result": {"leaked": true}})) + } + }), + ); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); + }); + (addr, hits) +} + +#[tokio::test] +async fn refuses_to_follow_upstream_redirect() { + let (addr, secret_hits) = spawn_redirect_probe().await; + let bridge = HttpBridge::new(A2aUpstream { + url: format!("http://{addr}/redirect"), + auth: A2aAuth::None, + timeout: DEFAULT_UPSTREAM_TIMEOUT, + }); + + let err = bridge.send(&message_send("r")).await.unwrap_err(); + // The 302 surfaces as a non-success status error — it is NOT followed. + assert!( + matches!(err, A2aError::Request(_)), + "a redirect must surface as an error, got {err:?}" + ); + assert_eq!( + secret_hits.load(Ordering::SeqCst), + 0, + "the redirect target must NOT be fetched — the gateway must not chase upstream redirects" + ); +} diff --git a/crates/aisix-admin/src/a2a_agents_handlers.rs b/crates/aisix-admin/src/a2a_agents_handlers.rs new file mode 100644 index 00000000..92be61ff --- /dev/null +++ b/crates/aisix-admin/src/a2a_agents_handlers.rs @@ -0,0 +1,205 @@ +//! CRUD handlers for `/admin/v1/a2a_agents`. +//! +//! Same shape as the McpServers handlers: validate against the JSON schema, +//! reject duplicate display_names (409), generate a uuid v4 on POST, bump +//! revision on PUT. The display_name is the path segment under which the agent +//! is exposed (`/a2a/`), so it must be a single URL path segment +//! (no `/`). The per-auth_type credential coupling is enforced here too, since +//! the flat schema stays permissive on it. + +use aisix_core::models::validate_a2a_agent; +use aisix_core::resource::ResourceEntry; +use aisix_core::{A2aAgent, A2aAuthType}; +use axum::extract::{Path, State}; +use axum::Json; +use serde_json::Value; +use uuid::Uuid; + +use crate::auth::AdminAuth; +use crate::error::AdminError; +use crate::state::AdminState; + +const STARTING_REVISION: i64 = 1; + +pub async fn list_a2a_agents( + _auth: AdminAuth, + State(state): State, +) -> Result>>, AdminError> { + let entries = state.store.list_a2a_agents().await?; + Ok(Json(entries)) +} + +pub async fn get_a2a_agent( + _auth: AdminAuth, + Path(id): Path, + State(state): State, +) -> Result>, AdminError> { + let entry = state + .store + .get_a2a_agent(&id) + .await? + .ok_or(AdminError::NotFound)?; + Ok(Json(entry)) +} + +pub async fn create_a2a_agent( + _auth: AdminAuth, + State(state): State, + Json(raw): Json, +) -> Result>, AdminError> { + let agent = decode(&raw)?; + let all = state.store.list_a2a_agents().await?; + assert_unique_display_name(&all, &agent.display_name, None)?; + + let id = Uuid::new_v4().to_string(); + let entry = ResourceEntry::new(&id, agent, STARTING_REVISION); + state.store.put_a2a_agent(entry.clone()).await?; + Ok(Json(entry)) +} + +pub async fn update_a2a_agent( + _auth: AdminAuth, + Path(id): Path, + State(state): State, + Json(raw): Json, +) -> Result>, AdminError> { + let existing = state + .store + .get_a2a_agent(&id) + .await? + .ok_or(AdminError::NotFound)?; + let agent = decode(&raw)?; + + let all = state.store.list_a2a_agents().await?; + assert_unique_display_name(&all, &agent.display_name, Some(&id))?; + + let entry = ResourceEntry::new(&id, agent, existing.revision + 1); + state.store.put_a2a_agent(entry.clone()).await?; + Ok(Json(entry)) +} + +pub async fn delete_a2a_agent( + _auth: AdminAuth, + Path(id): Path, + State(state): State, +) -> Result, AdminError> { + let removed = state.store.delete_a2a_agent(&id).await?; + if !removed { + return Err(AdminError::NotFound); + } + Ok(Json(serde_json::json!({"deleted": true, "id": id}))) +} + +fn decode(raw: &Value) -> Result { + validate_a2a_agent(raw)?; + let agent: A2aAgent = serde_json::from_value(raw.clone()) + .map_err(|e| AdminError::BadRequest(format!("malformed A2aAgent payload: {e}")))?; + // The display_name is the path segment in `/a2a/`, so it must + // be a single URL path segment. + if agent.display_name.contains('/') { + return Err(AdminError::BadRequest( + "display_name must not contain `/` — it is the agent's URL path segment".to_string(), + )); + } + // Per-auth_type credential coupling. The JSON schema stays flat and + // permissive on this (see the note on the A2aAgent struct); the write path + // is where an incomplete credential set is rejected outright. + let has_secret = !agent.secret.as_deref().unwrap_or_default().is_empty(); + match agent.auth_type { + A2aAuthType::None => {} + A2aAuthType::Bearer if !has_secret => { + return Err(AdminError::BadRequest( + "secret is required and must be non-empty when auth_type is `bearer`".to_string(), + )); + } + A2aAuthType::ApiKey if !has_secret => { + return Err(AdminError::BadRequest( + "secret is required and must be non-empty when auth_type is `api_key`".to_string(), + )); + } + A2aAuthType::OAuth2 => { + let has_client_id = !agent.client_id.as_deref().unwrap_or_default().is_empty(); + let has_token_url = !agent.token_url.as_deref().unwrap_or_default().is_empty(); + if !has_secret || !has_client_id || !has_token_url { + return Err(AdminError::BadRequest( + "client_id, token_url, and secret (the OAuth client secret) are required \ + and must be non-empty when auth_type is `oauth2`" + .to_string(), + )); + } + } + A2aAuthType::Bearer | A2aAuthType::ApiKey => {} + } + Ok(agent) +} + +fn assert_unique_display_name( + existing: &[ResourceEntry], + display_name: &str, + self_id: Option<&str>, +) -> Result<(), AdminError> { + for e in existing { + if e.value.display_name == display_name && self_id.is_none_or(|sid| sid != e.id) { + return Err(AdminError::Conflict(display_name.to_string())); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn decode_rejects_slash_in_display_name() { + let err = decode(&json!({"display_name": "a/b", "url": "https://x/a2a"})) + .expect_err("`/` in display_name must be rejected"); + assert!(matches!(err, AdminError::BadRequest(_))); + } + + #[test] + fn decode_rejects_bearer_without_secret() { + let err = decode(&json!({ + "display_name": "agent", + "url": "https://x/a2a", + "auth_type": "bearer" + })) + .expect_err("bearer auth without a secret must be rejected"); + assert!(matches!(err, AdminError::BadRequest(_))); + } + + #[test] + fn decode_rejects_incomplete_oauth2() { + for missing in ["client_id", "token_url", "secret"] { + let mut v = json!({ + "display_name": "agent", + "url": "https://x/a2a", + "auth_type": "oauth2", + "client_id": "cid", + "token_url": "https://auth.example.com/oauth/token", + "secret": "cs" + }); + v.as_object_mut().unwrap().remove(missing); + let err = decode(&v).unwrap_err(); + assert!( + matches!(err, AdminError::BadRequest(_)), + "oauth2 without `{missing}` must be a BadRequest" + ); + } + } + + #[test] + fn decode_accepts_valid_agent_with_pinned_version() { + let agent = decode(&json!({ + "display_name": "invoice-processor", + "url": "https://agents.example.com/a2a", + "protocol_version": "0.3", + "auth_type": "bearer", + "secret": "tok" + })) + .expect("valid agent should decode"); + assert_eq!(agent.display_name, "invoice-processor"); + assert_eq!(agent.secret.as_deref(), Some("tok")); + } +} diff --git a/crates/aisix-admin/src/apikeys_handlers.rs b/crates/aisix-admin/src/apikeys_handlers.rs index 6c4d32a9..ba666f44 100644 --- a/crates/aisix-admin/src/apikeys_handlers.rs +++ b/crates/aisix-admin/src/apikeys_handlers.rs @@ -36,6 +36,10 @@ struct StandaloneApiKeyBody { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] allowed_tools: Option>, + /// A2A agents this key may reach (by registered name, or `"*"`). + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + allowed_agents: Option>, /// RFC 3339 timestamp after which the key stops authenticating. /// Omitted or `null` means the key never expires. #[serde(default)] @@ -57,6 +61,8 @@ pub struct PublicApiKey { #[serde(skip_serializing_if = "Option::is_none")] pub allowed_tools: Option>, #[serde(skip_serializing_if = "Option::is_none")] + pub allowed_agents: Option>, + #[serde(skip_serializing_if = "Option::is_none")] pub expires_at: Option>, #[serde(skip_serializing_if = "std::ops::Not::not")] pub disabled: bool, @@ -69,6 +75,7 @@ impl From for PublicApiKey { allowed_models: value.allowed_models, rate_limit: value.rate_limit, allowed_tools: value.allowed_tools, + allowed_agents: value.allowed_agents, expires_at: value.expires_at, disabled: value.disabled, } diff --git a/crates/aisix-admin/src/etcd_store.rs b/crates/aisix-admin/src/etcd_store.rs index 7c65b1bb..1d01363d 100644 --- a/crates/aisix-admin/src/etcd_store.rs +++ b/crates/aisix-admin/src/etcd_store.rs @@ -20,7 +20,7 @@ use aisix_core::resource::ResourceEntry; use aisix_core::{ - ApiKey, CachePolicy, Guardrail, McpServer, Model, ObservabilityExporter, ProviderKey, + A2aAgent, ApiKey, CachePolicy, Guardrail, McpServer, Model, ObservabilityExporter, ProviderKey, }; use etcd_client::{Client, DeleteOptions, GetOptions}; use serde::de::DeserializeOwned; @@ -38,6 +38,7 @@ pub const GUARDRAILS_SUBKEY: &str = "guardrails"; pub const CACHE_POLICIES_SUBKEY: &str = "cache_policies"; pub const OBSERVABILITY_EXPORTERS_SUBKEY: &str = "observability_exporters"; pub const MCP_SERVERS_SUBKEY: &str = "mcp_servers"; +pub const A2A_AGENTS_SUBKEY: &str = "a2a_agents"; pub struct EtcdConfigStore { client: Mutex, @@ -365,6 +366,32 @@ impl ConfigStore for EtcdConfigStore { async fn delete_mcp_server(&self, id: &str) -> Result { self.delete_one(&self.key_for(MCP_SERVERS_SUBKEY, id)).await } + + async fn put_a2a_agent(&self, entry: ResourceEntry) -> Result<(), StoreError> { + let key = self.key_for(A2A_AGENTS_SUBKEY, &entry.id); + self.put_json(&key, &entry.value).await + } + + async fn get_a2a_agent(&self, id: &str) -> Result>, StoreError> { + let key = self.key_for(A2A_AGENTS_SUBKEY, id); + Ok(self + .get_one::(&key) + .await? + .map(|(v, rev)| ResourceEntry::new(id, v, rev))) + } + + async fn list_a2a_agents(&self) -> Result>, StoreError> { + Ok(self + .list_range::(A2A_AGENTS_SUBKEY) + .await? + .into_iter() + .map(|(id, v, rev)| ResourceEntry::new(id, v, rev)) + .collect()) + } + + async fn delete_a2a_agent(&self, id: &str) -> Result { + self.delete_one(&self.key_for(A2A_AGENTS_SUBKEY, id)).await + } } #[cfg(test)] diff --git a/crates/aisix-admin/src/lib.rs b/crates/aisix-admin/src/lib.rs index 97ed0eb2..3b5c2220 100644 --- a/crates/aisix-admin/src/lib.rs +++ b/crates/aisix-admin/src/lib.rs @@ -34,6 +34,7 @@ #![forbid(unsafe_code)] #![deny(rust_2018_idioms)] +mod a2a_agents_handlers; mod apikeys_handlers; mod auth; mod cache_policies_handlers; @@ -131,6 +132,17 @@ pub fn build_router(state: AdminState) -> Router { .put(mcp_servers_handlers::update_mcp_server) .delete(mcp_servers_handlers::delete_mcp_server), ) + .route( + "/admin/v1/a2a_agents", + get(a2a_agents_handlers::list_a2a_agents) + .post(a2a_agents_handlers::create_a2a_agent), + ) + .route( + "/admin/v1/a2a_agents/:id", + get(a2a_agents_handlers::get_a2a_agent) + .put(a2a_agents_handlers::update_a2a_agent) + .delete(a2a_agents_handlers::delete_a2a_agent), + ) .route( "/admin/v1/guardrails", get(guardrails_handlers::list_guardrails) diff --git a/crates/aisix-admin/src/openapi.rs b/crates/aisix-admin/src/openapi.rs index 17d1b047..97b03610 100644 --- a/crates/aisix-admin/src/openapi.rs +++ b/crates/aisix-admin/src/openapi.rs @@ -1705,6 +1705,352 @@ const OPENAPI_JSON_BASE: &str = r##"{ "description": "Delete an upstream MCP server resource by ID." } }, + "/admin/v1/a2a_agents": { + "get": { + "summary": "List A2A Agents", + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/A2aAgentEntry" + } + } + } + } + }, + "401": { + "description": "Missing or invalid admin key", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "500": { + "description": "Configuration store operation failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + } + }, + "tags": [ + "A2A Agents" + ], + "description": "List registered upstream A2A agent resources." + }, + "post": { + "summary": "Create A2A Agent", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/A2aAgent" + } + } + }, + "description": "Upstream A2A agent configuration." + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/A2aAgentEntry" + } + } + } + }, + "400": { + "description": "Schema validation failed, the JSON body is malformed, `display_name` contains a `/` (it is the agent's URL path segment), or the credentials required by `auth_type` are missing (`secret` for `bearer`/`api_key`; `client_id`, `token_url`, and `secret` for `oauth2`)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + }, + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "401": { + "description": "Missing or invalid admin key", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "409": { + "description": "Duplicate display_name", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "413": { + "description": "JSON request body exceeds the admin body-size limit", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "415": { + "description": "Missing or unsupported JSON content type", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "500": { + "description": "Configuration store operation failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + } + }, + "tags": [ + "A2A Agents" + ], + "description": "Create an upstream A2A agent resource. The gateway validates the payload, rejects duplicate `display_name` values, and returns the stored resource entry." + } + }, + "/admin/v1/a2a_agents/{id}": { + "parameters": [ + { + "name": "id", + "in": "path", + "required": true, + "schema": { + "type": "string" + }, + "description": "A2A agent resource ID generated by the Admin API.", + "example": "1d95ac57-7f27-46a4-b5a3-55d3c3ad0a12" + } + ], + "get": { + "summary": "Get A2A Agent by ID", + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/A2aAgentEntry" + } + } + } + }, + "401": { + "description": "Missing or invalid admin key", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "404": { + "description": "Resource not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "500": { + "description": "Configuration store operation failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + } + }, + "tags": [ + "A2A Agents" + ], + "description": "Get an upstream A2A agent resource by ID." + }, + "put": { + "summary": "Update A2A Agent by ID", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/A2aAgent" + } + } + }, + "description": "Replacement upstream A2A agent configuration." + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/A2aAgentEntry" + } + } + } + }, + "400": { + "description": "Schema validation failed, the JSON body is malformed, `display_name` contains a `/` (it is the agent's URL path segment), or the credentials required by `auth_type` are missing (`secret` for `bearer`/`api_key`; `client_id`, `token_url`, and `secret` for `oauth2`)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + }, + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "401": { + "description": "Missing or invalid admin key", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "404": { + "description": "Resource not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "413": { + "description": "JSON request body exceeds the admin body-size limit", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "415": { + "description": "Missing or unsupported JSON content type", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "500": { + "description": "Configuration store operation failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + } + }, + "tags": [ + "A2A Agents" + ], + "description": "Update an upstream A2A agent resource by ID. The gateway validates the payload, rejects duplicate `display_name` values, preserves the resource ID, and increments the revision." + }, + "delete": { + "summary": "Delete A2A Agent by ID", + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeleteResponse" + } + } + } + }, + "401": { + "description": "Missing or invalid admin key", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "404": { + "description": "Resource not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + }, + "500": { + "description": "Configuration store operation failed", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AdminError" + } + } + } + } + }, + "tags": [ + "A2A Agents" + ], + "description": "Delete an upstream A2A agent resource by ID." + } + }, "/admin/v1/guardrails": { "get": { "summary": "List Guardrails", @@ -3284,6 +3630,31 @@ const OPENAPI_JSON_BASE: &str = r##"{ }, "description": "Stored Admin API resource entry." }, + "A2aAgentEntry": { + "type": "object", + "required": [ + "id", + "value", + "revision" + ], + "properties": { + "id": { + "type": "string", + "description": "Resource ID generated by the Admin API.", + "example": "1d95ac57-7f27-46a4-b5a3-55d3c3ad0a12" + }, + "value": { + "$ref": "#/components/schemas/A2aAgent", + "description": "Stored upstream A2A agent configuration." + }, + "revision": { + "type": "integer", + "description": "Monotonic resource revision. Create operations start at 1, and update operations increment the revision.", + "example": 1845 + } + }, + "description": "Stored Admin API resource entry." + }, "GuardrailEntry": { "type": "object", "required": [ @@ -3606,6 +3977,10 @@ const RESOURCE_SCHEMAS: &[(&str, &str)] = &[ "McpServer", include_str!("../../../schemas/resources/mcp_server.schema.json"), ), + ( + "A2aAgent", + include_str!("../../../schemas/resources/a2a_agent.schema.json"), + ), ( "Guardrail", include_str!("../../../schemas/resources/guardrail.schema.json"), @@ -4104,6 +4479,8 @@ mod tests { "/admin/v1/provider_keys/{id}", "/admin/v1/mcp_servers", "/admin/v1/mcp_servers/{id}", + "/admin/v1/a2a_agents", + "/admin/v1/a2a_agents/{id}", "/admin/v1/guardrails", "/admin/v1/guardrails/{id}", "/admin/v1/cache_policies", @@ -4137,6 +4514,8 @@ mod tests { "ProviderKeyEntry", "McpServer", "McpServerEntry", + "A2aAgent", + "A2aAgentEntry", "Guardrail", "GuardrailEntry", "CachePolicy", @@ -4186,6 +4565,8 @@ mod tests { "/admin/v1/provider_keys/{id}", "/admin/v1/mcp_servers", "/admin/v1/mcp_servers/{id}", + "/admin/v1/a2a_agents", + "/admin/v1/a2a_agents/{id}", "/admin/v1/guardrails", "/admin/v1/guardrails/{id}", "/admin/v1/cache_policies", @@ -4761,6 +5142,8 @@ mod tests { ("/admin/v1/provider_keys/{id}", "put"), ("/admin/v1/mcp_servers", "post"), ("/admin/v1/mcp_servers/{id}", "put"), + ("/admin/v1/a2a_agents", "post"), + ("/admin/v1/a2a_agents/{id}", "put"), ("/admin/v1/guardrails", "post"), ("/admin/v1/guardrails/{id}", "put"), ("/admin/v1/cache_policies", "post"), diff --git a/crates/aisix-admin/src/store.rs b/crates/aisix-admin/src/store.rs index 16c2f872..f284e0b4 100644 --- a/crates/aisix-admin/src/store.rs +++ b/crates/aisix-admin/src/store.rs @@ -8,7 +8,7 @@ use aisix_core::resource::ResourceEntry; use aisix_core::{ - ApiKey, CachePolicy, Guardrail, McpServer, Model, ObservabilityExporter, ProviderKey, + A2aAgent, ApiKey, CachePolicy, Guardrail, McpServer, Model, ObservabilityExporter, ProviderKey, }; use dashmap::DashMap; use std::sync::Arc; @@ -75,6 +75,11 @@ pub trait ConfigStore: Send + Sync + 'static { ) -> Result>, StoreError>; async fn list_mcp_servers(&self) -> Result>, StoreError>; async fn delete_mcp_server(&self, id: &str) -> Result; + + async fn put_a2a_agent(&self, entry: ResourceEntry) -> Result<(), StoreError>; + async fn get_a2a_agent(&self, id: &str) -> Result>, StoreError>; + async fn list_a2a_agents(&self) -> Result>, StoreError>; + async fn delete_a2a_agent(&self, id: &str) -> Result; } /// In-memory store. Thread-safe via DashMap; mainly used by tests, but @@ -88,6 +93,7 @@ pub struct InMemoryStore { cache_policies: DashMap>, observability_exporters: DashMap>, mcp_servers: DashMap>, + a2a_agents: DashMap>, } impl InMemoryStore { @@ -240,6 +246,23 @@ impl ConfigStore for InMemoryStore { async fn delete_mcp_server(&self, id: &str) -> Result { Ok(self.mcp_servers.remove(id).is_some()) } + + async fn put_a2a_agent(&self, entry: ResourceEntry) -> Result<(), StoreError> { + self.a2a_agents.insert(entry.id.clone(), entry); + Ok(()) + } + + async fn get_a2a_agent(&self, id: &str) -> Result>, StoreError> { + Ok(self.a2a_agents.get(id).map(|r| r.clone())) + } + + async fn list_a2a_agents(&self) -> Result>, StoreError> { + Ok(self.a2a_agents.iter().map(|r| r.clone()).collect()) + } + + async fn delete_a2a_agent(&self, id: &str) -> Result { + Ok(self.a2a_agents.remove(id).is_some()) + } } #[cfg(test)] diff --git a/crates/aisix-core/src/bin/dump-schema.rs b/crates/aisix-core/src/bin/dump-schema.rs index cea61f6d..60a87d61 100644 --- a/crates/aisix-core/src/bin/dump-schema.rs +++ b/crates/aisix-core/src/bin/dump-schema.rs @@ -63,6 +63,7 @@ fn main() { schema::guardrail_attachment_root_schema(), ); dump_value(&out_dir, "mcp_server", schema::mcp_server_root_schema()); + dump_value(&out_dir, "a2a_agent", schema::a2a_agent_root_schema()); dump::(&out_dir, "ensemble"); dump::(&out_dir, "rate_limit"); diff --git a/crates/aisix-core/src/lib.rs b/crates/aisix-core/src/lib.rs index d6da9a29..247e1afe 100644 --- a/crates/aisix-core/src/lib.rs +++ b/crates/aisix-core/src/lib.rs @@ -32,15 +32,15 @@ pub use error::{ AdminError, AdminErrorEnvelope, BootstrapError, ProxyError, ProxyErrorEnvelope, RateLimitScope, }; pub use models::{ - validate_apikey, validate_cache_policy, validate_guardrail, validate_mcp_server, - validate_model, validate_observability_exporter, validate_provider_key, - validate_rate_limit_policy, Adapter, AisixSnapshot, ApiKey, AppliedGuardrail, CachePolicy, - CooldownConfig, ExporterKind, Guardrail, GuardrailHookPoint, GuardrailKind, KeywordConfig, - KeywordPattern, McpAuthType, McpServer, McpTransport, Model, ObservabilityExporter, - ParamConstraints, PolicyScope, PolicyWindow, ProviderKey, RateLimit, RateLimitPolicy, - RequestOverrides, ResponseOverrides, Routing, RoutingStrategy, RoutingTarget, SchemaError, - StreamDoneMarker, TelemetryKind, TelemetryTags, WhenAllUnavailablePolicy, - DEFAULT_COOLDOWN_TRIGGER_STATUSES, + validate_a2a_agent, validate_apikey, validate_cache_policy, validate_guardrail, + validate_mcp_server, validate_model, validate_observability_exporter, validate_provider_key, + validate_rate_limit_policy, A2aAgent, A2aAuthType, A2aProtocolVersion, Adapter, AisixSnapshot, + ApiKey, AppliedGuardrail, CachePolicy, CooldownConfig, ExporterKind, Guardrail, + GuardrailHookPoint, GuardrailKind, KeywordConfig, KeywordPattern, McpAuthType, McpServer, + McpTransport, Model, ObservabilityExporter, ParamConstraints, PolicyScope, PolicyWindow, + ProviderKey, RateLimit, RateLimitPolicy, RequestOverrides, ResponseOverrides, Routing, + RoutingStrategy, RoutingTarget, SchemaError, StreamDoneMarker, TelemetryKind, TelemetryTags, + WhenAllUnavailablePolicy, DEFAULT_COOLDOWN_TRIGGER_STATUSES, }; pub use resource::{Resource, ResourceEntry}; pub use snapshot::{ResourceTable, SnapshotHandle}; diff --git a/crates/aisix-core/src/models/a2a_agent.rs b/crates/aisix-core/src/models/a2a_agent.rs new file mode 100644 index 00000000..5fd7c10e --- /dev/null +++ b/crates/aisix-core/src/models/a2a_agent.rs @@ -0,0 +1,277 @@ +//! `A2aAgent` entity — a registered upstream A2A (Agent-to-Agent) agent. +//! +//! Registers an upstream agent that speaks the A2A protocol (HTTP + JSON-RPC +//! 2.0) so the gateway can front it: callers reach it through the gateway's own +//! `/a2a/` endpoint, its agent card is served (with URLs rewritten +//! to the gateway) at `/a2a//.well-known/agent.json`, and +//! `message/send` / `message/stream` are routed through the same auth / ACL / +//! guardrail / quota pipeline as LLM and MCP traffic. The upstream credential is +//! held by the gateway and is never exposed to the calling client. +//! +//! This is the `a2a_http` backend: a self-hosted agent reached over HTTP. +//! Managed-platform backends (Bedrock AgentCore, Azure AI Foundry, Vertex Agent +//! Engine) and gateway-composed virtual agents are later additions and are not +//! part of this entity yet. +//! +//! etcd path: `{prefix}/a2a_agents/{uuid}`. Secondary index on `display_name`. + +use serde::{Deserialize, Serialize}; + +use crate::resource::Resource; + +#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct A2aAgent { + /// Operator-facing label, unique within the gateway. It is the path segment + /// under which the agent is exposed to callers as `/a2a/`, so + /// it must be a single non-empty URL path segment. + #[schemars(length(min = 1))] + pub display_name: String, + + /// The upstream agent's base URL, reached over HTTP with the A2A protocol + /// (JSON-RPC 2.0), such as `https://agents.example.com/a2a`. The agent card + /// is discovered relative to this URL. + #[schemars(length(min = 1))] + pub url: String, + + /// The A2A wire-format version the gateway pins for this agent. Pinned + /// explicitly rather than inferred from client signals, so the served agent + /// card and the accepted requests stay consistent. + #[serde(default)] + pub protocol_version: A2aProtocolVersion, + + /// How the gateway authenticates to the upstream agent. The credential is + /// held by the gateway and is never forwarded from or exposed to the calling + /// client. + #[serde(default)] + pub auth_type: A2aAuthType, + + /// Authentication credential for the upstream agent. Its meaning follows + /// `auth_type`: the bearer token when `auth_type` is `bearer` (sent as + /// `Authorization: Bearer `), the API key when `auth_type` is + /// `api_key` (sent as `x-api-key: `), or the OAuth client secret + /// when `auth_type` is `oauth2`. Leave unset when `auth_type` is `none`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub secret: Option, + + // Cross-field coupling (`oauth2` requires `client_id` + `secret` + + // `token_url`; `bearer`/`api_key` require `secret`) is deliberately NOT + // expressed in this flat schema — that would force restructuring the + // resource into a oneOf. The control plane enforces the coupling strictly + // at write time, this gateway's own Admin API re-checks it on write, and + // the runtime degrades gracefully when a snapshot-loaded agent is + // mis-configured: its credential exchange fails and the agent becomes + // unavailable, logged like any other upstream failure. + /// OAuth client identifier used for the OAuth 2.0 client credentials grant. + /// Required when `auth_type` is `oauth2`; ignored otherwise. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub client_id: Option, + + /// OAuth token endpoint URL where the gateway exchanges the client + /// credentials for an access token, such as + /// `https://auth.example.com/oauth/token`. Required when `auth_type` is + /// `oauth2`; ignored otherwise. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub token_url: Option, + + /// OAuth scopes to request. Joined with spaces into the `scope` parameter of + /// the token request. Only used when `auth_type` is `oauth2`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub scopes: Option>, + + /// Maximum time, in milliseconds, to wait for a single upstream operation + /// (fetching the agent card or invoking the agent). Must be at least `1` + /// when set. When omitted, the gateway applies a built-in default. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[schemars(range(min = 1))] + pub timeout_ms: Option, + + /// Whether this agent is active. When `false`, it is not served and cannot + /// be reached. + #[serde(default = "default_enabled")] + pub enabled: bool, + + /// Filled in by the snapshot loader from the etcd key path. + #[serde(skip)] + pub(crate) runtime_id: String, +} + +fn default_enabled() -> bool { + true +} + +/// The A2A wire-format version pinned for an upstream agent. +#[derive( + Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema, +)] +pub enum A2aProtocolVersion { + /// A2A 1.0 wire format (protobuf-JSON envelopes, PascalCase methods). + #[default] + #[serde(rename = "1.0")] + V1_0, + /// A2A 0.3 wire format (`kind`-discriminated JSON-RPC objects). + #[serde(rename = "0.3")] + V0_3, +} + +/// How the gateway authenticates to an upstream A2A agent. +#[derive( + Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum A2aAuthType { + /// No authentication; the agent is reached as-is. + #[default] + None, + /// Bearer token authentication. The token is supplied in `secret` and sent + /// as `Authorization: Bearer `. + Bearer, + /// API key authentication. The key is supplied in `secret` and sent as an + /// `x-api-key: ` header on every upstream request. + ApiKey, + /// OAuth 2.0 client credentials grant. The gateway exchanges `client_id`, + /// the client secret in `secret`, and the optional `scopes` at `token_url` + /// for an access token, and sends it as `Authorization: Bearer + /// ` on every upstream request. Access tokens are cached + /// until shortly before their reported expiry. + #[serde(rename = "oauth2")] + OAuth2, +} + +impl Resource for A2aAgent { + fn id(&self) -> &str { + &self.runtime_id + } + + fn name(&self) -> &str { + &self.display_name + } + + fn kind() -> &'static str { + "a2a_agents" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialises_minimal_a2a_agent() { + let a: A2aAgent = serde_json::from_str( + r#"{"display_name":"invoice-processor","url":"https://agents.example.com/a2a"}"#, + ) + .unwrap(); + assert_eq!(a.display_name, "invoice-processor"); + assert_eq!(a.url, "https://agents.example.com/a2a"); + // Defaults. + assert_eq!(a.protocol_version, A2aProtocolVersion::V1_0); + assert_eq!(a.auth_type, A2aAuthType::None); + assert!(a.secret.is_none()); + assert!(a.client_id.is_none()); + assert!(a.token_url.is_none()); + assert!(a.scopes.is_none()); + assert!(a.timeout_ms.is_none()); + assert!(a.enabled); + } + + #[test] + fn deserialises_with_bearer_auth_and_pinned_v0_3() { + let a: A2aAgent = serde_json::from_str( + r#"{"display_name":"tr","url":"https://x/a2a","protocol_version":"0.3","auth_type":"bearer","secret":"tok","timeout_ms":5000,"enabled":false}"#, + ) + .unwrap(); + assert_eq!(a.protocol_version, A2aProtocolVersion::V0_3); + assert_eq!(a.auth_type, A2aAuthType::Bearer); + assert_eq!(a.secret.as_deref(), Some("tok")); + assert_eq!(a.timeout_ms, Some(5000)); + assert!(!a.enabled); + } + + #[test] + fn deserialises_with_oauth2_auth() { + let a: A2aAgent = serde_json::from_str( + r#"{"display_name":"a","url":"https://x/a2a","auth_type":"oauth2","secret":"cs-1","client_id":"cid","token_url":"https://auth/x/token","scopes":["read","write"]}"#, + ) + .unwrap(); + assert_eq!(a.auth_type, A2aAuthType::OAuth2); + assert_eq!(a.secret.as_deref(), Some("cs-1")); + assert_eq!(a.client_id.as_deref(), Some("cid")); + assert_eq!(a.token_url.as_deref(), Some("https://auth/x/token")); + assert_eq!( + a.scopes.as_deref(), + Some(&["read".to_string(), "write".to_string()][..]) + ); + } + + #[test] + fn protocol_version_serialises_as_dotted_string() { + let a: A2aAgent = + serde_json::from_str(r#"{"display_name":"a","url":"https://x/a2a"}"#).unwrap(); + let s = serde_json::to_string(&a).unwrap(); + // Default V1_0 serialises as the wire string "1.0", not "v1_0". + assert!(s.contains(r#""protocol_version":"1.0""#), "got: {s}"); + } + + #[test] + fn oauth2_round_trips_and_omits_unset_optionals() { + let original: A2aAgent = serde_json::from_str( + r#"{"display_name":"a","url":"https://x/a2a","auth_type":"oauth2","secret":"cs","client_id":"cid","token_url":"https://auth/token"}"#, + ) + .unwrap(); + let s = serde_json::to_string(&original).unwrap(); + assert!(s.contains(r#""auth_type":"oauth2""#), "got: {s}"); + assert!(!s.contains("scopes"), "unset scopes must be omitted: {s}"); + let back: A2aAgent = serde_json::from_str(&s).unwrap(); + assert_eq!(original, back); + } + + #[test] + fn rejects_unknown_fields() { + let r: Result = + serde_json::from_str(r#"{"display_name":"x","url":"u","extra":1}"#); + assert!(r.is_err()); + } + + #[test] + fn rejects_unknown_protocol_version_and_auth_type() { + assert!(serde_json::from_str::( + r#"{"display_name":"x","url":"u","protocol_version":"2.0"}"# + ) + .is_err()); + assert!(serde_json::from_str::( + r#"{"display_name":"x","url":"u","auth_type":"oauth"}"# + ) + .is_err()); + } + + #[test] + fn resource_trait_routes_through_display_name() { + let mut a: A2aAgent = + serde_json::from_str(r#"{"display_name":"invoice","url":"https://x/a2a"}"#).unwrap(); + a.runtime_id = "uuid-a2a-1".into(); + assert_eq!(::kind(), "a2a_agents"); + assert_eq!(a.id(), "uuid-a2a-1"); + assert_eq!(a.name(), "invoice"); + } + + #[test] + fn round_trip_omits_default_optionals() { + let original = A2aAgent { + display_name: "invoice".into(), + url: "https://x/a2a".into(), + protocol_version: A2aProtocolVersion::V1_0, + auth_type: A2aAuthType::None, + secret: None, + client_id: None, + token_url: None, + scopes: None, + timeout_ms: None, + enabled: true, + runtime_id: String::new(), + }; + let s = serde_json::to_string(&original).unwrap(); + let back: A2aAgent = serde_json::from_str(&s).unwrap(); + assert_eq!(original, back); + } +} diff --git a/crates/aisix-core/src/models/apikey.rs b/crates/aisix-core/src/models/apikey.rs index a3b80f76..30a3fbb1 100644 --- a/crates/aisix-core/src/models/apikey.rs +++ b/crates/aisix-core/src/models/apikey.rs @@ -58,6 +58,14 @@ pub struct ApiKey { #[serde(default, skip_serializing_if = "Option::is_none")] pub allowed_tools: Option>, + /// A2A agents this key may reach, named by their registered names. Entries + /// are matched as single-`*` globs, mirroring `allowed_tools`: `"*"` grants + /// every agent and an entry without a `*` matches one agent exactly. When + /// omitted or set to `null`, the key has no A2A agent access — access is + /// granted explicitly. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub allowed_agents: Option>, + /// RFC 3339 timestamp after which the key stops authenticating. /// Requests presenting an expired key are rejected with `401`. /// When omitted or set to `null`, the key never expires. @@ -130,6 +138,22 @@ impl ApiKey { } } + /// True if this key may reach the given A2A agent, named by its registered + /// name. + /// + /// Semantics mirror [`ApiKey::can_access_tool`]: entries are single-`*` + /// globs, so `"*"` grants every agent; entries without a `*` match exactly. + /// A key with no `allowed_agents` (or an empty list) may reach no A2A agent + /// — access is granted explicitly. + pub fn can_access_agent(&self, agent: &str) -> bool { + match &self.allowed_agents { + None => false, + Some(allowed) => allowed + .iter() + .any(|a| crate::wildcard::wildcard_matches(a, agent)), + } + } + /// Iterate over the names of models this key may access, filtering them /// against a known universe of model names. Delegates to [`Self::can_access`] /// so glob entries stay consistent with per-request authz: `*` expands to @@ -212,6 +236,7 @@ mod tests { user_id: None, user_name: None, allowed_tools: None, + allowed_agents: None, expires_at: None, disabled: false, runtime_id: String::new(), @@ -278,6 +303,32 @@ mod tests { assert!(!any_server.can_access_tool("github__readonly_admin")); } + #[test] + fn can_access_agent_enforces_allowlist() { + // No `allowed_agents` (or null / empty) → no A2A agent access. + let none: ApiKey = + serde_json::from_str(r#"{"key_hash":"h","allowed_models":["*"]}"#).unwrap(); + assert!(!none.can_access_agent("invoice-processor")); + let empty: ApiKey = + serde_json::from_str(r#"{"key_hash":"h","allowed_models":[],"allowed_agents":[]}"#) + .unwrap(); + assert!(!empty.can_access_agent("invoice-processor")); + + // Exact name grants only that agent. + let specific: ApiKey = serde_json::from_str( + r#"{"key_hash":"h","allowed_models":[],"allowed_agents":["invoice-processor"]}"#, + ) + .unwrap(); + assert!(specific.can_access_agent("invoice-processor")); + assert!(!specific.can_access_agent("translator")); + + // Wildcard grants every agent. + let wildcard: ApiKey = + serde_json::from_str(r#"{"key_hash":"h","allowed_models":[],"allowed_agents":["*"]}"#) + .unwrap(); + assert!(wildcard.can_access_agent("anything")); + } + #[test] fn can_access_checks_whitelist() { let k = sample(); diff --git a/crates/aisix-core/src/models/mod.rs b/crates/aisix-core/src/models/mod.rs index 00f8e82a..663f873b 100644 --- a/crates/aisix-core/src/models/mod.rs +++ b/crates/aisix-core/src/models/mod.rs @@ -15,6 +15,7 @@ //! Standalone deployments do per-key rate-limiting via //! `ApiKey::rate_limit`. +pub mod a2a_agent; pub mod apikey; pub mod cache_policy; pub mod embedding; @@ -31,6 +32,7 @@ pub mod schema; pub mod semantic; pub mod snapshot; +pub use a2a_agent::{A2aAgent, A2aAuthType, A2aProtocolVersion}; pub use apikey::ApiKey; pub use cache_policy::{AppliesTo, CacheBackend, CachePolicy}; pub use embedding::EmbeddingConfig; @@ -58,9 +60,10 @@ pub use rate_limit::RateLimit; pub use rate_limit_policy::{PolicyScope, PolicyWindow, RateLimitPolicy}; pub use routing::{Routing, RoutingStrategy, RoutingTarget, WhenAllUnavailablePolicy}; pub use schema::{ - validate_apikey, validate_cache_policy, validate_guardrail, validate_guardrail_attachment, - validate_mcp_server, validate_model, validate_observability_exporter, validate_provider_key, - validate_rate_limit_policy, SchemaError, + validate_a2a_agent, validate_apikey, validate_cache_policy, validate_guardrail, + validate_guardrail_attachment, validate_mcp_server, validate_model, + validate_observability_exporter, validate_provider_key, validate_rate_limit_policy, + SchemaError, }; pub use semantic::{ Aggregation, DistanceMetric, EmbeddingFailureMode, OnEmbeddingFailure, Semantic, SemanticMatch, diff --git a/crates/aisix-core/src/models/schema.rs b/crates/aisix-core/src/models/schema.rs index 8a11cb0d..dabd2efc 100644 --- a/crates/aisix-core/src/models/schema.rs +++ b/crates/aisix-core/src/models/schema.rs @@ -31,6 +31,7 @@ pub struct Schemas { pub observability_exporter: Validator, pub rate_limit_policy: Validator, pub mcp_server: Validator, + pub a2a_agent: Validator, } pub static SCHEMAS: Lazy> = Lazy::new(|| Arc::new(Schemas::compile())); @@ -65,6 +66,9 @@ impl Schemas { mcp_server: jsonschema::options() .build(&mcp_server_root_schema()) .expect("mcp_server schema is well-formed"), + a2a_agent: jsonschema::options() + .build(&a2a_agent_root_schema()) + .expect("a2a_agent schema is well-formed"), } } } @@ -125,6 +129,10 @@ pub fn validate_mcp_server(value: &Value) -> Result<(), SchemaError> { validate(&SCHEMAS.mcp_server, value) } +pub fn validate_a2a_agent(value: &Value) -> Result<(), SchemaError> { + validate(&SCHEMAS.a2a_agent, value) +} + /// Build a resource's canonical JSON Schema from its struct via `schemars`, /// the single source of field shapes and per-field constraints. /// @@ -213,6 +221,28 @@ pub fn mcp_server_root_schema() -> Value { schema } +pub fn a2a_agent_root_schema() -> Value { + let mut schema = struct_root_schema::(true); + if let Some(Value::Object(defs)) = schema.get_mut("definitions") { + title_single_value_enum_variants( + defs, + "A2aAuthType", + &[ + ("none", "No authentication"), + ("bearer", "Bearer token"), + ("api_key", "API key"), + ("oauth2", "OAuth 2.0 client credentials"), + ], + ); + title_single_value_enum_variants( + defs, + "A2aProtocolVersion", + &[("1.0", "A2A 1.0"), ("0.3", "A2A 0.3")], + ); + } + schema +} + fn title_single_value_enum_variants( defs: &mut serde_json::Map, schema_name: &str, diff --git a/crates/aisix-core/src/models/snapshot.rs b/crates/aisix-core/src/models/snapshot.rs index b5083e15..1091acae 100644 --- a/crates/aisix-core/src/models/snapshot.rs +++ b/crates/aisix-core/src/models/snapshot.rs @@ -4,6 +4,7 @@ //! coherent rebuild (compaction, initial load) and atomically swaps it into //! a [`SnapshotHandle`]. The data plane only sees the handle. +use super::a2a_agent::A2aAgent; use super::apikey::ApiKey; use super::cache_policy::CachePolicy; use super::guardrail::{Guardrail, GuardrailAttachment}; @@ -39,6 +40,10 @@ pub struct AisixSnapshot { /// MCP gateway endpoint aggregates each enabled server's tools and routes /// tool calls back to the owning server. pub mcp_servers: ResourceTable, + /// Registered upstream A2A agents: `/aisix//a2a_agents/`. The + /// A2A gateway endpoint fronts each enabled agent, forwarding JSON-RPC + /// requests to it and serving its card with URLs rewritten to the gateway. + pub a2a_agents: ResourceTable, } impl AisixSnapshot { @@ -58,6 +63,7 @@ impl AisixSnapshot { + self.observability_exporters.len() + self.rate_limit_policies.len() + self.mcp_servers.len() + + self.a2a_agents.len() } } diff --git a/crates/aisix-etcd/src/loader.rs b/crates/aisix-etcd/src/loader.rs index a9c4fae3..598855f2 100644 --- a/crates/aisix-etcd/src/loader.rs +++ b/crates/aisix-etcd/src/loader.rs @@ -12,10 +12,11 @@ //! entry; it serves the rest." use aisix_core::models::{ - validate_apikey, validate_cache_policy, validate_guardrail, validate_guardrail_attachment, - validate_mcp_server, validate_model, validate_observability_exporter, validate_provider_key, - validate_rate_limit_policy, ApiKey, CachePolicy, Guardrail, GuardrailAttachment, McpServer, - Model, ObservabilityExporter, ProviderKey, RateLimitPolicy, SchemaError, + validate_a2a_agent, validate_apikey, validate_cache_policy, validate_guardrail, + validate_guardrail_attachment, validate_mcp_server, validate_model, + validate_observability_exporter, validate_provider_key, validate_rate_limit_policy, A2aAgent, + ApiKey, CachePolicy, Guardrail, GuardrailAttachment, McpServer, Model, ObservabilityExporter, + ProviderKey, RateLimitPolicy, SchemaError, }; use aisix_core::resource::ResourceEntry; use aisix_core::AisixSnapshot; @@ -256,6 +257,18 @@ pub fn build_snapshot(prefix: &str, entries: &[RawEntry]) -> (AisixSnapshot, Bui snapshot.mcp_servers.insert(entry); } } + "a2a_agents" => { + if let Some(entry) = validate_and_parse::( + &raw.key, + raw.revision, + parsed, + &value, + validate_a2a_agent, + &mut stats, + ) { + snapshot.a2a_agents.insert(entry); + } + } other => { tracing::debug!(key = %raw.key, kind = %other, "unknown etcd kind; skipping"); stats.unknown_kind += 1; diff --git a/crates/aisix-etcd/src/supervisor.rs b/crates/aisix-etcd/src/supervisor.rs index baeeea68..901504b1 100644 --- a/crates/aisix-etcd/src/supervisor.rs +++ b/crates/aisix-etcd/src/supervisor.rs @@ -379,6 +379,9 @@ impl Supervisor

{ for e in tiny.mcp_servers.entries() { new.mcp_servers.insert(clone_entry(&e)); } + for e in tiny.a2a_agents.entries() { + new.a2a_agents.insert(clone_entry(&e)); + } new }); self.remove_rejection_for_key(&entry.key); @@ -434,6 +437,7 @@ impl Supervisor

{ } "rate_limit_policies" => snap.rate_limit_policies.get_by_id(parsed.id).is_some(), "mcp_servers" => snap.mcp_servers.get_by_id(parsed.id).is_some(), + "a2a_agents" => snap.a2a_agents.get_by_id(parsed.id).is_some(), _ => false, }; let removed_rejection = self.remove_rejection_for_key(key_str); @@ -482,6 +486,9 @@ impl Supervisor

{ "mcp_servers" => { new.mcp_servers.remove(parsed.id); } + "a2a_agents" => { + new.a2a_agents.remove(parsed.id); + } _ => {} } new @@ -718,6 +725,9 @@ fn clone_snapshot(src: &AisixSnapshot) -> AisixSnapshot { for e in src.mcp_servers.entries() { out.mcp_servers.insert(clone_entry(&e)); } + for e in src.a2a_agents.entries() { + out.a2a_agents.insert(clone_entry(&e)); + } out } diff --git a/crates/aisix-obs/src/usage.rs b/crates/aisix-obs/src/usage.rs index 96dae7a9..aba5fcef 100644 --- a/crates/aisix-obs/src/usage.rs +++ b/crates/aisix-obs/src/usage.rs @@ -335,6 +335,18 @@ pub struct UsageEvent { /// non-MCP events; cp-api stores empty as NULL. #[serde(default, skip_serializing_if = "String::is_empty")] pub mcp_tool_name: String, + + // ─── A2A gateway attribution ─── + /// Registered name of the upstream A2A agent a request was routed to. + /// Empty for non-A2A events; cp-api stores empty as NULL. Older cp-api + /// images that predate this field ignore it (DP-first rollout). + #[serde(default, skip_serializing_if = "String::is_empty")] + pub a2a_agent_name: String, + + /// The JSON-RPC method invoked on the A2A agent (such as `message/send`). + /// Empty for non-A2A events; cp-api stores empty as NULL. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub a2a_method: String, } #[inline] diff --git a/crates/aisix-proxy/Cargo.toml b/crates/aisix-proxy/Cargo.toml index 8807de6a..67aaaaea 100644 --- a/crates/aisix-proxy/Cargo.toml +++ b/crates/aisix-proxy/Cargo.toml @@ -9,6 +9,7 @@ authors.workspace = true description = "aisix: /v1/* proxy router (OpenAI-compatible) + middleware + hooks" [dependencies] +aisix-a2a = { path = "../aisix-a2a" } aisix-core = { path = "../aisix-core" } aisix-gateway = { path = "../aisix-gateway" } # Mounts the downstream-facing `/mcp` MCP gateway endpoint (mcp.rs): the diff --git a/crates/aisix-proxy/src/a2a.rs b/crates/aisix-proxy/src/a2a.rs new file mode 100644 index 00000000..6fbbbf6b --- /dev/null +++ b/crates/aisix-proxy/src/a2a.rs @@ -0,0 +1,522 @@ +//! `/a2a/:agent` — the downstream-facing A2A gateway endpoint. +//! +//! AISIX fronts each registered A2A agent: a caller reaches an agent through +//! `/a2a/`, and its card is served (with the service URL rewritten to +//! point back at the gateway) at `/a2a//.well-known/agent-card.json`. +//! The caller authenticates with an AISIX API key — the [`AuthenticatedKey`] +//! extractor rejects a missing or invalid key with `401` before the request +//! reaches the agent. The endpoint is rebuilt from the current configuration +//! snapshot on each request, so it always reflects the live `a2a_agents` set. +//! +//! A `message/send` (and every other JSON-RPC call) is governed by the SAME +//! pipeline as an LLM request, keyed on the caller's API key: per-agent access +//! control (the key's `allowed_agents`), rate-limit + budget (`quota::enforce`), +//! and a usage event into the shared sink. The upstream credential is held +//! gateway-side and never reaches the caller. Guardrails over A2A message +//! content are a later step. +//! +//! The request body is forwarded verbatim to the upstream agent, so the caller +//! speaks whichever A2A wire version the agent is pinned to; the gateway does +//! not translate between the 0.3 and 1.0 formats here. + +use std::time::{Duration, Instant}; + +use aisix_a2a::{upstream_from_a2a_agent, A2aBridge, A2aError, HttpBridge}; +use aisix_obs::{AccessLog, RequestOutcome, UsageEvent}; +use axum::body::to_bytes; +use axum::extract::{Path, Request, State}; +use axum::http::{header, HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; +use serde::Deserialize; + +use crate::auth::AuthenticatedKey; +use crate::request_id::new_request_id; +use crate::state::ProxyState; + +/// Bounded `model` metric label for /a2a requests — A2A has no resolved model, +/// and the agent name is a path segment (bounded by the registered set, but +/// kept as a fixed label to match the /mcp convention and #451). +const A2A_MODEL_LABEL: &str = "a2a"; + +/// Just enough of a JSON-RPC request to record the method for usage and echo +/// the id back in a synthesized error. Unknown fields are ignored. +#[derive(Deserialize)] +struct JsonRpcPeek { + method: Option, + id: Option, +} + +/// Serve a JSON-RPC request to `/a2a/:agent`. Authentication (`401`), per-agent +/// ACL (`403`), and rate-limit + budget (`429` / budget error) gate the call +/// before the request is forwarded to the upstream agent; a usage event is +/// emitted either way. +pub async fn a2a_endpoint( + auth: AuthenticatedKey, + Path(agent): Path, + State(state): State, + request: Request, +) -> Response { + let started = Instant::now(); + let request_id = new_request_id(); + let api_key_id = auth.entry.id.clone(); + let http_method = request.method().clone(); + + let response = dispatch(auth, &agent, &state, request, &request_id).await; + + let elapsed = started.elapsed(); + let status = response.status().as_u16(); + AccessLog { + method: http_method.as_str(), + path: "/a2a", + status, + latency: elapsed, + provider: Some("a2a"), + model: None, + api_key_id: Some(&api_key_id), + prompt_tokens: None, + completion_tokens: None, + total_tokens: None, + request_id: &request_id, + served_by_model: None, + routing_attempt_count: None, + routing_fallback_count: None, + } + .emit(); + state.metrics.record_request( + "a2a", + A2A_MODEL_LABEL, + status, + RequestOutcome::from_status(status), + elapsed, + ); + response +} + +async fn dispatch( + auth: AuthenticatedKey, + agent: &str, + state: &ProxyState, + request: Request, + request_id: &str, +) -> Response { + // Resolve the agent from the live snapshot. A disabled agent is treated as + // absent — not served, same as a missing one. + let snapshot = state.snapshot.load(); + let entry = match snapshot.a2a_agents.get_by_name(agent) { + Some(entry) if entry.value.enabled => entry, + _ => return (StatusCode::NOT_FOUND, format!("unknown A2A agent: {agent}")).into_response(), + }; + + // Per-agent access control, keyed on the same API key object as LLM/MCP + // access. A key with no `allowed_agents` reaches none (grant is explicit). + if !auth.key().can_access_agent(agent) { + return ( + StatusCode::FORBIDDEN, + format!("this key may not reach A2A agent: {agent}"), + ) + .into_response(); + } + + let upstream = match upstream_from_a2a_agent(&entry.value) { + Ok(upstream) => upstream, + // Currently only oauth2 upstream auth, which the runtime does not + // implement yet — surface it as "not implemented". + Err(err) => { + emit_a2a_usage( + state, + &auth, + request_id, + agent, + "", + StatusCode::NOT_IMPLEMENTED.as_u16(), + Duration::ZERO, + ); + return (StatusCode::NOT_IMPLEMENTED, err.to_string()).into_response(); + } + }; + + let (_parts, body) = request.into_parts(); + let bytes = match to_bytes(body, state.request_body_limit_bytes).await { + Ok(bytes) => bytes, + Err(_) => return (StatusCode::BAD_REQUEST, "invalid request body").into_response(), + }; + let value: serde_json::Value = match serde_json::from_slice(&bytes) { + Ok(value) => value, + Err(_) => return (StatusCode::BAD_REQUEST, "invalid JSON-RPC body").into_response(), + }; + let peek = serde_json::from_slice::(&bytes).ok(); + let method = peek + .as_ref() + .and_then(|p| p.method.clone()) + .unwrap_or_default(); + let rpc_id = peek.and_then(|p| p.id); + + // Reuse the LLM path's rate-limit + budget gate. The reservation is held + // for the call and dropped after (an A2A call carries no token cost yet). + // On 429 / budget-exceeded this returns before the upstream is contacted. + let _reservation = match crate::quota::enforce(state, &auth, None).await { + Ok(reservation) => reservation, + Err(err) => { + let response = err.into_response(); + emit_a2a_usage( + state, + &auth, + request_id, + agent, + &method, + response.status().as_u16(), + Duration::ZERO, + ); + return response; + } + }; + + let bridge = HttpBridge::new(upstream); + let started = Instant::now(); + let result = bridge.send(&value).await; + let latency = started.elapsed(); + + match result { + Ok(response_value) => { + emit_a2a_usage( + state, + &auth, + request_id, + agent, + &method, + StatusCode::OK.as_u16(), + latency, + ); + axum::Json(response_value).into_response() + } + Err(err) => { + let status = a2a_error_status(&err); + tracing::warn!(agent = %agent, error = %err, "A2A upstream call failed"); + emit_a2a_usage( + state, + &auth, + request_id, + agent, + &method, + status.as_u16(), + latency, + ); + a2a_error_response(rpc_id, status, &err.to_string()) + } + } +} + +/// Serve the upstream agent's card at `/a2a/:agent/.well-known/agent-card.json`, +/// rewriting its advertised service `url` to point back at this gateway so +/// callers discover the agent through `/a2a/`. +pub async fn a2a_agent_card( + auth: AuthenticatedKey, + Path(agent): Path, + State(state): State, + headers: HeaderMap, +) -> Response { + let snapshot = state.snapshot.load(); + let entry = match snapshot.a2a_agents.get_by_name(&agent) { + Some(entry) if entry.value.enabled => entry, + _ => return (StatusCode::NOT_FOUND, format!("unknown A2A agent: {agent}")).into_response(), + }; + if !auth.key().can_access_agent(&agent) { + return ( + StatusCode::FORBIDDEN, + format!("this key may not reach A2A agent: {agent}"), + ) + .into_response(); + } + let upstream = match upstream_from_a2a_agent(&entry.value) { + Ok(upstream) => upstream, + Err(err) => return (StatusCode::NOT_IMPLEMENTED, err.to_string()).into_response(), + }; + + let bridge = HttpBridge::new(upstream); + let mut card = match bridge.fetch_agent_card().await { + Ok(card) => card, + Err(err) => { + tracing::warn!(agent = %agent, error = %err, "A2A agent card fetch failed"); + return (StatusCode::BAD_GATEWAY, err.to_string()).into_response(); + } + }; + // Rewrite the advertised service endpoint to the gateway so downstream + // callers route subsequent requests through `/a2a/`. Derived from + // the request's Host header (and forwarded scheme) since the gateway's + // public URL is not otherwise known here. + if let Some(base) = gateway_base(&headers) { + card.url = format!("{base}/a2a/{agent}"); + } + axum::Json(card).into_response() +} + +/// Reconstruct the gateway's public base (`scheme://host`) from request +/// headers: the `Host` header, and `X-Forwarded-Proto` when a proxy set it +/// (defaulting to `https`). Returns `None` when no Host header is present, in +/// which case the card's `url` is left as the upstream advertised it. +fn gateway_base(headers: &HeaderMap) -> Option { + let host = headers.get(header::HOST)?.to_str().ok()?; + let scheme = headers + .get("x-forwarded-proto") + .and_then(|v| v.to_str().ok()) + .unwrap_or("https"); + Some(format!("{scheme}://{host}")) +} + +/// Map a bridge error to the client-visible HTTP status: an upstream that could +/// not be reached is a bad gateway; a not-yet-supported config is not +/// implemented; anything else from the call is a bad gateway too. +fn a2a_error_status(err: &A2aError) -> StatusCode { + match err { + A2aError::Unsupported(_) => StatusCode::NOT_IMPLEMENTED, + A2aError::Connect(_) | A2aError::Request(_) => StatusCode::BAD_GATEWAY, + } +} + +/// Build a JSON-RPC error envelope for a gateway-side failure, echoing the +/// request id. A2A clients expect a JSON-RPC body, so the failure surfaces as +/// an error object they can handle rather than a bare HTTP error. +fn a2a_error_response( + id: Option, + status: StatusCode, + message: &str, +) -> Response { + let body = serde_json::json!({ + "jsonrpc": "2.0", + "id": id.unwrap_or(serde_json::Value::Null), + "error": { "code": -32000, "message": message }, + }); + (status, axum::Json(body)).into_response() +} + +/// Emit a usage event for a single A2A call into the same sink as LLM usage. +/// A2A calls carry no token cost yet, so token/cost fields stay zero; the event +/// records who called which agent with which method, the outcome, and latency. +fn emit_a2a_usage( + state: &ProxyState, + auth: &AuthenticatedKey, + request_id: &str, + agent: &str, + method: &str, + status_code: u16, + latency: Duration, +) { + let event = UsageEvent { + request_id: request_id.to_string(), + occurred_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true), + api_key_id: auth.entry.id.clone(), + status_code, + latency_ms: latency.as_millis().min(u32::MAX as u128) as u32, + inbound_protocol: "a2a".to_string(), + a2a_agent_name: agent.to_string(), + a2a_method: method.to_string(), + ..Default::default() + }; + state.usage_sink.try_emit("a2a", event.clone()); + let snap = state.snapshot.load(); + let exporters = snap.observability_exporters.entries(); + state + .otlp_fan_out + .fan_out(&event, None, exporters.iter().map(|e| &e.value)); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn error_status_maps_unsupported_to_501_and_transport_to_502() { + assert_eq!( + a2a_error_status(&A2aError::Unsupported("oauth2".into())), + StatusCode::NOT_IMPLEMENTED + ); + assert_eq!( + a2a_error_status(&A2aError::Connect("dns".into())), + StatusCode::BAD_GATEWAY + ); + assert_eq!( + a2a_error_status(&A2aError::Request("500".into())), + StatusCode::BAD_GATEWAY + ); + } + + #[test] + fn gateway_base_uses_forwarded_proto_then_defaults_https() { + let mut headers = HeaderMap::new(); + headers.insert(header::HOST, "gw.example.com".parse().unwrap()); + assert_eq!( + gateway_base(&headers).as_deref(), + Some("https://gw.example.com") + ); + headers.insert("x-forwarded-proto", "http".parse().unwrap()); + assert_eq!( + gateway_base(&headers).as_deref(), + Some("http://gw.example.com") + ); + } + + #[test] + fn gateway_base_is_none_without_host() { + assert_eq!(gateway_base(&HeaderMap::new()), None); + } + + // ---- endpoint integration tests: drive the real router via oneshot ---- + use crate::build_router; + use aisix_core::{A2aAgent, AisixSnapshot, ApiKey, ProxyConfig, ResourceEntry, SnapshotHandle}; + use axum::body::Body; + use axum::http::Request as HttpRequest; + use std::sync::Arc; + use tower::ServiceExt; + + const TOKEN: &str = "sk-a2a-endpoint-test"; + + fn proxy_cfg() -> ProxyConfig { + ProxyConfig { + addr: "127.0.0.1:0".into(), + request_body_limit_bytes: 1_048_576, + real_ip: Default::default(), + tls: None, + } + } + + /// Snapshot with one API key (granting `allowed_agents`, or none when + /// `allowed_agents` is `null`) and one `invoice` agent at `agent_url`. + fn snapshot_with( + agent_url: &str, + enabled: bool, + allowed_agents: serde_json::Value, + ) -> AisixSnapshot { + let mut key = serde_json::json!({ + "key_hash": ApiKey::hash_bearer(TOKEN), + "allowed_models": ["*"], + }); + if !allowed_agents.is_null() { + key["allowed_agents"] = allowed_agents; + } + let apikey: ApiKey = serde_json::from_value(key).expect("valid apikey"); + let agent: A2aAgent = serde_json::from_value(serde_json::json!({ + "display_name": "invoice", + "url": agent_url, + "enabled": enabled, + })) + .expect("valid a2a agent"); + + let snap = AisixSnapshot::new(); + snap.apikeys.insert(ResourceEntry::new("ak-1", apikey, 1)); + snap.a2a_agents.insert(ResourceEntry::new("ag-1", agent, 1)); + snap + } + + fn router_with(snap: AisixSnapshot) -> axum::Router { + let handle = SnapshotHandle::new(snap); + let hub = Arc::new(aisix_gateway::Hub::new()); + build_router(ProxyState::new(handle, hub, &proxy_cfg()).without_cache()) + } + + fn a2a_post(agent: &str, auth: bool) -> HttpRequest { + let body = serde_json::json!({"jsonrpc": "2.0", "id": 1, "method": "message/send"}); + let mut b = HttpRequest::post(format!("/a2a/{agent}")) + .header("host", "a2a.aisix.example.com") + .header("content-type", "application/json"); + if auth { + b = b.header("authorization", format!("Bearer {TOKEN}")); + } + b.body(Body::from(body.to_string())).unwrap() + } + + #[tokio::test] + async fn endpoint_denies_key_without_allowed_agents_403() { + // Unreachable upstream on purpose: the ACL must reject BEFORE any + // upstream call is made. + let app = router_with(snapshot_with( + "http://127.0.0.1:1/a2a", + true, + serde_json::Value::Null, + )); + let resp = app.oneshot(a2a_post("invoice", true)).await.unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); + } + + #[tokio::test] + async fn endpoint_disabled_agent_is_404() { + let app = router_with(snapshot_with( + "http://127.0.0.1:1/a2a", + false, + serde_json::json!(["*"]), + )); + let resp = app.oneshot(a2a_post("invoice", true)).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn endpoint_unknown_agent_is_404() { + let app = router_with(snapshot_with( + "http://127.0.0.1:1/a2a", + true, + serde_json::json!(["*"]), + )); + let resp = app.oneshot(a2a_post("does-not-exist", true)).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn endpoint_missing_key_is_401() { + let app = router_with(snapshot_with( + "http://127.0.0.1:1/a2a", + true, + serde_json::json!(["*"]), + )); + let resp = app.oneshot(a2a_post("invoice", false)).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + } + + /// A stub upstream that serves an agent card advertising an internal URL. + async fn spawn_card_stub() -> std::net::SocketAddr { + let app = axum::Router::new().route( + "/.well-known/agent-card.json", + axum::routing::get(|| async { + axum::Json(serde_json::json!({ + "name": "Invoice Agent", + "url": "https://upstream.internal/a2a", + "version": "2.1.0", + "skills": [{"id": "extract"}] + })) + }), + ); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); + }); + addr + } + + #[tokio::test] + async fn endpoint_rewrites_agent_card_url_to_gateway() { + let addr = spawn_card_stub().await; + let app = router_with(snapshot_with( + &format!("http://{addr}/a2a"), + true, + serde_json::json!(["*"]), + )); + let req = HttpRequest::get("/a2a/invoice/.well-known/agent-card.json") + .header("host", "a2a.aisix.example.com") + .header("authorization", format!("Bearer {TOKEN}")) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = axum::body::to_bytes(resp.into_body(), 1_048_576) + .await + .unwrap(); + let card: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + // The advertised service URL is rewritten to the gateway; the caller's + // Host is reflected and every other card field is preserved. + assert_eq!(card["url"], "https://a2a.aisix.example.com/a2a/invoice"); + assert_eq!(card["name"], "Invoice Agent"); + assert_eq!(card["version"], "2.1.0"); + assert_eq!(card["skills"][0]["id"], "extract"); + } +} diff --git a/crates/aisix-proxy/src/lib.rs b/crates/aisix-proxy/src/lib.rs index 962c4c43..88f1ed35 100644 --- a/crates/aisix-proxy/src/lib.rs +++ b/crates/aisix-proxy/src/lib.rs @@ -24,6 +24,7 @@ #![forbid(unsafe_code)] #![deny(rust_2018_idioms)] +mod a2a; mod attempt; mod audio; mod auth; @@ -112,6 +113,15 @@ pub fn build_router(state: ProxyState) -> Router { // enforced inside the handler via the `AuthenticatedKey` extractor. .route("/mcp", any(mcp::mcp_endpoint)) .route("/mcp/", any(mcp::mcp_endpoint)) + // Downstream-facing A2A gateway. One route per registered agent; the + // agent's card (with the service URL rewritten to the gateway) is served + // at the RFC 8615 well-known path under it. Authentication (AISIX API + // key) is enforced inside the handlers via `AuthenticatedKey`. + .route("/a2a/:agent", post(a2a::a2a_endpoint)) + .route( + "/a2a/:agent/.well-known/agent-card.json", + get(a2a::a2a_agent_card), + ) // Wire the configured cap into axum's request-body extractor // chain (`Json` defers to `Bytes` which honors this layer). // Without this, axum 0.7's `DefaultBodyLimit` falls back to @@ -186,6 +196,7 @@ fn normalize_endpoint_label(path: &str) -> &'static str { "/v1/audio/translations" => "/v1/audio/translations", "/v1/audio/speech" => "/v1/audio/speech", "/mcp" | "/mcp/" => "/mcp", + _ if path.starts_with("/a2a/") => "/a2a", _ if path.starts_with("/passthrough/") => "/passthrough/:provider/*rest", _ => "other", } @@ -196,6 +207,8 @@ fn inbound_protocol_for_endpoint(endpoint: &str) -> &'static str { "anthropic" } else if endpoint == "/mcp" { "mcp" + } else if endpoint == "/a2a" { + "a2a" } else { "openai" } diff --git a/schemas/resources/a2a_agent.schema.json b/schemas/resources/a2a_agent.schema.json new file mode 100644 index 00000000..41719114 --- /dev/null +++ b/schemas/resources/a2a_agent.schema.json @@ -0,0 +1,145 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": false, + "definitions": { + "A2aAuthType": { + "description": "How the gateway authenticates to an upstream A2A agent.", + "oneOf": [ + { + "description": "No authentication; the agent is reached as-is.", + "enum": [ + "none" + ], + "title": "No authentication", + "type": "string" + }, + { + "description": "Bearer token authentication. The token is supplied in `secret` and sent as `Authorization: Bearer `.", + "enum": [ + "bearer" + ], + "title": "Bearer token", + "type": "string" + }, + { + "description": "API key authentication. The key is supplied in `secret` and sent as an `x-api-key: ` header on every upstream request.", + "enum": [ + "api_key" + ], + "title": "API key", + "type": "string" + }, + { + "description": "OAuth 2.0 client credentials grant. The gateway exchanges `client_id`, the client secret in `secret`, and the optional `scopes` at `token_url` for an access token, and sends it as `Authorization: Bearer ` on every upstream request. Access tokens are cached until shortly before their reported expiry.", + "enum": [ + "oauth2" + ], + "title": "OAuth 2.0 client credentials", + "type": "string" + } + ] + }, + "A2aProtocolVersion": { + "description": "The A2A wire-format version pinned for an upstream agent.", + "oneOf": [ + { + "description": "A2A 1.0 wire format (protobuf-JSON envelopes, PascalCase methods).", + "enum": [ + "1.0" + ], + "title": "A2A 1.0", + "type": "string" + }, + { + "description": "A2A 0.3 wire format (`kind`-discriminated JSON-RPC objects).", + "enum": [ + "0.3" + ], + "title": "A2A 0.3", + "type": "string" + } + ] + } + }, + "properties": { + "auth_type": { + "allOf": [ + { + "$ref": "#/definitions/A2aAuthType" + } + ], + "default": "none", + "description": "How the gateway authenticates to the upstream agent. The credential is held by the gateway and is never forwarded from or exposed to the calling client." + }, + "client_id": { + "description": "OAuth client identifier used for the OAuth 2.0 client credentials grant. Required when `auth_type` is `oauth2`; ignored otherwise.", + "type": [ + "string", + "null" + ] + }, + "display_name": { + "description": "Operator-facing label, unique within the gateway. It is the path segment under which the agent is exposed to callers as `/a2a/`, so it must be a single non-empty URL path segment.", + "minLength": 1, + "type": "string" + }, + "enabled": { + "default": true, + "description": "Whether this agent is active. When `false`, it is not served and cannot be reached.", + "type": "boolean" + }, + "protocol_version": { + "allOf": [ + { + "$ref": "#/definitions/A2aProtocolVersion" + } + ], + "default": "1.0", + "description": "The A2A wire-format version the gateway pins for this agent. Pinned explicitly rather than inferred from client signals, so the served agent card and the accepted requests stay consistent." + }, + "scopes": { + "description": "OAuth scopes to request. Joined with spaces into the `scope` parameter of the token request. Only used when `auth_type` is `oauth2`.", + "items": { + "type": "string" + }, + "type": [ + "array", + "null" + ] + }, + "secret": { + "description": "Authentication credential for the upstream agent. Its meaning follows `auth_type`: the bearer token when `auth_type` is `bearer` (sent as `Authorization: Bearer `), the API key when `auth_type` is `api_key` (sent as `x-api-key: `), or the OAuth client secret when `auth_type` is `oauth2`. Leave unset when `auth_type` is `none`.", + "type": [ + "string", + "null" + ] + }, + "timeout_ms": { + "description": "Maximum time, in milliseconds, to wait for a single upstream operation (fetching the agent card or invoking the agent). Must be at least `1` when set. When omitted, the gateway applies a built-in default.", + "format": "uint64", + "minimum": 1.0, + "type": [ + "integer", + "null" + ] + }, + "token_url": { + "description": "OAuth token endpoint URL where the gateway exchanges the client credentials for an access token, such as `https://auth.example.com/oauth/token`. Required when `auth_type` is `oauth2`; ignored otherwise.", + "type": [ + "string", + "null" + ] + }, + "url": { + "description": "The upstream agent's base URL, reached over HTTP with the A2A protocol (JSON-RPC 2.0), such as `https://agents.example.com/a2a`. The agent card is discovered relative to this URL.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "display_name", + "url" + ], + "title": "A2aAgent", + "type": "object" +} diff --git a/schemas/resources/api_key.schema.json b/schemas/resources/api_key.schema.json index a52341a7..a72aad30 100644 --- a/schemas/resources/api_key.schema.json +++ b/schemas/resources/api_key.schema.json @@ -73,6 +73,16 @@ } }, "properties": { + "allowed_agents": { + "description": "A2A agents this key may reach, named by their registered names. Entries are matched as single-`*` globs, mirroring `allowed_tools`: `\"*\"` grants every agent and an entry without a `*` matches one agent exactly. When omitted or set to `null`, the key has no A2A agent access — access is granted explicitly.", + "items": { + "type": "string" + }, + "type": [ + "array", + "null" + ] + }, "allowed_models": { "description": "Model identifiers this key may use. An empty array denies access to every model.", "items": {