Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use spacetimedb::client::{ClientActorId, ClientClosed, ClientConnection, DataMes
use spacetimedb::util::future_queue;
use spacetimedb_lib::address::AddressForUrl;
use spacetimedb_lib::Address;
use std::time::Instant;
use tokio::sync::mpsc;

use crate::auth::{SpacetimeAuthHeader, SpacetimeIdentity, SpacetimeIdentityToken};
Expand Down Expand Up @@ -173,7 +174,7 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
// Build a queue of incoming messages to handle,
// to be processed one at a time, in the order they're received.
// TODO: do we want this to have a fixed capacity? or should it be unbounded
let mut handle_queue = pin!(future_queue(|message| client.handle_message(message)));
let mut handle_queue = pin!(future_queue(|(message, timer)| client.handle_message(message, timer)));

let mut closed = false;
loop {
Expand Down Expand Up @@ -250,7 +251,10 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
// and `Item::Message` comes from exactly one distinct `select!` branch.
// Consider merging this `match` with the previous `select!`.
match message {
Item::Message(ClientMessage::Message(message)) => handle_queue.as_mut().push(message),
Item::Message(ClientMessage::Message(message)) => {
let timer = Instant::now();
handle_queue.as_mut().push((message, timer))
}
Item::HandleResult(res) => {
if let Err(e) = res {
if let MessageHandleError::Execution(err) = e {
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ops::Deref;
use std::time::Instant;

use crate::error::DBError;
use crate::host::{ModuleHost, ReducerArgs, ReducerCallError, ReducerCallResult};
Expand Down Expand Up @@ -146,8 +147,9 @@ impl ClientConnection {
pub fn handle_message(
&self,
message: impl Into<DataMessage>,
timer: Instant,
) -> impl Future<Output = Result<(), MessageHandleError>> + '_ {
message_handlers::handle(self, message.into())
message_handlers::handle(self, message.into(), timer)
}

pub async fn call_reducer(&self, reducer: &str, args: ReducerArgs) -> Result<ReducerCallResult, ReducerCallError> {
Expand Down
44 changes: 33 additions & 11 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::energy::EnergyQuanta;
use crate::execution_context::WorkloadType;
use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::host::{ReducerArgs, Timestamp};
use crate::identity::Identity;
Expand Down Expand Up @@ -30,7 +31,7 @@ pub enum MessageHandleError {
Execution(#[from] MessageExecutionError),
}

pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<(), MessageHandleError> {
pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
let message_kind = match message {
DataMessage::Text(_) => "text",
DataMessage::Binary(_) => "binary",
Expand All @@ -47,12 +48,16 @@ pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<(
.inc();

match message {
DataMessage::Text(message) => handle_text(client, message).await,
DataMessage::Binary(message_buf) => handle_binary(client, message_buf).await,
DataMessage::Text(message) => handle_text(client, message, timer).await,
DataMessage::Binary(message_buf) => handle_binary(client, message_buf, timer).await,
}
}

async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Result<(), MessageHandleError> {
async fn handle_binary(
client: &ClientConnection,
message_buf: Vec<u8>,
timer: Instant,
) -> Result<(), MessageHandleError> {
let message = Message::decode(Bytes::from(message_buf))?;
let message = match message.r#type {
Some(message::Type::FunctionCall(FunctionCall { ref reducer, arg_bytes })) => {
Expand All @@ -67,7 +72,7 @@ async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Resul
_ => return Err(MessageHandleError::InvalidMessage),
};

message.handle(client).await?;
message.handle(client, timer).await?;

Ok(())
}
Expand All @@ -93,7 +98,7 @@ enum RawJsonMessage<'a> {
},
}

async fn handle_text(client: &ClientConnection, message: String) -> Result<(), MessageHandleError> {
async fn handle_text(client: &ClientConnection, message: String, timer: Instant) -> Result<(), MessageHandleError> {
let message = ByteString::from(message);
let msg = serde_json::from_str::<RawJsonMessage>(&message)?;
let mut message_id_ = Vec::new();
Expand All @@ -118,7 +123,7 @@ async fn handle_text(client: &ClientConnection, message: String) -> Result<(), M
}
};

msg.handle(client).await?;
msg.handle(client, timer).await?;

Ok(())
}
Expand All @@ -136,19 +141,36 @@ enum DecodedMessage<'a> {
}

impl DecodedMessage<'_> {
async fn handle(self, client: &ClientConnection) -> Result<(), MessageExecutionError> {
async fn handle(self, client: &ClientConnection, timer: Instant) -> Result<(), MessageExecutionError> {
let address = client.module.info().address;
let res = match self {
DecodedMessage::Call { reducer, args } => {
let res = client.call_reducer(reducer, args).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Reducer, &address, reducer)
.observe(timer.elapsed().as_secs_f64());
res.map(drop).map_err(|e| (Some(reducer), e.into()))
}
DecodedMessage::Subscribe(subscription) => {
client.subscribe(subscription).await.map_err(|e| (None, e.into()))
let res = client.subscribe(subscription).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, e.into()))
}
DecodedMessage::OneOffQuery {
query_string: query,
message_id,
} => client.one_off_query(query, message_id).await.map_err(|err| (None, err)),
} => {
let res = client.one_off_query(query, message_id).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Sql, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|err| (None, err))
}
};
res.map_err(|(reducer, err)| MessageExecutionError {
reducer: reducer.map(str::to_owned),
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Mutex};

use crate::execution_context::WorkloadType;
use crate::hash::Hash;
use once_cell::sync::Lazy;
use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
Expand Down Expand Up @@ -91,6 +92,11 @@ metrics_group!(
#[help = "The number of active subscription queries"]
#[labels(database_address: Address)]
pub subscription_queries: IntGaugeVec,

#[name = spacetime_request_round_trip_time]
#[help = "The total time it takes for request to complete"]
#[labels(txn_type: WorkloadType, database_address: Address, reducer_symbol: str)]
pub request_round_trip: HistogramVec,
}
);

Expand Down
4 changes: 3 additions & 1 deletion crates/testing/src/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;

use tokio::runtime::{Builder, Runtime};

Expand Down Expand Up @@ -64,7 +65,8 @@ impl ModuleHandle {
}

pub async fn send(&self, message: impl Into<DataMessage>) -> anyhow::Result<()> {
self.client.handle_message(message).await.map_err(Into::into)
let timer = Instant::now();
self.client.handle_message(message, timer).await.map_err(Into::into)
}

pub async fn read_log(&self, size: Option<u32>) -> String {
Expand Down