Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 162 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ ahash = "0.8.3"
anyhow = { version = "1.0.68", features = ["backtrace"] }
anymap = "0.12"
async-trait = "0.1.68"
axum = "0.6"
axum = { version = "0.7", features = ["tracing"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
arrayvec = "0.7.2"
backtrace = "0.3.66"
base64 = "0.21.2"
Expand Down Expand Up @@ -99,12 +100,14 @@ futures = "0.3"
futures-channel = "0.3"
getrandom = { version = "0.2.7", features = ["custom"] }
glob = "0.3.1"
headers = "0.4"
hex = "0.4.3"
hostname = "^0.3"
home = "0.5"
http = "0.2.9"
http = "1.0"
humantime = "2.1.0"
hyper = "0.14.18"
hyper = "1.0"
hyper-util = { version = "0.1", features = ["tokio"] }
im = "15.1"
imara-diff = "0.1.3"
indexmap = "2.0.0"
Expand Down Expand Up @@ -167,9 +170,9 @@ thiserror = "1.0.37"
tokio = { version = "1.25.1", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["time"] }
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
toml = "0.8"
tower-http = { version = "0.4.1", features = ["cors"] }
tower-http = { version = "0.5", features = ["cors"] }
tracing = { version = "0.1.37", features = ["release_max_level_off"] }
tracing-appender = "0.2.2"
tracing-core = "0.1.31"
Expand Down
9 changes: 6 additions & 3 deletions crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ tempfile.workspace = true
async-trait = "0.1.60"
chrono = { version = "0.4.23", features = ["serde"]}
rand = "0.8.5"
axum = { version = "0.6.16", features = ["headers", "tracing"] }
hyper = "0.14"
http = "0.2"
axum.workspace = true
axum-extra.workspace = true
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
headers.workspace = true
mime = "0.3.17"
tokio-stream = { version = "0.1.12", features = ["sync"] }
futures = "0.3"
Expand Down
10 changes: 4 additions & 6 deletions crates/client-api/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::fmt::Write;
use std::time::Duration;

use axum::extract::rejection::{TypedHeaderRejection, TypedHeaderRejectionReason};
use axum::extract::Query;
use axum::headers::authorization::Credentials;
use axum::headers::{self, authorization};
use axum::response::IntoResponse;
use axum::TypedHeader;
use axum_extra::typed_header::{TypedHeader, TypedHeaderRejection, TypedHeaderRejectionReason};
use bytes::BytesMut;
use headers::authorization::{self, Credentials};
use http::{request, HeaderValue, StatusCode};
use serde::Deserialize;
use spacetimedb::auth::identity::{
Expand Down Expand Up @@ -79,10 +77,10 @@ impl<S: NodeDelegate + Send + Sync> axum::extract::FromRequestParts<S> for Space
type Rejection = AuthorizationRejection;
async fn from_request_parts(parts: &mut request::Parts, state: &S) -> Result<Self, Self::Rejection> {
match (
axum::TypedHeader::from_request_parts(parts, state).await,
TypedHeader::from_request_parts(parts, state).await,
Query::<TokenQueryParam>::from_request_parts(parts, state).await,
) {
(Ok(axum::TypedHeader(headers::Authorization(creds @ SpacetimeCreds { .. }))), _) => {
(Ok(TypedHeader(headers::Authorization(creds @ SpacetimeCreds { .. }))), _) => {
let claims = creds
.decode_token(state.public_key())
.map_err(|e| AuthorizationRejection {
Expand Down
8 changes: 4 additions & 4 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::body::Bytes;
use axum::body::{Body, Bytes};
use axum::extract::{DefaultBodyLimit, Path, Query, State};
use axum::response::{ErrorResponse, IntoResponse};
use axum::{headers, TypedHeader};
use axum_extra::TypedHeader;
use chrono::Utc;
use futures::StreamExt;
use http::StatusCode;
Expand Down Expand Up @@ -487,9 +487,9 @@ where
.chain(stream)
.map(Ok::<_, std::convert::Infallible>);

axum::body::boxed(axum::body::StreamBody::new(stream))
Body::from_stream(stream)
} else {
axum::body::boxed(axum::body::Full::from(lines))
Body::from(lines)
};

Ok((
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::TypedHeader;
use axum_extra::TypedHeader;
use futures::{SinkExt, StreamExt};
use http::{HeaderValue, StatusCode};
use serde::Deserialize;
Expand Down
17 changes: 5 additions & 12 deletions crates/client-api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ pub mod websocket;
use core::fmt;
use std::net::IpAddr;

use axum::body::{Bytes, HttpBody};
use axum::extract::FromRequest;
use axum::headers;
use axum::body::Bytes;
use axum::extract::{FromRequest, Request};
use axum::response::IntoResponse;
use bytestring::ByteString;
use http::{HeaderName, HeaderValue, Request, StatusCode};
use http::{HeaderName, HeaderValue, StatusCode};

use spacetimedb::address::Address;
use spacetimedb_lib::address::AddressForUrl;
Expand All @@ -21,16 +20,10 @@ use crate::{log_and_500, ControlStateReadAccess};
pub struct ByteStringBody(pub ByteString);

#[async_trait::async_trait]
impl<S, B> FromRequest<S, B> for ByteStringBody
where
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<axum::BoxError>,
S: Send + Sync,
{
impl<S: Send + Sync> FromRequest<S> for ByteStringBody {
type Rejection = axum::response::Response;

async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, state)
.await
.map_err(IntoResponse::into_response)?;
Expand Down
12 changes: 6 additions & 6 deletions crates/client-api/src/util/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! A more flexible version of axum::extract::ws. This could probably get pulled out into its own crate at some point.

use axum::extract::FromRequestParts;
use axum::headers::{
self, Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, SecWebsocketVersion, Upgrade,
};
use axum::response::{IntoResponse, Response};
use axum::TypedHeader;
use axum_extra::TypedHeader;
use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, SecWebsocketVersion, Upgrade};
use http::{HeaderName, HeaderValue, Method, StatusCode};
use hyper::upgrade::{OnUpgrade, Upgraded};
use hyper_util::rt::TokioIo;

use super::flat_csv::FlatCsv;

pub use tokio_tungstenite::tungstenite;
pub use tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message, WebSocketConfig};
pub type WebSocketStream = tokio_tungstenite::WebSocketStream<Upgraded>;

pub type WebSocketStream = tokio_tungstenite::WebSocketStream<TokioIo<Upgraded>>;

pub struct RequestSecWebsocketProtocol(FlatCsv);

Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct PendingWebSocket(OnUpgrade);
impl PendingWebSocket {
#[inline]
pub async fn upgrade(self, config: WebSocketConfig) -> hyper::Result<WebSocketStream> {
let stream = self.0.await?;
let stream = TokioIo::new(self.0.await?);
Ok(WebSocketStream::from_raw_socket(stream, tungstenite::protocol::Role::Server, Some(config)).await)
}

Expand Down
8 changes: 4 additions & 4 deletions crates/standalone/src/subcommands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use clap::{Arg, ArgMatches};
use spacetimedb::config::{FilesGlobal, FilesLocal, SpacetimeDbFiles};
use spacetimedb::db::{Config, FsyncPolicy, Storage};
use spacetimedb::startup;
use std::net::TcpListener;
use tokio::net::TcpListener;

#[cfg(feature = "string")]
impl From<std::string::String> for OsStr {
Expand Down Expand Up @@ -226,11 +226,11 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {

let ctx = spacetimedb_client_api::ArcEnv(StandaloneEnv::init(config).await?);

let service = router().with_state(ctx).into_make_service();
let service = router().with_state(ctx);

let tcp = TcpListener::bind(listen_addr).unwrap();
let tcp = TcpListener::bind(listen_addr).await?;
log::debug!("Starting SpacetimeDB listening on {}", tcp.local_addr().unwrap());
axum::Server::from_tcp(tcp)?.serve(service).await?;
axum::serve(tcp, service).await?;
Ok(())
}

Expand Down