Skip to content

Commit 2088aeb

Browse files
joshua-spacetimecoolreader18
authored andcommitted
Track instance_queue_length by reducer (#530)
Co-authored-by: Noa <[email protected]>
1 parent e973fed commit 2088aeb

File tree

3 files changed

+112
-27
lines changed

3 files changed

+112
-27
lines changed

crates/core/src/host/module_host.rs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use crate::identity::Identity;
1212
use crate::json::client_api::{SubscriptionUpdateJson, TableRowOperationJson, TableUpdateJson};
1313
use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, TableRowOperation, TableUpdate};
1414
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
15-
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
15+
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed, WaiterGauge};
1616
use crate::util::notify_once::NotifyOnce;
17+
use crate::util::prometheus_handle::{GaugeInc, IntGaugeExt};
1718
use crate::worker_metrics::WORKER_METRICS;
1819
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
1920
use futures::{Future, FutureExt};
@@ -333,7 +334,10 @@ impl fmt::Debug for ModuleHost {
333334

334335
#[async_trait::async_trait]
335336
trait DynModuleHost: Send + Sync + 'static {
336-
async fn get_instance(&self) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
337+
async fn get_instance(
338+
&self,
339+
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
340+
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
337341
fn inject_logs(&self, log_level: LogLevel, message: &str);
338342
fn one_off_query(
339343
&self,
@@ -349,7 +353,7 @@ trait DynModuleHost: Send + Sync + 'static {
349353
struct HostControllerActor<T: Module> {
350354
module: Arc<T>,
351355
threadpool: Arc<HostThreadpool>,
352-
instance_pool: LendingPool<T::Instance>,
356+
instance_pool: LendingPool<T::Instance, InstancePoolGauge>,
353357
start: NotifyOnce,
354358
}
355359

@@ -377,18 +381,38 @@ async fn select_first<A: Future, B: Future<Output = ()>>(fut_a: A, fut_b: B) ->
377381
}
378382
}
379383

384+
#[derive(Clone)]
385+
struct InstancePoolGauge;
386+
387+
impl WaiterGauge for InstancePoolGauge {
388+
type Context<'a> = &'a (&'a Identity, &'a Hash, &'a Address, &'a str);
389+
type IncGuard = GaugeInc;
390+
fn inc(&self, &(identity, module_hash, database_address, reducer_symbol): Self::Context<'_>) -> Self::IncGuard {
391+
WORKER_METRICS
392+
.instance_queue_length
393+
.with_label_values(identity, module_hash, database_address, reducer_symbol)
394+
.inc_scope()
395+
}
396+
}
397+
380398
#[async_trait::async_trait]
381399
impl<T: Module> DynModuleHost for HostControllerActor<T> {
382-
async fn get_instance(&self) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
400+
async fn get_instance(
401+
&self,
402+
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
403+
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
383404
self.start.notified().await;
384405
// in the future we should do something like in the else branch here -- add more instances based on load.
385406
// we need to do write-skew retries first - right now there's only ever once instance per module.
386407
let inst = if true {
387-
self.instance_pool.request().await.map_err(|_| NoSuchModule)?
408+
self.instance_pool
409+
.request_with_context(waiter_gauge_context)
410+
.await
411+
.map_err(|_| NoSuchModule)?
388412
} else {
389413
const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500);
390414
select_first(
391-
self.instance_pool.request(),
415+
self.instance_pool.request_with_context(waiter_gauge_context),
392416
tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()),
393417
)
394418
.await
@@ -476,11 +500,7 @@ pub enum InitDatabaseError {
476500
impl ModuleHost {
477501
pub fn new(threadpool: Arc<HostThreadpool>, mut module: impl Module) -> Self {
478502
let info = module.info();
479-
let waiter_gauge =
480-
WORKER_METRICS
481-
.instance_queue_length
482-
.with_label_values(&info.identity, &info.module_hash, &info.address);
483-
let instance_pool = LendingPool::new(waiter_gauge);
503+
let instance_pool = LendingPool::with_gauge(InstancePoolGauge);
484504
instance_pool.add_multiple(module.initial_instances()).unwrap();
485505
let inner = Arc::new(HostControllerActor {
486506
module: Arc::new(module),
@@ -505,12 +525,18 @@ impl ModuleHost {
505525
&self.info.subscription
506526
}
507527

508-
async fn call<F, R>(&self, f: F) -> Result<R, NoSuchModule>
528+
async fn call<F, R>(&self, reducer_name: &str, f: F) -> Result<R, NoSuchModule>
509529
where
510530
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
511531
R: Send + 'static,
512532
{
513-
let (threadpool, mut inst) = self.inner.get_instance().await?;
533+
let waiter_gauge_context = (
534+
&self.info.identity,
535+
&self.info.module_hash,
536+
&self.info.address,
537+
reducer_name,
538+
);
539+
let (threadpool, mut inst) = self.inner.get_instance(&waiter_gauge_context).await?;
514540

515541
let (tx, rx) = oneshot::channel();
516542
threadpool.spawn(move || {
@@ -561,7 +587,7 @@ impl ModuleHost {
561587
let args = args.into_tuple(self.info.typespace.with_type(schema))?;
562588
let caller_address = caller_address.unwrap_or(Address::__DUMMY);
563589

564-
self.call(move |inst| {
590+
self.call(reducer_name, move |inst| {
565591
inst.call_reducer(CallReducerParams {
566592
timestamp: Timestamp::now(),
567593
caller_identity,
@@ -619,13 +645,13 @@ impl ModuleHost {
619645
Some(schema) => args.into_tuple(schema)?,
620646
_ => ArgsTuple::default(),
621647
};
622-
self.call(move |inst| inst.init_database(fence, args))
648+
self.call("<init_database>", move |inst| inst.init_database(fence, args))
623649
.await?
624650
.map_err(InitDatabaseError::Other)
625651
}
626652

627653
pub async fn update_database(&self, fence: u128) -> Result<UpdateDatabaseResult, anyhow::Error> {
628-
self.call(move |inst| inst.update_database(fence))
654+
self.call("<update_database>", move |inst| inst.update_database(fence))
629655
.await?
630656
.map_err(Into::into)
631657
}

crates/core/src/util/lending_pool.rs

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,32 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
1515
use crate::util::prometheus_handle::IntGaugeExt;
1616

1717
use super::notify_once::{NotifiedOnce, NotifyOnce};
18+
use super::prometheus_handle::GaugeInc;
1819

19-
pub struct LendingPool<T> {
20+
pub struct LendingPool<T, G = ()> {
2021
sem: Arc<Semaphore>,
22+
waiter_gauge: G,
2123
inner: Arc<LendingPoolInner<T>>,
2224
}
2325

24-
impl<T> Clone for LendingPool<T> {
26+
impl<T, G: WaiterGauge + Default> Default for LendingPool<T, G> {
27+
fn default() -> Self {
28+
Self::new()
29+
}
30+
}
31+
32+
impl<T, G: Clone> Clone for LendingPool<T, G> {
2533
fn clone(&self) -> Self {
2634
Self {
2735
sem: self.sem.clone(),
36+
waiter_gauge: self.waiter_gauge.clone(),
2837
inner: self.inner.clone(),
2938
}
3039
}
3140
}
3241

3342
struct LendingPoolInner<T> {
3443
closed_notify: NotifyOnce,
35-
waiter_gauge: IntGauge,
3644
vec: Mutex<PoolVec<T>>,
3745
}
3846

@@ -44,18 +52,25 @@ struct PoolVec<T> {
4452
#[derive(Debug)]
4553
pub struct PoolClosed;
4654

47-
impl<T> LendingPool<T> {
48-
pub fn new(waiter_gauge: IntGauge) -> Self {
49-
Self::from_iter(std::iter::empty(), waiter_gauge)
55+
impl<T, G: WaiterGauge> LendingPool<T, G> {
56+
pub fn new() -> Self
57+
where
58+
G: Default,
59+
{
60+
Self::with_gauge(G::default())
61+
}
62+
63+
pub fn with_gauge(waiter_gauge: G) -> Self {
64+
Self::from_iter_with_gauge(std::iter::empty(), waiter_gauge)
5065
}
5166

52-
pub fn from_iter<I: IntoIterator<Item = T>>(iter: I, waiter_gauge: IntGauge) -> Self {
67+
pub fn from_iter_with_gauge<I: IntoIterator<Item = T>>(iter: I, waiter_gauge: G) -> Self {
5368
let deque = VecDeque::from_iter(iter);
5469
Self {
5570
sem: Arc::new(Semaphore::new(deque.len())),
71+
waiter_gauge,
5672
inner: Arc::new(LendingPoolInner {
5773
closed_notify: NotifyOnce::new(),
58-
waiter_gauge,
5974
vec: Mutex::new(PoolVec {
6075
total_count: deque.len(),
6176
deque: Some(deque),
@@ -64,10 +79,20 @@ impl<T> LendingPool<T> {
6479
}
6580
}
6681

67-
pub fn request(&self) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
82+
pub fn request(&self) -> impl Future<Output = Result<LentResource<T>, PoolClosed>>
83+
where
84+
G: for<'a> WaiterGauge<Context<'a> = ()>,
85+
{
86+
self.request_with_context(())
87+
}
88+
89+
pub fn request_with_context(
90+
&self,
91+
context: G::Context<'_>,
92+
) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
6893
let acq = self.sem.clone().acquire_owned();
6994
let pool_inner = self.inner.clone();
70-
let waiter_guard = pool_inner.waiter_gauge.inc_scope();
95+
let waiter_guard = self.waiter_gauge.inc(context);
7196
async move {
7297
let permit = acq.await.map_err(|_| PoolClosed)?;
7398
drop(waiter_guard);
@@ -129,6 +154,12 @@ impl<T> LendingPool<T> {
129154
}
130155
}
131156

157+
impl<T, G: WaiterGauge + Default> FromIterator<T> for LendingPool<T, G> {
158+
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
159+
Self::from_iter_with_gauge(iter, G::default())
160+
}
161+
}
162+
132163
pin_project_lite::pin_project! {
133164
pub struct Closed<'a> {
134165
#[pin]
@@ -194,3 +225,31 @@ impl<T> Drop for LentResource<T> {
194225
}
195226
}
196227
}
228+
229+
pub trait WaiterGauge {
230+
type Context<'a>;
231+
type IncGuard;
232+
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard;
233+
}
234+
235+
impl WaiterGauge for () {
236+
type Context<'a> = ();
237+
type IncGuard = ();
238+
fn inc(&self, (): ()) -> Self::IncGuard {}
239+
}
240+
241+
impl<G: WaiterGauge> WaiterGauge for Arc<G> {
242+
type Context<'a> = G::Context<'a>;
243+
type IncGuard = G::IncGuard;
244+
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard {
245+
(**self).inc(context)
246+
}
247+
}
248+
249+
impl WaiterGauge for IntGauge {
250+
type Context<'a> = ();
251+
type IncGuard = GaugeInc;
252+
fn inc(&self, (): ()) -> Self::IncGuard {
253+
self.inc_scope()
254+
}
255+
}

crates/core/src/worker_metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ metrics_group!(
6666

6767
#[name = spacetime_worker_instance_operation_queue_length]
6868
#[help = "Length of the wait queue for access to a module instance."]
69-
#[labels(identity: Identity, module_hash: Hash, database_address: Address)]
69+
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
7070
pub instance_queue_length: IntGaugeVec,
7171

7272
#[name = spacetime_system_disk_space_total_bytes]

0 commit comments

Comments
 (0)