Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1705,8 +1705,18 @@ dependencies = [
name = "linkerd-http-classify"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-http-box",
"linkerd-stack",
"linkerd-tracing",
"pin-project",
"tokio",
"tokio-test",
"tower-test",
"tracing",
]

[[package]]
Expand Down Expand Up @@ -1753,6 +1763,16 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-override-authority"
version = "0.1.0"
dependencies = [
"http",
"linkerd-stack",
"tower",
"tracing",
]

[[package]]
name = "linkerd-http-prom"
version = "0.1.0"
Expand All @@ -1772,6 +1792,17 @@ dependencies = [
"tokio",
]

[[package]]
name = "linkerd-http-retain"
version = "0.1.0"
dependencies = [
"http",
"http-body",
"linkerd-stack",
"pin-project",
"tower",
]

[[package]]
name = "linkerd-http-retry"
version = "0.1.0"
Expand Down Expand Up @@ -1808,6 +1839,44 @@ dependencies = [
"url",
]

[[package]]
name = "linkerd-http-stream-timeouts"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-stack",
"parking_lot",
"pin-project",
"thiserror",
"tokio",
"tracing",
]

[[package]]
name = "linkerd-http-upgrade"
version = "0.1.0"
dependencies = [
"bytes",
"drain",
"futures",
"http",
"http-body",
"hyper",
"linkerd-duplex",
"linkerd-error",
"linkerd-http-version",
"linkerd-io",
"linkerd-stack",
"pin-project",
"tokio",
"tower",
"tracing",
"try-lock",
]

[[package]]
name = "linkerd-http-version"
version = "0.1.0"
Expand Down Expand Up @@ -2178,6 +2247,10 @@ dependencies = [
"linkerd-http-executor",
"linkerd-http-h2",
"linkerd-http-insert",
"linkerd-http-override-authority",
"linkerd-http-retain",
"linkerd-http-stream-timeouts",
"linkerd-http-upgrade",
"linkerd-http-version",
"linkerd-io",
"linkerd-proxy-balance",
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ members = [
"linkerd/http/h2",
"linkerd/http/insert",
"linkerd/http/metrics",
"linkerd/http/override-authority",
"linkerd/http/prom",
"linkerd/http/retain",
"linkerd/http/retry",
"linkerd/http/route",
"linkerd/http/stream-timeouts",
"linkerd/http/upgrade",
"linkerd/http/version",
"linkerd/identity",
"linkerd/idle-cache",
Expand Down
12 changes: 12 additions & 0 deletions linkerd/http/classify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ edition = "2021"
publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
pin-project = "1"
tokio = { version = "1", default-features = false }
tracing = "0.1"

linkerd-error = { path = "../../error" }
linkerd-http-box = { path = "../../http/box" }
linkerd-stack = { path = "../../stack" }

[dev-dependencies]
tokio-test = "0.4"
tower-test = "0.4"
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ where

// === impl ResponseBody ===

impl<C, B> hyper::body::HttpBody for ResponseBody<C, B>
impl<C, B> http_body::Body for ResponseBody<C, B>
where
C: ClassifyEos + Unpin,
B: hyper::body::HttpBody<Error = Error>,
B: http_body::Body<Error = Error>,
{
type Data = B::Data;
type Error = B::Error;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::classify::{BroadcastClassification, ClassifyResponse};
use crate::{channel::BroadcastClassification, ClassifyResponse};
use linkerd_stack::{gate, layer, ExtractParam, Gate, NewService};
use std::marker::PhantomData;
use tokio::sync::mpsc;
Expand Down
10 changes: 10 additions & 0 deletions linkerd/http/classify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@

use linkerd_error::Error;

pub use self::{
channel::{BroadcastClassification, NewBroadcastClassification, Tx},
gate::{NewClassifyGate, NewClassifyGateSet},
insert::{InsertClassifyResponse, NewInsertClassifyResponse},
};

pub mod channel;
pub mod gate;
mod insert;

/// Determines how a request's response should be classified.
pub trait Classify {
type Class: Clone + Send + Sync + 'static;
Expand Down
17 changes: 17 additions & 0 deletions linkerd/http/override-authority/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "linkerd-http-override-authority"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Tower middleware to override request authorities.
"""

[dependencies]
http = "0.2"
tower = { version = "0.4", default-features = false }
tracing = "0.1"

linkerd-stack = { path = "../../stack" }
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use http::{header::AsHeaderName, uri::Authority};
use http::{
header::AsHeaderName,
uri::{self, Authority},
};
use linkerd_stack::{layer, NewService, Param};
use std::{
fmt,
Expand All @@ -23,6 +26,27 @@ pub struct OverrideAuthority<S, H> {
inner: S,
}

/// Sets the [`Authority`] of the given URI.
pub fn set_authority(uri: &mut uri::Uri, auth: Authority) {
let mut parts = uri::Parts::from(std::mem::take(uri));

parts.authority = Some(auth);

// If this was an origin-form target (path only),
// then we can't *only* set the authority, as that's
// an illegal target (such as `example.com/docs`).
//
// But don't set a scheme if this was authority-form (CONNECT),
// since that would change its meaning (like `https://example.com`).
if parts.path_and_query.is_some() {
parts.scheme = Some(http::uri::Scheme::HTTP);
}

let new = http::uri::Uri::from_parts(parts).expect("absolute uri");

*uri = new;
}

// === impl NewOverrideAuthority ===

impl<H: Clone, N> NewOverrideAuthority<H, N> {
Expand Down
20 changes: 20 additions & 0 deletions linkerd/http/retain/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "linkerd-http-retain"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Tower middleware to manage service lifecycles.

This is mostly intended to support cache eviction.
"""

[dependencies]
http = "0.2"
http-body = "0.4"
pin-project = "1"
tower = { version = "0.4", default-features = false }

linkerd-stack = { path = "../../stack" }
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Provides a middleware that holds an inner service as long as responses are
//! being processed.
//! Tower middleware to manage service lifecycles.
//!
//! Provides a [`Retain<S, B>`] middleware that holds an inner service as long as responses are
//! being processed. This is mostly intended to support cache eviction.

use linkerd_stack::layer;
use pin_project::pin_project;
Expand Down
23 changes: 23 additions & 0 deletions linkerd/http/stream-timeouts/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "linkerd-http-stream-timeouts"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Tower middleware to express deadlines on streams.
"""

[dependencies]
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
parking_lot = "0.12"
pin-project = "1"
thiserror = "1"
tokio = { version = "1", default-features = false }
tracing = "0.1"

linkerd-error = { path = "../../error" }
linkerd-stack = { path = "../../stack" }
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Tower middleware to express deadlines on streams.
//!
//! See [`EnforceTimeouts<S>`].

use futures::FutureExt;
use linkerd_error::{Error, Result};
use linkerd_stack as svc;
Expand Down Expand Up @@ -345,9 +349,9 @@ where

// === impl RequestBody ===

impl<B> crate::HttpBody for RequestBody<B>
impl<B> http_body::Body for RequestBody<B>
where
B: crate::HttpBody<Error = Error>,
B: http_body::Body<Error = Error>,
{
type Data = B::Data;
type Error = Error;
Expand Down Expand Up @@ -405,9 +409,9 @@ where

// === impl ResponseBody ===

impl<B> crate::HttpBody for ResponseBody<B>
impl<B> http_body::Body for ResponseBody<B>
where
B: crate::HttpBody<Error = Error>,
B: http_body::Body<Error = Error>,
{
type Data = B::Data;
type Error = Error;
Expand Down
29 changes: 29 additions & 0 deletions linkerd/http/upgrade/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "linkerd-http-upgrade"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Facilities for HTTP/1 upgrades.
"""

[dependencies]
bytes = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
hyper = { version = "0.14", default-features = false, features = ["client"] }
pin-project = "1"
tokio = { version = "1", default-features = false }
tower = { version = "0.4", default-features = false }
tracing = "0.1"
try-lock = "0.2"

linkerd-duplex = { path = "../../duplex" }
linkerd-error = { path = "../../error" }
linkerd-http-version = { path = "../version" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct HyperConnectFuture<F> {
inner: F,
absolute_form: bool,
}

// === impl UpgradeBody ===

impl HttpBody for UpgradeBody {
Expand Down Expand Up @@ -107,7 +108,7 @@ impl From<hyper::Body> for UpgradeBody {
}

impl UpgradeBody {
pub(crate) fn new(
pub fn new(
body: hyper::Body,
upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>,
) -> Self {
Expand All @@ -129,7 +130,7 @@ impl PinnedDrop for UpgradeBody {
// === impl HyperConnect ===

impl<C, T> HyperConnect<C, T> {
pub(super) fn new(connect: C, target: T, absolute_form: bool) -> Self {
pub fn new(connect: C, target: T, absolute_form: bool) -> Self {
HyperConnect {
connect,
target,
Expand All @@ -140,7 +141,7 @@ impl<C, T> HyperConnect<C, T> {

impl<C, T> Service<hyper::Uri> for HyperConnect<C, T>
where
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync,
C: MakeConnection<(linkerd_http_version::Version, T)> + Clone + Send + Sync,
C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
T: Clone + Send + Sync,
Expand All @@ -157,7 +158,7 @@ where
HyperConnectFuture {
inner: self
.connect
.connect((crate::Version::Http1, self.target.clone())),
.connect((linkerd_http_version::Version::Http1, self.target.clone())),
absolute_form: self.absolute_form,
}
}
Expand Down
Loading
Loading