Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use spacetimedb::client::{ClientActorId, ClientClosed, ClientConnection, DataMes
use spacetimedb::util::future_queue;
use spacetimedb_lib::address::AddressForUrl;
use spacetimedb_lib::Address;
use std::time::Instant;
use tokio::sync::mpsc;

use crate::auth::{SpacetimeAuthHeader, SpacetimeIdentity, SpacetimeIdentityToken};
Expand Down Expand Up @@ -173,7 +174,7 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
// Build a queue of incoming messages to handle,
// to be processed one at a time, in the order they're received.
// TODO: do we want this to have a fixed capacity? or should it be unbounded
let mut handle_queue = pin!(future_queue(|message| client.handle_message(message)));
let mut handle_queue = pin!(future_queue(|(message, timer)| client.handle_message(message, timer)));

let mut closed = false;
loop {
Expand Down Expand Up @@ -250,7 +251,10 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
// and `Item::Message` comes from exactly one distinct `select!` branch.
// Consider merging this `match` with the previous `select!`.
match message {
Item::Message(ClientMessage::Message(message)) => handle_queue.as_mut().push(message),
Item::Message(ClientMessage::Message(message)) => {
let timer = Instant::now();
handle_queue.as_mut().push((message, timer))
}
Item::HandleResult(res) => {
if let Err(e) = res {
if let MessageHandleError::Execution(err) = e {
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ops::Deref;
use std::time::Instant;

use crate::error::DBError;
use crate::host::{ModuleHost, ReducerArgs, ReducerCallError, ReducerCallResult};
Expand Down Expand Up @@ -146,8 +147,9 @@ impl ClientConnection {
pub fn handle_message(
&self,
message: impl Into<DataMessage>,
timer: Instant,
) -> impl Future<Output = Result<(), MessageHandleError>> + '_ {
message_handlers::handle(self, message.into())
message_handlers::handle(self, message.into(), timer)
}

pub async fn call_reducer(&self, reducer: &str, args: ReducerArgs) -> Result<ReducerCallResult, ReducerCallError> {
Expand Down
45 changes: 34 additions & 11 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::db::db_metrics::DB_METRICS;
use crate::energy::EnergyQuanta;
use crate::execution_context::WorkloadType;
use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::host::{ReducerArgs, Timestamp};
use crate::identity::Identity;
Expand Down Expand Up @@ -30,7 +32,7 @@ pub enum MessageHandleError {
Execution(#[from] MessageExecutionError),
}

pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<(), MessageHandleError> {
pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
let message_kind = match message {
DataMessage::Text(_) => "text",
DataMessage::Binary(_) => "binary",
Expand All @@ -47,12 +49,16 @@ pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<(
.inc();

match message {
DataMessage::Text(message) => handle_text(client, message).await,
DataMessage::Binary(message_buf) => handle_binary(client, message_buf).await,
DataMessage::Text(message) => handle_text(client, message, timer).await,
DataMessage::Binary(message_buf) => handle_binary(client, message_buf, timer).await,
}
}

async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Result<(), MessageHandleError> {
async fn handle_binary(
client: &ClientConnection,
message_buf: Vec<u8>,
timer: Instant,
) -> Result<(), MessageHandleError> {
let message = Message::decode(Bytes::from(message_buf))?;
let message = match message.r#type {
Some(message::Type::FunctionCall(FunctionCall { ref reducer, arg_bytes })) => {
Expand All @@ -67,7 +73,7 @@ async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Resul
_ => return Err(MessageHandleError::InvalidMessage),
};

message.handle(client).await?;
message.handle(client, timer).await?;

Ok(())
}
Expand All @@ -93,7 +99,7 @@ enum RawJsonMessage<'a> {
},
}

async fn handle_text(client: &ClientConnection, message: String) -> Result<(), MessageHandleError> {
async fn handle_text(client: &ClientConnection, message: String, timer: Instant) -> Result<(), MessageHandleError> {
let message = ByteString::from(message);
let msg = serde_json::from_str::<RawJsonMessage>(&message)?;
let mut message_id_ = Vec::new();
Expand All @@ -118,7 +124,7 @@ async fn handle_text(client: &ClientConnection, message: String) -> Result<(), M
}
};

msg.handle(client).await?;
msg.handle(client, timer).await?;

Ok(())
}
Expand All @@ -136,19 +142,36 @@ enum DecodedMessage<'a> {
}

impl DecodedMessage<'_> {
async fn handle(self, client: &ClientConnection) -> Result<(), MessageExecutionError> {
async fn handle(self, client: &ClientConnection, timer: Instant) -> Result<(), MessageExecutionError> {
let address = client.module.info().address;
let res = match self {
DecodedMessage::Call { reducer, args } => {
let res = client.call_reducer(reducer, args).await;
DB_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Reducer, &address, reducer)
.observe(timer.elapsed().as_secs_f64());
res.map(drop).map_err(|e| (Some(reducer), e.into()))
}
DecodedMessage::Subscribe(subscription) => {
client.subscribe(subscription).await.map_err(|e| (None, e.into()))
let res = client.subscribe(subscription).await;
DB_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, e.into()))
}
DecodedMessage::OneOffQuery {
query_string: query,
message_id,
} => client.one_off_query(query, message_id).await.map_err(|err| (None, err)),
} => {
let res = client.one_off_query(query, message_id).await;
DB_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Sql, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|err| (None, err))
}
};
res.map_err(|(reducer, err)| MessageExecutionError {
reducer: reducer.map(str::to_owned),
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ metrics_group!(
#[help = "For a given module, the size of its log file (in bytes)"]
#[labels(db: Address)]
pub module_log_file_size: IntGaugeVec,

#[name = spacetime_request_round_time]
#[help = "The total time it takes for request to complete"]
#[labels(txn_type: WorkloadType, database_address: Address, reducer_symbol: str)]
pub request_round_trip: HistogramVec,
}
);

Expand Down
4 changes: 3 additions & 1 deletion crates/testing/src/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;

use tokio::runtime::{Builder, Runtime};

Expand Down Expand Up @@ -64,7 +65,8 @@ impl ModuleHandle {
}

pub async fn send(&self, message: impl Into<DataMessage>) -> anyhow::Result<()> {
self.client.handle_message(message).await.map_err(Into::into)
let timer = Instant::now();
self.client.handle_message(message, timer).await.map_err(Into::into)
}

pub async fn read_log(&self, size: Option<u32>) -> String {
Expand Down