Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added common internal HTTP metrics to the connector used by AWS sinks.

authors: gwenaskell
16 changes: 12 additions & 4 deletions src/aws/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ impl AwsAuthentication {
external_id: Option<&str>,
session_name: Option<&str>,
) -> crate::Result<AssumeRoleProviderBuilder> {
let connector = super::connector(proxy, tls_options)?;
let connector = super::AwsHttpClient {
http: super::connector(proxy, tls_options)?,
region: region.clone(),
};
Comment thread
gwenaskell marked this conversation as resolved.
let config = SdkConfig::builder()
.http_client(connector)
.region(region.clone())
Expand Down Expand Up @@ -311,15 +314,17 @@ impl AwsAuthentication {
profile,
region,
} => {
let connector = super::connector(proxy, tls_options)?;

// The SDK uses the default profile out of the box, but doesn't provide an optional
// type in the builder. We can just hardcode it so that everything works.
let profile_files = EnvConfigFiles::builder()
.with_file(EnvConfigFileKind::Credentials, credentials_file)
.build();

let auth_region = region.clone().map(Region::new).unwrap_or(service_region);
let connector = super::AwsHttpClient {
http: super::connector(proxy, tls_options)?,
region: auth_region.clone(),
};
let provider_config = ProviderConfig::empty()
.with_region(Option::from(auth_region))
.with_http_client(connector);
Expand Down Expand Up @@ -391,7 +396,10 @@ async fn default_credentials_provider(
tls_options: Option<&TlsConfig>,
imds: ImdsAuthentication,
) -> crate::Result<SharedCredentialsProvider> {
let connector = super::connector(proxy, tls_options)?;
let connector = super::AwsHttpClient {
http: super::connector(proxy, tls_options)?,
region: region.clone(),
};
Comment thread
gwenaskell marked this conversation as resolved.

let provider_config = ProviderConfig::empty()
.with_region(Some(region.clone()))
Expand Down
114 changes: 100 additions & 14 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
},
task::{Context, Poll},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};

pub use auth::{AwsAuthentication, ImdsAuthentication};
Expand All @@ -37,8 +37,7 @@ use aws_smithy_runtime_api::client::{
use aws_smithy_types::body::SdkBody;
use aws_types::sdk_config::SharedHttpClient;
use bytes::Bytes;
use futures_util::FutureExt;
use http::HeaderMap;
use http::{HeaderMap, header::HeaderValue};
use http_body::{Body, combinators::BoxBody};
use pin_project::pin_project;
use regex::RegexSet;
Expand All @@ -49,7 +48,13 @@ pub use timeout::AwsTimeout;
use crate::{
config::ProxyConfig,
http::{build_proxy_connector, build_tls_connector, status},
internal_events::AwsBytesSent,
internal_events::{
AwsBytesSent,
http_client::{
AboutToSendHttpRequest, GotHttpResponse, GotHttpWarning, HttpRequestTelemetry,
HttpResponseTelemetry,
},
},
tls::{MaybeTlsSettings, TlsConfig},
};

Expand Down Expand Up @@ -134,6 +139,7 @@ pub fn region_provider(
proxy: &ProxyConfig,
tls_options: Option<&TlsConfig>,
) -> crate::Result<impl ProvideRegion + use<>> {
// Region is not yet known here, so we cannot wrap with AwsHttpClient for observability.
let config = aws_config::provider_config::ProviderConfig::default()
.with_http_client(connector(proxy, tls_options)?);

Expand Down Expand Up @@ -350,12 +356,77 @@ struct AwsConnector<T> {
region: Region,
}

// ── Telemetry trait implementations for the AWS SDK HTTP types ────────────────
//
// `HttpRequest` and `HttpResponse` are the SDK's own structs (not `http::Request`
// / `http::Response`), so they cannot implement the hyper-specific body/version
// accessors. The required method impls (method/uri and status) are enough for
// metric labels; the optional rich-logging fields fall back to `None`.

impl HttpRequestTelemetry for HttpRequest {
fn method(&self) -> &str {
self.method()
}

fn uri(&self) -> String {
self.uri().to_string()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
smithy_headers_to_map(self.headers())
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = Body::size_hint(self.body());
(hint.lower(), hint.upper())
}
}

impl HttpResponseTelemetry for HttpResponse {
fn status_u16(&self) -> u16 {
self.status().as_u16()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
smithy_headers_to_map(self.headers())
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = Body::size_hint(self.body());
(hint.lower(), hint.upper())
}
}

/// Converts the AWS SDK's string-pair header iterator into an `http::HeaderMap`.
/// Sanitization (marking sensitive headers) is handled by the trait's default
/// `sanitized_headers` method.
fn smithy_headers_to_map(
headers: &aws_smithy_runtime_api::http::Headers,
) -> HeaderMap<HeaderValue> {
let mut map = HeaderMap::with_capacity(headers.len());
for (name, value) in headers {
let Ok(header_name) = http::HeaderName::from_bytes(name.as_bytes()) else {
continue;
};
let Ok(header_value) = HeaderValue::from_str(value) else {
continue;
};
map.insert(header_name, header_value);
}
map
}

// ─────────────────────────────────────────────────────────────────────────────

impl<T> HttpConnector for AwsConnector<T>
where
T: HttpConnector,
{
fn call(&self, req: HttpRequest) -> HttpConnectorFuture {
let bytes_sent = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let bytes_sent = Arc::new(AtomicUsize::new(0));

emit!(AboutToSendHttpRequest { request: &req });

let req = req.map(|body| {
let bytes_sent = Arc::clone(&bytes_sent);
body.map_preserve_contents(move |body| {
Expand All @@ -367,17 +438,32 @@ where
let fut = self.http.call(req);
let region = self.region.clone();

HttpConnectorFuture::new(fut.inspect(move |result| {
HttpConnectorFuture::new(async move {
let before = Instant::now();
let result = fut.await;
let roundtrip = before.elapsed();
let byte_size = bytes_sent.load(Ordering::Relaxed);
if let Ok(result) = result
&& result.status().is_success()
{
emit!(AwsBytesSent {
byte_size,
region: Some(region),
});

match &result {
Ok(response) => {
if response.status().is_success() {
emit!(AwsBytesSent {
byte_size,
region: Some(region),
});
}
emit!(GotHttpResponse {
response,
roundtrip
});
}
Err(error) => {
emit!(GotHttpWarning { error, roundtrip });
}
}
}))

result
})
}
}

Expand Down
Loading
Loading