Skip to content

Commit 56197f9

Browse files
fix(1115): Identify subscribers by Identity AND Address
Closes #1115. Previously subscribers were only identified by their Identity. However the same Identity can be associated to different Addresses.
1 parent 2894d36 commit 56197f9

File tree

2 files changed

+43
-34
lines changed

2 files changed

+43
-34
lines changed

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl ModuleSubscriptions {
8888
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
8989
// but that should not pose an issue.
9090
let mut subscriptions = self.subscriptions.write();
91-
subscriptions.remove_subscription(&sender.id.identity);
91+
subscriptions.remove_subscription(&(sender.id.identity, sender.id.address));
9292
subscriptions.add_subscription(sender.clone(), execution_set.into_iter());
9393
let num_queries = subscriptions.num_queries();
9494

@@ -118,7 +118,7 @@ impl ModuleSubscriptions {
118118

119119
pub fn remove_subscriber(&self, client_id: ClientActorId) {
120120
let mut subscriptions = self.subscriptions.write();
121-
subscriptions.remove_subscription(&client_id.identity);
121+
subscriptions.remove_subscription(&(client_id.identity, client_id.address));
122122
WORKER_METRICS
123123
.subscription_queries
124124
.with_label_values(&self.relational_db.address())

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
1010
use smallvec::SmallVec;
1111
use spacetimedb_client_api_messages::client_api::{TableRowOperation, TableUpdate};
1212
use spacetimedb_data_structures::map::{Entry, HashMap, HashSet, IntMap};
13-
use spacetimedb_lib::Identity;
13+
use spacetimedb_lib::{Address, Identity};
1414
use spacetimedb_primitives::TableId;
1515
use std::ops::Deref;
1616
use std::sync::Arc;
1717

1818
type Query = Arc<ExecutionUnit>;
1919
type Client = Arc<ClientConnectionSender>;
20+
type Id = (Identity, Address);
2021

2122
/// Responsible for the efficient evaluation of subscriptions.
2223
/// It performs basic multi-query optimization,
@@ -26,17 +27,17 @@ type Client = Arc<ClientConnectionSender>;
2627
#[derive(Debug, Default)]
2728
pub struct SubscriptionManager {
2829
// Subscriber identities and their client connections.
29-
clients: HashMap<Identity, Client>,
30+
clients: HashMap<Id, Client>,
3031
// Queries for which there is at least one subscriber.
3132
queries: HashMap<QueryHash, Query>,
3233
// The subscribers for each query.
33-
subscribers: HashMap<QueryHash, HashSet<Identity>>,
34+
subscribers: HashMap<QueryHash, HashSet<Id>>,
3435
// Inverted index from tables to queries that read from them.
3536
tables: IntMap<TableId, HashSet<QueryHash>>,
3637
}
3738

3839
impl SubscriptionManager {
39-
pub fn client(&self, id: &Identity) -> Client {
40+
pub fn client(&self, id: &Id) -> Client {
4041
self.clients[id].clone()
4142
}
4243

@@ -54,7 +55,7 @@ impl SubscriptionManager {
5455
}
5556

5657
#[cfg(test)]
57-
fn contains_subscription(&self, subscriber: &Identity, query: &QueryHash) -> bool {
58+
fn contains_subscription(&self, subscriber: &Id, query: &QueryHash) -> bool {
5859
self.subscribers.get(query).is_some_and(|ids| ids.contains(subscriber))
5960
}
6061

@@ -68,7 +69,7 @@ impl SubscriptionManager {
6869
/// its table ids added to the inverted index.
6970
#[tracing::instrument(skip_all)]
7071
pub fn add_subscription(&mut self, client: Client, queries: impl IntoIterator<Item = Query>) {
71-
let id = client.id.identity;
72+
let id = (client.id.identity, client.id.address);
7273
self.clients.insert(id, client);
7374
for unit in queries {
7475
let hash = unit.hash();
@@ -83,7 +84,7 @@ impl SubscriptionManager {
8384
/// If a query no longer has any subscribers,
8485
/// it is removed from the index along with its table ids.
8586
#[tracing::instrument(skip_all)]
86-
pub fn remove_subscription(&mut self, client: &Identity) {
87+
pub fn remove_subscription(&mut self, client: &Id) {
8788
// Remove `hash` from the set of queries for `table_id`.
8889
// When the table has no queries, cleanup the map entry altogether.
8990
let mut remove_table_query = |table_id: TableId, hash: &QueryHash| {
@@ -188,7 +189,7 @@ impl SubscriptionManager {
188189
// so we'll have either `TableUpdate` (`Protocol::Binary`)
189190
// or `TableUpdateJson` (`Protocol::Text`).
190191
.fold(
191-
HashMap::<(&Identity, TableId), Either<TableUpdate, TableUpdateJson>>::new(),
192+
HashMap::<(&Id, TableId), Either<TableUpdate, TableUpdateJson>>::new(),
192193
|mut tables, (id, table_id, table_name, ops)| {
193194
match tables.entry((id, table_id)) {
194195
Entry::Occupied(mut entry) => match ops {
@@ -220,7 +221,7 @@ impl SubscriptionManager {
220221
// So before sending the updates to each client,
221222
// we must stitch together the `TableUpdate*`s into an aggregated list.
222223
.fold(
223-
HashMap::<&Identity, Either<Vec<TableUpdate>, Vec<TableUpdateJson>>>::new(),
224+
HashMap::<&Id, Either<Vec<TableUpdate>, Vec<TableUpdateJson>>>::new(),
224225
|mut updates, ((id, _), update)| {
225226
let entry = updates.entry(id);
226227
match update {
@@ -265,12 +266,12 @@ impl SubscriptionManager {
265266
mod tests {
266267
use std::sync::Arc;
267268

268-
use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
269+
use spacetimedb_lib::{error::ResultTest, Address, AlgebraicType, Identity};
269270
use spacetimedb_primitives::TableId;
270271
use spacetimedb_vm::expr::CrudExpr;
271272

272273
use crate::{
273-
client::{ClientActorId, ClientConnectionSender, Protocol},
274+
client::{ClientActorId, ClientConnectionSender, ClientName, Protocol},
274275
db::relational_db::{tests_utils::TestDB, RelationalDB},
275276
execution_context::ExecutionContext,
276277
sql::compiler::compile_sql,
@@ -300,6 +301,21 @@ mod tests {
300301
})
301302
}
302303

304+
fn id(address: u128) -> (Identity, Address) {
305+
(Identity::ZERO, Address::from_u128(address))
306+
}
307+
308+
fn client(address: u128) -> ClientConnectionSender {
309+
ClientConnectionSender::dummy(
310+
ClientActorId {
311+
identity: Identity::ZERO,
312+
address: Address::from_u128(address),
313+
name: ClientName(0),
314+
},
315+
Protocol::Binary,
316+
)
317+
}
318+
303319
#[test]
304320
fn test_subscribe() -> ResultTest<()> {
305321
let db = TestDB::durable()?;
@@ -309,9 +325,8 @@ mod tests {
309325
let plan = compile_plan(&db, sql)?;
310326
let hash = plan.hash();
311327

312-
let id = Identity::ZERO;
313-
let client = ClientActorId::for_test(id);
314-
let client = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary));
328+
let id = id(0);
329+
let client = Arc::new(client(0));
315330

316331
let mut subscriptions = SubscriptionManager::default();
317332
subscriptions.add_subscription(client, [plan]);
@@ -332,9 +347,8 @@ mod tests {
332347
let plan = compile_plan(&db, sql)?;
333348
let hash = plan.hash();
334349

335-
let id = Identity::ZERO;
336-
let client = ClientActorId::for_test(id);
337-
let client = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary));
350+
let id = id(0);
351+
let client = Arc::new(client(0));
338352

339353
let mut subscriptions = SubscriptionManager::default();
340354
subscriptions.add_subscription(client, [plan]);
@@ -356,9 +370,8 @@ mod tests {
356370
let plan = compile_plan(&db, sql)?;
357371
let hash = plan.hash();
358372

359-
let id = Identity::ZERO;
360-
let client = ClientActorId::for_test(id);
361-
let client = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary));
373+
let id = id(0);
374+
let client = Arc::new(client(0));
362375

363376
let mut subscriptions = SubscriptionManager::default();
364377
subscriptions.add_subscription(client.clone(), [plan.clone()]);
@@ -386,13 +399,11 @@ mod tests {
386399
let plan = compile_plan(&db, sql)?;
387400
let hash = plan.hash();
388401

389-
let id0 = Identity::ZERO;
390-
let client0 = ClientActorId::for_test(id0);
391-
let client0 = Arc::new(ClientConnectionSender::dummy(client0, Protocol::Binary));
402+
let id0 = id(0);
403+
let client0 = Arc::new(client(0));
392404

393-
let id1 = Identity::from_byte_array([1; 32]);
394-
let client1 = ClientActorId::for_test(id1);
395-
let client1 = Arc::new(ClientConnectionSender::dummy(client1, Protocol::Binary));
405+
let id1 = id(1);
406+
let client1 = Arc::new(client(1));
396407

397408
let mut subscriptions = SubscriptionManager::default();
398409
subscriptions.add_subscription(client0, [plan.clone()]);
@@ -433,13 +444,11 @@ mod tests {
433444
let hash_select0 = plan_select0.hash();
434445
let hash_select1 = plan_select1.hash();
435446

436-
let id0 = Identity::ZERO;
437-
let client0 = ClientActorId::for_test(id0);
438-
let client0 = Arc::new(ClientConnectionSender::dummy(client0, Protocol::Binary));
447+
let id0 = id(0);
448+
let client0 = Arc::new(client(0));
439449

440-
let id1 = Identity::from_byte_array([1; 32]);
441-
let client1 = ClientActorId::for_test(id1);
442-
let client1 = Arc::new(ClientConnectionSender::dummy(client1, Protocol::Binary));
450+
let id1 = id(1);
451+
let client1 = Arc::new(client(1));
443452

444453
let mut subscriptions = SubscriptionManager::default();
445454
subscriptions.add_subscription(client0, [plan_scan.clone(), plan_select0.clone()]);

0 commit comments

Comments
 (0)