Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 12 additions & 28 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ use crate::identity::Identity;
use crate::json::client_api::{SubscriptionUpdateJson, TableRowOperationJson, TableUpdateJson};
use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, TableRowOperation, TableUpdate};
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed, WaiterGauge};
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
use crate::util::notify_once::NotifyOnce;
use crate::util::prometheus_handle::{GaugeInc, IntGaugeExt};
use crate::worker_metrics::WORKER_METRICS;
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
use futures::{Future, FutureExt};
use indexmap::IndexMap;
Expand Down Expand Up @@ -336,7 +334,7 @@ impl fmt::Debug for ModuleHost {
trait DynModuleHost: Send + Sync + 'static {
async fn get_instance(
&self,
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
fn inject_logs(&self, log_level: LogLevel, message: &str);
fn one_off_query(
Expand All @@ -353,7 +351,7 @@ trait DynModuleHost: Send + Sync + 'static {
struct HostControllerActor<T: Module> {
module: Arc<T>,
threadpool: Arc<HostThreadpool>,
instance_pool: LendingPool<T::Instance, InstancePoolGauge>,
instance_pool: LendingPool<T::Instance>,
start: NotifyOnce,
}

Expand Down Expand Up @@ -381,38 +379,24 @@ async fn select_first<A: Future, B: Future<Output = ()>>(fut_a: A, fut_b: B) ->
}
}

#[derive(Clone)]
struct InstancePoolGauge;

impl WaiterGauge for InstancePoolGauge {
type Context<'a> = &'a (&'a Identity, &'a Hash, &'a Address, &'a str);
type IncGuard = GaugeInc;
fn inc(&self, &(identity, module_hash, database_address, reducer_symbol): Self::Context<'_>) -> Self::IncGuard {
WORKER_METRICS
.instance_queue_length
.with_label_values(identity, module_hash, database_address, reducer_symbol)
.inc_scope()
}
}

#[async_trait::async_trait]
impl<T: Module> DynModuleHost for HostControllerActor<T> {
async fn get_instance(
&self,
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
self.start.notified().await;
// in the future we should do something like in the else branch here -- add more instances based on load.
// we need to do write-skew retries first - right now there's only ever once instance per module.
let inst = if true {
self.instance_pool
.request_with_context(waiter_gauge_context)
.request_with_context(ctx)
.await
.map_err(|_| NoSuchModule)?
} else {
const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500);
select_first(
self.instance_pool.request_with_context(waiter_gauge_context),
self.instance_pool.request_with_context(ctx),
tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()),
)
.await
Expand Down Expand Up @@ -500,7 +484,7 @@ pub enum InitDatabaseError {
impl ModuleHost {
pub fn new(threadpool: Arc<HostThreadpool>, mut module: impl Module) -> Self {
let info = module.info();
let instance_pool = LendingPool::with_gauge(InstancePoolGauge);
let instance_pool = LendingPool::new();
instance_pool.add_multiple(module.initial_instances()).unwrap();
let inner = Arc::new(HostControllerActor {
module: Arc::new(module),
Expand Down Expand Up @@ -530,13 +514,13 @@ impl ModuleHost {
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
R: Send + 'static,
{
let waiter_gauge_context = (
&self.info.identity,
&self.info.module_hash,
&self.info.address,
let context = (
self.info.identity,
self.info.module_hash,
self.info.address,
reducer_name,
);
let (threadpool, mut inst) = self.inner.get_instance(&waiter_gauge_context).await?;
let (threadpool, mut inst) = self.inner.get_instance(context).await?;

let (tx, rx) = oneshot::channel();
threadpool.spawn(move || {
Expand Down
124 changes: 40 additions & 84 deletions crates/core/src/util/lending_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,28 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use parking_lot::Mutex;
use prometheus::IntGauge;
use spacetimedb_lib::{Address, Hash, Identity};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;

use super::notify_once::{NotifiedOnce, NotifyOnce};
use super::prometheus_handle::GaugeInc;

pub struct LendingPool<T, G = ()> {
pub struct LendingPool<T> {
sem: Arc<Semaphore>,
waiter_gauge: G,
inner: Arc<LendingPoolInner<T>>,
}

impl<T, G: WaiterGauge + Default> Default for LendingPool<T, G> {
impl<T> Default for LendingPool<T> {
fn default() -> Self {
Self::new()
}
}

impl<T, G: Clone> Clone for LendingPool<T, G> {
impl<T> Clone for LendingPool<T> {
fn clone(&self) -> Self {
Self {
sem: self.sem.clone(),
waiter_gauge: self.waiter_gauge.clone(),
inner: self.inner.clone(),
}
}
Expand All @@ -52,50 +49,38 @@ struct PoolVec<T> {
#[derive(Debug)]
pub struct PoolClosed;

impl<T, G: WaiterGauge> LendingPool<T, G> {
pub fn new() -> Self
where
G: Default,
{
Self::with_gauge(G::default())
}

pub fn with_gauge(waiter_gauge: G) -> Self {
Self::from_iter_with_gauge(std::iter::empty(), waiter_gauge)
}

pub fn from_iter_with_gauge<I: IntoIterator<Item = T>>(iter: I, waiter_gauge: G) -> Self {
let deque = VecDeque::from_iter(iter);
Self {
sem: Arc::new(Semaphore::new(deque.len())),
waiter_gauge,
inner: Arc::new(LendingPoolInner {
closed_notify: NotifyOnce::new(),
vec: Mutex::new(PoolVec {
total_count: deque.len(),
deque: Some(deque),
}),
}),
}
}

pub fn request(&self) -> impl Future<Output = Result<LentResource<T>, PoolClosed>>
where
G: for<'a> WaiterGauge<Context<'a> = ()>,
{
self.request_with_context(())
impl<T> LendingPool<T> {
pub fn new() -> Self {
Self::from_iter(std::iter::empty())
}

pub fn request_with_context(
&self,
context: G::Context<'_>,
(identity, module_hash, database_address, reducer_symbol): (Identity, Hash, Address, &str),
) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
let acq = self.sem.clone().acquire_owned();
let pool_inner = self.inner.clone();
let waiter_guard = self.waiter_gauge.inc(context);

let queue_len = WORKER_METRICS.instance_queue_length.with_label_values(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine, I guess. it just makes it less easy to pull out/appraise as its own data structure. I might take a look at making this much simpler while still keeping LendingPool generic.

&identity,
&module_hash,
&database_address,
reducer_symbol,
);
let queue_len_histogram = WORKER_METRICS.instance_queue_length_histogram.with_label_values(
&identity,
&module_hash,
&database_address,
reducer_symbol,
);

queue_len.inc();
queue_len_histogram.observe(queue_len.get() as f64);

async move {
let permit = acq.await.map_err(|_| PoolClosed)?;
drop(waiter_guard);
queue_len.dec();
queue_len_histogram.observe(queue_len.get() as f64);
let resource = pool_inner
.vec
.lock()
Expand Down Expand Up @@ -154,9 +139,19 @@ impl<T, G: WaiterGauge> LendingPool<T, G> {
}
}

impl<T, G: WaiterGauge + Default> FromIterator<T> for LendingPool<T, G> {
impl<T> FromIterator<T> for LendingPool<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self::from_iter_with_gauge(iter, G::default())
let deque = VecDeque::from_iter(iter);
Self {
sem: Arc::new(Semaphore::new(deque.len())),
inner: Arc::new(LendingPoolInner {
closed_notify: NotifyOnce::new(),
vec: Mutex::new(PoolVec {
total_count: deque.len(),
deque: Some(deque),
}),
}),
}
}
}

Expand Down Expand Up @@ -194,17 +189,6 @@ impl<T> DerefMut for LentResource<T> {
}
}

// impl<T> LentResource<T> {
// fn keep(this: Self) -> T {
// let mut this = ManuallyDrop::new(this);
// let resource = unsafe { ManuallyDrop::take(&mut this.resource) };
// let permit = unsafe { ManuallyDrop::take(&mut this.permit) };
// permit.forget();
// let prev_count = this.pool.total_count.fetch_sub(1, SeqCst);
// resource
// }
// }

impl<T> Drop for LentResource<T> {
fn drop(&mut self) {
let resource = unsafe { ManuallyDrop::take(&mut self.resource) };
Expand All @@ -225,31 +209,3 @@ impl<T> Drop for LentResource<T> {
}
}
}

pub trait WaiterGauge {
type Context<'a>;
type IncGuard;
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard;
}

impl WaiterGauge for () {
type Context<'a> = ();
type IncGuard = ();
fn inc(&self, (): ()) -> Self::IncGuard {}
}

impl<G: WaiterGauge> WaiterGauge for Arc<G> {
type Context<'a> = G::Context<'a>;
type IncGuard = G::IncGuard;
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard {
(**self).inc(context)
}
}

impl WaiterGauge for IntGauge {
type Context<'a> = ();
type IncGuard = GaugeInc;
fn inc(&self, (): ()) -> Self::IncGuard {
self.inc_scope()
}
}
2 changes: 1 addition & 1 deletion crates/core/src/util/prometheus_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct GaugeInc {
impl Drop for GaugeInc {
#[inline]
fn drop(&mut self) {
self.gauge.dec()
self.gauge.dec();
}
}

Expand Down
39 changes: 37 additions & 2 deletions crates/core/src/util/typed_prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use spacetimedb_lib::{Address, Identity};
#[macro_export]
macro_rules! metrics_group {
($(#[$attr:meta])* $type_vis:vis struct $type_name:ident {
$(#[name = $name:ident] #[help = $help:expr] $(#[labels($($labels:ident: $labelty:ty),*)])? $vis:vis $field:ident: $ty:ident,)*
$(#[name = $name:ident] #[help = $help:expr] $(#[labels($($labels:ident: $labelty:ty),*)])? $(#[buckets($($bucket:literal),*)])? $vis:vis $field:ident: $ty:ident,)*
}) => {
$(#[$attr])*
$type_vis struct $type_name {
$($vis $field: $crate::metrics_group!(@fieldtype $field $ty $(($($labels)*))?),)*
}
$($crate::metrics_group!(@maketype $vis $field $ty $(($($labels: $labelty),*))?);)*
$($crate::metrics_group!(@maketype $vis $field $ty $(($($labels: $labelty),*))? $(($($bucket)*))?);)*
impl $type_name {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Expand Down Expand Up @@ -47,6 +47,11 @@ macro_rules! metrics_group {
$crate::metrics_vec!($vis [< $field:camel $ty >]: $ty($($labels)*));
}
};
(@maketype $vis:vis $field:ident $ty:ident ($($labels:tt)*) ($($bucket:literal)*)) => {
$crate::util::typed_prometheus::paste! {
$crate::metrics_histogram_vec!($vis [< $field:camel $ty >]: $ty($($labels)*) ($($bucket)*));
}
};
(@maketype $vis:vis $field:ident $ty:ident) => {};
}
pub use metrics_group;
Expand All @@ -64,6 +69,36 @@ macro_rules! make_collector {
}
pub use make_collector;

#[macro_export]
macro_rules! metrics_histogram_vec {
($vis:vis $name:ident: $vecty:ident($($labels:ident: $labelty:ty),+ $(,)?) ($($bucket:literal)*)) => {
#[derive(Clone)]
$vis struct $name($vecty);
impl $name {
pub fn with_opts(opts: prometheus::Opts) -> prometheus::Result<Self> {
let opts = prometheus::HistogramOpts::from(opts).buckets(vec![$(f64::from($bucket)),*]);
$vecty::new(opts.into(), &[$(stringify!($labels)),+]).map(Self)
}

pub fn with_label_values(&self, $($labels: &$labelty),+) -> <$vecty as $crate::util::typed_prometheus::ExtractMetricVecT>::M {
use $crate::util::typed_prometheus::AsPrometheusLabel as _;
self.0.with_label_values(&[ $($labels.as_prometheus_str().as_ref()),+ ])
}
}

impl prometheus::core::Collector for $name {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
prometheus::core::Collector::desc(&self.0)
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
prometheus::core::Collector::collect(&self.0)
}
}
};
}
pub use metrics_histogram_vec;

#[macro_export]
macro_rules! metrics_vec {
($vis:vis $name:ident: $vecty:ident($($labels:ident: $labelty:ty),+ $(,)?)) => {
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ metrics_group!(
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
pub instance_queue_length: IntGaugeVec,

#[name = spacetime_worker_instance_operation_queue_length_histogram]
#[help = "Length of the wait queue for access to a module instance."]
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
#[buckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 25, 50)]
pub instance_queue_length_histogram: HistogramVec,

#[name = spacetime_system_disk_space_total_bytes]
#[help = "A node's total disk space (in bytes)"]
pub system_disk_space_total: IntGauge,
Expand Down