From 2f55cb98fdabafc1fd05be89248113924a2c94b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:30:21 -0600 Subject: [PATCH 1/6] update dependencies --- bottlecap/Cargo.lock | 146 ++++++++++++------------------------------- bottlecap/Cargo.toml | 12 ++-- 2 files changed, 45 insertions(+), 113 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index b1c8abb5e..1897342a8 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -310,7 +310,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", - "bytes 1.10.1", + "bytes", "form_urlencoded", "futures-util", "http 1.3.1", @@ -318,7 +318,7 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-util", - "itoa 1.0.15", + "itoa", "matchit", "memchr", "mime", @@ -343,7 +343,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-core", "http 1.3.1", "http-body 1.0.1", @@ -489,7 +489,7 @@ dependencies = [ "async-trait", "axum", "base64 0.22.1", - "bytes 1.10.1", + "bytes", "chrono", "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", @@ -501,10 +501,10 @@ dependencies = [ "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/)", "dogstatsd", "figment", - "futures 0.3.31", + "futures", "hex", "hmac", - "http-body 0.1.0", + "http-body 1.0.1", "http-body-util", "httpmock", "hyper 1.6.0", @@ -556,16 +556,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" -[[package]] -name = "bytes" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" -dependencies = [ - "byteorder", - "iovec", -] - [[package]] name = "bytes" version = "1.10.1" @@ -735,7 +725,7 @@ name = "datadog-protos" version = "0.1.0" source = "git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222#c89b58e5784b985819baf11f13f7d35876741222" dependencies = [ - "bytes 1.10.1", + "bytes", "prost", "protobuf", "protobuf-codegen", @@ -748,7 +738,7 @@ name = "datadog-protos" version = "0.1.0" source = "git+https://github.com/DataDog/saluki/#2d0d136bb6c52ca057c5436edecafa48c715d718" dependencies = [ - "bytes 1.10.1", + "bytes", "prost", "protobuf", "protobuf-codegen", @@ -798,12 +788,12 @@ version = "19.1.0" source = "git+https://github.com/DataDog/libdatadog?rev=f8a01a563f3cacbc825cb5bff5d5611b2ac88f55#f8a01a563f3cacbc825cb5bff5d5611b2ac88f55" dependencies = [ "anyhow", - "bytes 1.10.1", + "bytes", "datadog-trace-normalization", "datadog-trace-protobuf", "ddcommon", "flate2", - "futures 0.3.31", + "futures", "http-body-util", "hyper 1.6.0", "hyper-http-proxy", @@ -828,7 +818,7 @@ dependencies = [ "anyhow", "cc", "const_format", - "futures 0.3.31", + "futures", "futures-core", "futures-util", "hex", @@ -1101,12 +1091,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "futures" -version = "0.1.31" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" - [[package]] name = "futures" version = "0.3.31" @@ -1277,7 +1261,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" dependencies = [ "atomic-waker", - "bytes 1.10.1", + "bytes", "fnv", "futures-core", "futures-sink", @@ -1308,7 +1292,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" dependencies = [ "base64 0.22.1", - "bytes 1.10.1", + "bytes", "headers-core", "http 1.3.1", "httpdate", @@ -1361,26 +1345,15 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "http" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0" -dependencies = [ - "bytes 0.4.12", - "fnv", - "itoa 0.4.8", -] - [[package]] name = "http" version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ - "bytes 1.10.1", + "bytes", "fnv", - "itoa 1.0.15", + "itoa", ] [[package]] @@ -1389,21 +1362,9 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" dependencies = [ - "bytes 1.10.1", + "bytes", "fnv", - "itoa 1.0.15", -] - -[[package]] -name = "http-body" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "http 0.1.21", - "tokio-buf", + "itoa", ] [[package]] @@ -1412,7 +1373,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ - "bytes 1.10.1", + "bytes", "http 0.2.12", "pin-project-lite", ] @@ -1423,7 +1384,7 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ - "bytes 1.10.1", + "bytes", "http 1.3.1", ] @@ -1433,7 +1394,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-core", "http 1.3.1", "http-body 1.0.1", @@ -1486,7 +1447,7 @@ version = "0.14.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -1494,7 +1455,7 @@ dependencies = [ "http-body 0.4.6", "httparse", "httpdate", - "itoa 1.0.15", + "itoa", "pin-project-lite", "socket2 0.5.10", "tokio", @@ -1509,7 +1470,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-channel", "futures-util", "h2", @@ -1517,7 +1478,7 @@ dependencies = [ "http-body 1.0.1", "httparse", "httpdate", - "itoa 1.0.15", + "itoa", "pin-project-lite", "smallvec", "tokio", @@ -1530,7 
+1491,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-util", "headers", "http 1.3.1", @@ -1582,7 +1543,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ "base64 0.22.1", - "bytes 1.10.1", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -1743,15 +1704,6 @@ dependencies = [ "libc", ] -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -1795,12 +1747,6 @@ dependencies = [ "either", ] -[[package]] -name = "itoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" - [[package]] name = "itoa" version = "1.0.15" @@ -2397,7 +2343,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ - "bytes 1.10.1", + "bytes", "prost-derive", ] @@ -2449,7 +2395,7 @@ version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" dependencies = [ - "bytes 1.10.1", + "bytes", "once_cell", "protobuf-support", "thiserror 1.0.69", @@ -2507,7 +2453,7 @@ version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ - "bytes 1.10.1", + "bytes", "cfg_aliases", "pin-project-lite", "quinn-proto", @@ -2527,7 +2473,7 @@ version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ - "bytes 1.10.1", + "bytes", "getrandom 0.3.3", "lru-slab", "rand 0.9.2", @@ -2710,7 +2656,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" dependencies = [ "base64 0.22.1", - "bytes 1.10.1", + "bytes", "futures-core", "h2", "http 1.3.1", @@ -3024,7 +2970,7 @@ version = "1.0.142" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7" dependencies = [ - "itoa 1.0.15", + "itoa", "memchr", "ryu", "serde", @@ -3036,7 +2982,7 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a" dependencies = [ - "itoa 1.0.15", + "itoa", "serde", ] @@ -3057,7 +3003,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.15", + "itoa", "ryu", "serde", ] @@ -3069,7 +3015,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ "indexmap 2.10.0", - "itoa 1.0.15", + "itoa", "ryu", "serde", "unsafe-libyaml", @@ -3081,7 +3027,7 @@ version = "3.2.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" dependencies = [ - "futures 0.3.31", + "futures", "log", "once_cell", "parking_lot", @@ -3404,7 +3350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35" dependencies = [ "backtrace", - "bytes 1.10.1", + "bytes", "io-uring", "libc", "mio", @@ -3416,16 +3362,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "tokio-buf" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", -] - [[package]] name = "tokio-macros" version = "2.5.0" @@ -3464,7 +3400,7 @@ version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ - "bytes 1.10.1", + "bytes", "futures-core", "futures-sink", "pin-project-lite", @@ -3479,7 +3415,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-trait", "base64 0.22.1", - "bytes 1.10.1", + "bytes", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -3505,7 +3441,7 @@ checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" dependencies = [ "async-trait", "base64 0.22.1", - "bytes 1.10.1", + "bytes", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -3589,7 +3525,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ "bitflags 2.9.1", - "bytes 1.10.1", + "bytes", "futures-util", "http 1.3.1", "http-body 1.0.1", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index e0cba5c08..073035792 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -10,13 +10,9 @@ bytes = { version = "1.2", default-features = false } chrono = { version = "0.4", features = ["serde", "std", "now"], default-features = false } figment = { version = "0.10", default-features = false, features = ["yaml", "env"] } hyper = { version = "1.6", default-features = false, features = ["server"] } -hyper-util = { version = "0.1.10", features = [ - "http1", - "client", - "client-legacy", -] } -http-body = "0.1" -http-body-util = "0.1" +hyper-util = { version = "0.1.16", features = ["http1", "client", "client-legacy"] } +http-body = { version = "1", default-features = false } +http-body-util = { version = "0.1", default-features = false } lazy_static = { version = "1.5", default-features = false } log = { version = "0.4", default-features = false } nix = { version = "0.26", default-features = false, features = ["feature", "fs"] } @@ -26,7 +22,7 @@ reqwest = { version = "0.12.11", features = ["json", "http2"], default-features serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["alloc"] } thiserror = { version = "1.0", default-features = false } -tokio = { version = "1.37", default-features = false, features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.47", default-features = false, features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-core = { version = "0.1", default-features = 
false }

From 39c4c15ad0b2364c80ff921f09c9de4773bd22f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com>
Date: Fri, 1 Aug 2025 16:30:55 -0600
Subject: [PATCH 2/6] create `TeeBodyWithCompletion`

basically allows us to funnel the data into a buffer for later usage
---
 bottlecap/src/proxy/tee_body.rs | 114 ++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)
 create mode 100644 bottlecap/src/proxy/tee_body.rs

diff --git a/bottlecap/src/proxy/tee_body.rs b/bottlecap/src/proxy/tee_body.rs
new file mode 100644
index 000000000..069abe6fa
--- /dev/null
+++ b/bottlecap/src/proxy/tee_body.rs
@@ -0,0 +1,114 @@
+use axum::body::Bytes;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::sync::{Mutex, oneshot};
+use tracing::error;
+
+/// Body that tees data into a buffer and signals completion.
+
+/// This is used to capture the proxied payload so it can be processed by the interceptor.
+
+/// The completion signal is sent when the body is fully read.
+
+/// The buffer is used to store the proxied payload.
+pub struct TeeBodyWithCompletion<B> {
+    inner: B,
+    buffer: Arc<Mutex<Vec<u8>>>,
+    completion_sender: Arc<Mutex<Option<oneshot::Sender<Bytes>>>>,
+}
+
+impl<B> TeeBodyWithCompletion<B> {
+    pub fn new(body: B) -> (Self, oneshot::Receiver<Bytes>) {
+        let buffer = Arc::new(Mutex::new(Vec::new()));
+        let (completion_sender, completion_receiver) = oneshot::channel();
+
+        let tee_body = Self {
+            inner: body,
+            buffer,
+            completion_sender: Arc::new(Mutex::new(Some(completion_sender))),
+        };
+
+        (tee_body, completion_receiver)
+    }
+
+    fn send_completion(&self) {
+        let buffer = self.buffer.clone();
+        let sender = self.completion_sender.clone();
+
+        tokio::spawn(async move {
+            if let Ok(mut sender_guard) = sender.try_lock() {
+                if let Some(sender) = sender_guard.take() {
+                    let collected_data = {
+                        let buffer_guard = buffer.lock().await;
+                        let data = buffer_guard.clone();
+                        Bytes::from(data)
+                    };
+
+                    if sender.send(collected_data).is_err() {
+                        error!(
+                            "PROXY | tee_body | unable to send completion signal, proxied payload won't be processed"
+                        );
+                    }
+                }
+            }
+        });
+    }
+}
+
+impl<B> http_body::Body for TeeBodyWithCompletion<B>
+where
+    B: http_body::Body,
+    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+    B::Data: AsRef<[u8]> + Send + Sync + 'static,
+{
+    type Data = B::Data;
+    type Error = B::Error;
+
+    fn poll_frame(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
+        // Use unsafe to access the inner body while maintaining pinning
+        let inner = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.inner) };
+
+        match inner.poll_frame(cx) {
+            Poll::Ready(Some(Ok(frame))) => {
+                if let Some(data) = frame.data_ref() {
+                    let buffer = self.buffer.clone();
+                    let data_bytes = data.as_ref().to_vec();
+
+                    if let Ok(mut buf) = buffer.try_lock() {
+                        buf.extend_from_slice(&data_bytes);
+                    }
+                }
+
+                // After processing the frame, check if the inner body is now at end of stream
+                // This handles cases where the body completes after receiving the final data frame
+                if self.inner.is_end_stream() {
+                    self.send_completion();
+                }
+
+                Poll::Ready(Some(Ok(frame)))
+            }
+            Poll::Ready(None) => {
+                self.send_completion();
+                Poll::Ready(None)
+            }
+            Poll::Ready(Some(Err(e))) => {
+                // Send completion even on error so receiver doesn't hang
+                self.send_completion();
+                Poll::Ready(Some(Err(e)))
+            }
+            other => other,
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+
+    fn size_hint(&self) -> http_body::SizeHint {
+        self.inner.size_hint()
+    }
+}
From
31c615a3caebe1ff3a49daec4a5cf20c88d92640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:33:03 -0600 Subject: [PATCH 3/6] avoid awaiting for complete body, proxy directly completion signal from `tee_body` allows us to then process in a separate task without compromising performance --- bottlecap/src/proxy/interceptor.rs | 270 +++++++++++++++-------------- bottlecap/src/proxy/mod.rs | 1 + 2 files changed, 145 insertions(+), 126 deletions(-) diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index d2fe45f6a..93aa7ed4e 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -1,16 +1,16 @@ use crate::{ - EXTENSION_HOST, config::aws::AwsConfig, http::extract_request_body, + EXTENSION_HOST, config::aws::AwsConfig, lifecycle::invocation::processor::Processor as InvocationProcessor, lwa, + proxy::tee_body::TeeBodyWithCompletion, }; use axum::{ Router, body::{Body, Bytes}, extract::{Path, Request, State}, - http::{self, Request as HttpRequest, StatusCode, Uri}, + http::{self, HeaderMap, StatusCode, Uri}, response::{IntoResponse, Response}, routing::{get, post}, }; -use http_body_util::BodyExt; use hyper_util::{ client::legacy::{Client, connect::HttpConnector}, rt::TokioExecutor, @@ -129,53 +129,62 @@ async fn invocation_next_proxy( request: Request, ) -> Response { debug!("PROXY | invocation_next_proxy | api_version: {api_version}"); - let (parts, body_bytes) = match extract_request_body(request).await { + let (parts, body) = request.into_parts(); + let request = match build_proxy_request(&aws_config, parts, body) { Ok(r) => r, Err(e) => { + error!("PROXY | invocation_next_proxy | error building proxy request"); return ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to extract request body: {e}"), + format!("Failed to build proxy request: {e}"), ) .into_response(); } }; - let (intercepted_parts, intercepted_bytes) = - match proxy_request(&client, &aws_config, parts, body_bytes).await { - Ok(r) => r, - Err(e) => { - error!("PROXY | passthrough_proxy | error proxying request: {e}"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to proxy request: {e}"), + debug!("PROXY | invocation_next_proxy | proxying {}", request.uri()); + let intercepted_response = match client.request(request).await { + Ok(r) => r, + Err(e) => { + error!("PROXY | invocation_next_proxy | error proxying request"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to proxy: {e}"), + ) + .into_response(); + } + }; + + let (intercepted_parts, intercepted_body) = intercepted_response.into_parts(); + + // Intercepted body is what the AWS Lambda event will be set as. + // Which is what we want to process. 
+ let (intercepted_tee_body, intercepted_completion_receiver) = + TeeBodyWithCompletion::new(intercepted_body); + + let mut join_set = tasks.lock().await; + let intercepted_parts_clone = intercepted_parts.clone(); + join_set.spawn(async move { + if let Ok(body) = intercepted_completion_receiver.await { + debug!("PROXY | invocation_next_proxy | intercepted body completed"); + if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() { + lwa::process_invocation_next( + &invocation_processor, + &intercepted_parts_clone, + &body, ) - .into_response(); + .await; } - }; - - if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() { - let mut tasks = tasks.lock().await; - - let invocation_processor = invocation_processor.clone(); - let intercepted_parts = intercepted_parts.clone(); - let intercepted_bytes = intercepted_bytes.clone(); - tasks.spawn(async move { - lwa::process_invocation_next( - &invocation_processor, - &intercepted_parts, - &intercepted_bytes, - ) - .await; - }); - } + } + }); - match build_forward_response(intercepted_parts, intercepted_bytes) { + match build_forward_response(intercepted_parts, Body::new(intercepted_tee_body)) { Ok(r) => r, Err(e) => { - error!("PROXY | passthrough_proxy | error building forward response: {e}"); + error!("PROXY | invocation_next_proxy | error building response: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to build forward response: {e}"), + format!("Failed to build response: {e}"), ) .into_response() } @@ -190,47 +199,58 @@ async fn invocation_response_proxy( debug!( "PROXY | invocation_response_proxy | api_version: {api_version}, request_id: {request_id}" ); - let (parts, body_bytes) = match extract_request_body(request).await { + let (parts, body) = request.into_parts(); + let (outgoing_tee_body, outgoing_completion_receiver) = TeeBodyWithCompletion::new(body); + + // This is where LWA will be processed + let mut join_set = tasks.lock().await; + let aws_config_clone = aws_config.clone(); + join_set.spawn(async move { + if let Ok(body) = outgoing_completion_receiver.await { + debug!("PROXY | invocation_response_proxy | intercepted outgoing body completed"); + if aws_config_clone.aws_lwa_proxy_lambda_runtime_api.is_some() { + lwa::process_invocation_response(&invocation_processor, &body).await; + } + } + }); + + let request = match build_proxy_request(&aws_config, parts, outgoing_tee_body) { Ok(r) => r, Err(e) => { + error!("PROXY | invocation_response_proxy | error building proxy request"); return ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to extract request body: {e}"), + format!("Failed to build proxy request: {e}"), ) .into_response(); } }; - if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() { - let mut tasks = tasks.lock().await; - - let invocation_processor = invocation_processor.clone(); - let body_bytes = body_bytes.clone(); - tasks.spawn(async move { - lwa::process_invocation_response(&invocation_processor, &body_bytes).await; - }); - } - - let (intercepted_parts, intercepted_bytes) = - match proxy_request(&client, &aws_config, parts, body_bytes).await { - Ok(r) => r, - Err(e) => { - error!("PROXY | passthrough_proxy | error proxying request: {e}"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to proxy request: {e}"), - ) - .into_response(); - } - }; + debug!( + "PROXY | invocation_response_proxy | proxying {}", + request.uri() + ); + // Send the streaming request + let intercepted_response = match client.request(request).await { + Ok(r) => r, + Err(e) => { + error!("PROXY | 
invocation_response_proxy | error proxying request"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to proxy: {e}"), + ) + .into_response(); + } + }; - match build_forward_response(intercepted_parts, intercepted_bytes) { + let (intercepted_parts, intercepted_body) = intercepted_response.into_parts(); + match build_forward_response(intercepted_parts, Body::new(intercepted_body)) { Ok(r) => r, Err(e) => { - error!("PROXY | passthrough_proxy | error building forward response: {e}"); + error!("PROXY | invocation_response_proxy | error building response: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to build forward response: {e}"), + format!("Failed to build response: {e}"), ) .into_response() } @@ -241,119 +261,118 @@ async fn passthrough_proxy( State((aws_config, client, _, _)): State, request: Request, ) -> Response { - let (parts, body_bytes) = match extract_request_body(request).await { + let (parts, body) = request.into_parts(); + + let request = match build_proxy_request(&aws_config, parts, body) { Ok(r) => r, Err(e) => { - error!("PROXY | passthrough_proxy | error extracting request body: {e}"); + error!("PROXY | passthrough_proxy | error building proxy request"); return ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to extract request body: {e}"), + format!("Failed to build proxy request: {e}"), ) .into_response(); } }; - let (intercepted_parts, intercepted_bytes) = - match proxy_request(&client, &aws_config, parts, body_bytes).await { - Ok(r) => r, - Err(e) => { - error!("PROXY | passthrough_proxy | error proxying request: {e}"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to proxy request: {e}"), - ) - .into_response(); - } - }; + // Send the streaming request + let intercepted_response = match client.request(request).await { + Ok(r) => r, + Err(e) => { + error!("PROXY | passthrough_proxy | error proxying request"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to proxy: {e}"), + ) + .into_response(); + } + }; - match build_forward_response(intercepted_parts, intercepted_bytes) { + let (intercepted_parts, intercepted_body) = intercepted_response.into_parts(); + match build_forward_response(intercepted_parts, Body::new(intercepted_body)) { Ok(r) => r, Err(e) => { - error!("PROXY | passthrough_proxy | error building forward response: {e}"); + error!("PROXY | passthrough_proxy | error building response: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to build forward response: {e}"), + format!("Failed to build response: {e}"), ) .into_response() } } } -async fn proxy_request( - client: &Client, - aws_config: &AwsConfig, - parts: http::request::Parts, - body_bytes: Bytes, -) -> Result<(http::response::Parts, Bytes), Box> { - let request = build_proxy_request(aws_config, parts, body_bytes)?; - debug!("PROXY | proxy_request | calling {}", request.uri()); - let intercepted_response = client.request(request).await?; - let (parts, body) = intercepted_response.into_parts(); - let bytes = body.collect().await?.to_bytes(); - - Ok((parts, bytes)) +fn clean_proxy_headers(headers: &mut HeaderMap) { + // Remove hop-by-hop headers that shouldn't be forwarded + headers.remove("connection"); + headers.remove("upgrade"); + headers.remove("proxy-connection"); + headers.remove("proxy-authenticate"); + headers.remove("proxy-authorization"); + headers.remove("te"); + + // For streaming, we preserve transfer-encoding and content-length + // The underlying HTTP implementation will handle them correctly } -fn 
build_forward_response( +fn build_forward_response( parts: http::response::Parts, - body_bytes: Bytes, -) -> Result, Box> { - let mut forward_response = Response::builder() + body: B, +) -> Result, Box> +where + B: http_body::Body + Send + 'static, + B::Error: Into>, +{ + let mut response_builder = Response::builder() .status(parts.status) .version(parts.version); - if let Some(h) = forward_response.headers_mut() { - *h = parts.headers; - - // Since the body has been already collected, we can set the content-length header instead. - if h.contains_key("transfer-encoding") { - h.remove("transfer-encoding"); - h.insert("content-length", body_bytes.len().to_string().parse()?); - } + if let Some(headers) = response_builder.headers_mut() { + *headers = parts.headers; + clean_proxy_headers(headers); } - let forward_response = forward_response.body(Body::from(body_bytes))?; + let response = response_builder.body(body)?; - Ok(forward_response) + Ok(response) } -fn build_proxy_request( +fn build_proxy_request( aws_config: &AwsConfig, parts: http::request::Parts, - body_bytes: Bytes, -) -> Result, Box> { + body: B, +) -> Result, Box> +where + B: http_body::Body + Send + 'static, + B::Error: Into>, +{ let uri = parts.uri.clone(); - let target_path = uri .path_and_query() .map(std::string::ToString::to_string) .unwrap_or(uri.path().to_string()); - let target_uri = format!("http://{}{}", aws_config.runtime_api, target_path); - let uri = target_uri.parse::()?; + let parsed_uri = target_uri.parse::()?; - let mut request = HttpRequest::builder() + let mut request_builder = hyper::Request::builder() .method(&parts.method) - .uri(uri) + .uri(parsed_uri) .version(parts.version); - if let Some(h) = request.headers_mut() { - *h = parts.headers.clone(); - - // Since the body has been already collected, we can set the content-length header instead. - if h.contains_key("transfer-encoding") { - h.remove("transfer-encoding"); - h.insert("content-length", body_bytes.len().to_string().parse()?); - } + if let Some(headers) = request_builder.headers_mut() { + *headers = parts.headers.clone(); + clean_proxy_headers(headers); } - let request = request.body(Body::from(body_bytes))?; + let hyper_body = Body::new(body); + let request = request_builder.body(hyper_body)?; Ok(request) } #[cfg(test)] mod tests { + use http_body_util::BodyExt; use std::{ collections::HashMap, sync::Mutex, @@ -442,9 +461,8 @@ mod tests { .await; } - let body_bytes = ask_proxy - .expect("failed to retrieve response") - .into_body() + let (_, body) = ask_proxy.expect("failed to retrieve response").into_parts(); + let body_bytes = body .collect() .await .expect("failed to collect response body") diff --git a/bottlecap/src/proxy/mod.rs b/bottlecap/src/proxy/mod.rs index c7c83b444..8ec70565f 100644 --- a/bottlecap/src/proxy/mod.rs +++ b/bottlecap/src/proxy/mod.rs @@ -3,6 +3,7 @@ use std::{env, sync::Arc}; use crate::config::{Config, aws::AwsConfig}; pub mod interceptor; +pub mod tee_body; /// Returns true if the proxy should be started. 
/// From c940eeb409dcd801d34c8482743ca702524aa0f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:37:40 -0600 Subject: [PATCH 4/6] update license --- bottlecap/LICENSE-3rdparty.csv | 5 ----- 1 file changed, 5 deletions(-) diff --git a/bottlecap/LICENSE-3rdparty.csv b/bottlecap/LICENSE-3rdparty.csv index 10cd11164..a49476fa6 100644 --- a/bottlecap/LICENSE-3rdparty.csv +++ b/bottlecap/LICENSE-3rdparty.csv @@ -18,7 +18,6 @@ block-buffer,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto De bumpalo,https://github.com/fitzgen/bumpalo,MIT OR Apache-2.0,Nick Fitzgerald bytemuck,https://github.com/Lokathor/bytemuck,Zlib OR Apache-2.0 OR MIT,Lokathor byteorder,https://github.com/BurntSushi/byteorder,Unlicense OR MIT,Andrew Gallant -bytes,https://github.com/carllerche/bytes,MIT,Carl Lerche bytes,https://github.com/tokio-rs/bytes,MIT,"Carl Lerche , Sean McArthur " cc,https://github.com/rust-lang/cc-rs,MIT OR Apache-2.0,Alex Crichton cfg-if,https://github.com/rust-lang/cfg-if,MIT OR Apache-2.0,Alex Crichton @@ -52,7 +51,6 @@ flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton < float-cmp,https://github.com/mikedilger/float-cmp,MIT,Mike Dilger fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers -futures,https://github.com/rust-lang-nursery/futures-rs,MIT OR Apache-2.0,Alex Crichton futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Authors futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors futures-core,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-core Authors @@ -73,7 +71,6 @@ headers-core,https://github.com/hyperium/headers,MIT,Sean McArthur hmac,https://github.com/RustCrypto/MACs,MIT OR Apache-2.0,RustCrypto Developers http,https://github.com/hyperium/http,MIT OR Apache-2.0,"Alex Crichton , Carl Lerche , Sean McArthur " -http-body,https://github.com/hyperium/http-body,MIT,Carl Lerche http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " http-body-util,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur @@ -96,7 +93,6 @@ indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Author indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors inlinable_string,https://github.com/fitzgen/inlinable_string,Apache-2.0 OR MIT,Nick Fitzgerald io-uring,https://github.com/tokio-rs/io-uring,MIT OR Apache-2.0,quininer -iovec,https://github.com/carllerche/iovec,MIT OR Apache-2.0,Carl Lerche ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price iri-string,https://github.com/lo48576/iri-string,MIT OR Apache-2.0,YOSHIOKA Takuma itertools,https://github.com/rust-itertools/itertools,MIT OR Apache-2.0,bluss @@ -210,7 +206,6 @@ tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Devel tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu tokio,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors -tokio-buf,https://github.com/tokio-rs/tokio,MIT,Carl Lerche tokio-macros,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors 
tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-rustls Authors tokio-stream,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors From 3966a4b05f3352b721d3eb8c242cad5c82a76f3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:40:33 -0600 Subject: [PATCH 5/6] update docs --- bottlecap/src/proxy/interceptor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 93aa7ed4e..e93ecec6f 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -202,7 +202,8 @@ async fn invocation_response_proxy( let (parts, body) = request.into_parts(); let (outgoing_tee_body, outgoing_completion_receiver) = TeeBodyWithCompletion::new(body); - // This is where LWA will be processed + // The outgoing body is what the final user will see. + // Which is what AWS Lambda returns, in turn what we want to process. let mut join_set = tasks.lock().await; let aws_config_clone = aws_config.clone(); join_set.spawn(async move { From fbc83018ac80fe036b1d0ba8c770744e00ee2ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 4 Aug 2025 10:28:18 -0600 Subject: [PATCH 6/6] use `lock` as opposed to `try_lock` also added safety docs on unsafe pin --- bottlecap/src/proxy/tee_body.rs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/bottlecap/src/proxy/tee_body.rs b/bottlecap/src/proxy/tee_body.rs index 069abe6fa..e977a5d3f 100644 --- a/bottlecap/src/proxy/tee_body.rs +++ b/bottlecap/src/proxy/tee_body.rs @@ -37,19 +37,18 @@ impl TeeBodyWithCompletion { let sender = self.completion_sender.clone(); tokio::spawn(async move { - if let Ok(mut sender_guard) = sender.try_lock() { - if let Some(sender) = sender_guard.take() { - let collected_data = { - let buffer_guard = buffer.lock().await; - let data = buffer_guard.clone(); - Bytes::from(data) - }; + let mut sender_guard = sender.lock().await; + if let Some(sender) = sender_guard.take() { + let collected_data = { + let buffer_guard = buffer.lock().await; + let data = buffer_guard.clone(); + Bytes::from(data) + }; - if sender.send(collected_data).is_err() { - error!( - "PROXY | tee_body | unable to send completion signal, proxied payload won't be processed" - ); - } + if sender.send(collected_data).is_err() { + error!( + "PROXY | tee_body | unable to send completion signal, proxied payload won't be processed" + ); } } }); @@ -69,7 +68,11 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - // Use unsafe to access the inner body while maintaining pinning + // SAFETY: This is safe because: + // 1. We're only accessing the `inner` field, which is the only field that needs pinning + // 2. The `inner` field implements `http_body::Body` which has proper pinning guarantees + // 3. We're not moving or dropping the pinned data, just calling methods on it + // 4. 
The `Arc>` fields don't require pinning as they're shared references let inner = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.inner) }; match inner.poll_frame(cx) { @@ -78,6 +81,8 @@ where let buffer = self.buffer.clone(); let data_bytes = data.as_ref().to_vec(); + // Use try_lock here since we're in a sync context and don't want to block + // If it fails, we'll just skip this chunk of data rather than blocking if let Ok(mut buf) = buffer.try_lock() { buf.extend_from_slice(&data_bytes); }