Skip to content

Commit 4d36ae3

Browse files
authored
Subscription API (#137)
## Description of Changes Implements the subscription builder (at least, the parts that are possible to implement). ## API - [ ] This is an API breaking change to the SDK *If the API is breaking, please state below what will break* ## Requires SpacetimeDB PRs *List any PRs here that are required for this SDK change to work*
1 parent e3a4213 commit 4d36ae3

File tree

5 files changed

+170
-93
lines changed

5 files changed

+170
-93
lines changed

examples~/quickstart/client/Program.cs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,29 @@
55
using System.Net.WebSockets;
66
using System.Threading;
77
using SpacetimeDB;
8-
using SpacetimeDB.ClientApi;
98
using SpacetimeDB.Types;
109

1110
const string HOST = "http://localhost:3000";
1211
const string DBNAME = "chatqs";
1312

14-
DbConnection? conn = null;
15-
1613
// our local client SpacetimeDB identity
1714
Identity? local_identity = null;
1815
// declare a thread safe queue to store commands
1916
var input_queue = new ConcurrentQueue<(string Command, string Args)>();
20-
// declare a threadsafe cancel token to cancel the process loop
21-
var cancel_token = new CancellationTokenSource();
2217

2318
void Main()
2419
{
2520
AuthToken.Init(".spacetime_csharp_quickstart");
2621

22+
// TODO: just do `var conn = DbConnection...` when OnConnect signature is fixed.
23+
DbConnection? conn = null;
24+
2725
conn = DbConnection.Builder()
2826
.WithUri(HOST)
2927
.WithModuleName(DBNAME)
3028
//.WithCredentials((null, AuthToken.Token))
31-
.OnConnect(OnConnect)
29+
// TODO: change this to just `(OnConnect)` when signature is fixed in #131.
30+
.OnConnect((identity, authToken) => OnConnect(conn!, identity, authToken))
3231
.OnConnectError(OnConnectError)
3332
.OnDisconnect(OnDisconnect)
3433
.Build();
@@ -41,17 +40,19 @@ void Main()
4140
conn.RemoteReducers.OnSetName += Reducer_OnSetNameEvent;
4241
conn.RemoteReducers.OnSendMessage += Reducer_OnSendMessageEvent;
4342

44-
conn.onSubscriptionApplied += OnSubscriptionApplied;
4543
conn.onUnhandledReducerError += onUnhandledReducerError;
4644

45+
// declare a threadsafe cancel token to cancel the process loop
46+
var cancellationTokenSource = new CancellationTokenSource();
47+
4748
// spawn a thread to call process updates and process commands
48-
var thread = new Thread(ProcessThread);
49+
var thread = new Thread(() => ProcessThread(conn, cancellationTokenSource.Token));
4950
thread.Start();
5051

5152
InputLoop();
5253

5354
// this signals the ProcessThread to stop
54-
cancel_token.Cancel();
55+
cancellationTokenSource.Cancel();
5556
thread.Join();
5657
}
5758

@@ -84,9 +85,9 @@ void User_OnUpdate(EventContext ctx, User oldValue, User newValue)
8485
}
8586
}
8687

87-
void PrintMessage(Message message)
88+
void PrintMessage(RemoteTables tables, Message message)
8889
{
89-
var sender = conn.RemoteTables.User.FindByIdentity(message.Sender);
90+
var sender = tables.User.FindByIdentity(message.Sender);
9091
var senderName = "unknown";
9192
if (sender != null)
9293
{
@@ -100,7 +101,7 @@ void Message_OnInsert(EventContext ctx, Message insertedValue)
100101
{
101102
if (ctx.Reducer is not Event<Reducer>.SubscribeApplied)
102103
{
103-
PrintMessage(insertedValue);
104+
PrintMessage(ctx.Db, insertedValue);
104105
}
105106
}
106107

@@ -128,12 +129,18 @@ void Reducer_OnSendMessageEvent(EventContext ctx, string text)
128129
}
129130
}
130131

131-
void OnConnect(Identity identity, string authToken)
132+
void OnConnect(DbConnection conn, Identity identity, string authToken)
132133
{
133134
local_identity = identity;
134135
AuthToken.SaveToken(authToken);
135136

136-
conn!.Subscribe(new List<string> { "SELECT * FROM User", "SELECT * FROM Message" });
137+
conn.SubscriptionBuilder()
138+
.OnApplied(OnSubscriptionApplied)
139+
.Subscribe("SELECT * FROM User");
140+
141+
conn.SubscriptionBuilder()
142+
.OnApplied(OnSubscriptionApplied)
143+
.Subscribe("SELECT * FROM Message");
137144
}
138145

139146
void OnConnectError(WebSocketError? error, string message)
@@ -146,35 +153,35 @@ void OnDisconnect(DbConnection conn, WebSocketCloseStatus? status, WebSocketErro
146153

147154
}
148155

149-
void PrintMessagesInOrder()
156+
void PrintMessagesInOrder(RemoteTables tables)
150157
{
151-
foreach (Message message in conn.RemoteTables.Message.Iter().OrderBy(item => item.Sent))
158+
foreach (Message message in tables.Message.Iter().OrderBy(item => item.Sent))
152159
{
153-
PrintMessage(message);
160+
PrintMessage(tables, message);
154161
}
155162
}
156163

157-
void OnSubscriptionApplied()
164+
void OnSubscriptionApplied(EventContext ctx)
158165
{
159166
Console.WriteLine("Connected");
160-
PrintMessagesInOrder();
167+
PrintMessagesInOrder(ctx.Db);
161168
}
162169

163170
void onUnhandledReducerError(ReducerEvent<Reducer> reducerEvent)
164171
{
165172
Console.WriteLine($"Unhandled reducer error in {reducerEvent.Reducer}: {reducerEvent.Status}");
166173
}
167174

168-
void ProcessThread()
175+
void ProcessThread(DbConnection conn, CancellationToken ct)
169176
{
170177
try
171178
{
172179
// loop until cancellation token
173-
while (!cancel_token.IsCancellationRequested)
180+
while (!ct.IsCancellationRequested)
174181
{
175182
conn.Update();
176183

177-
ProcessCommands();
184+
ProcessCommands(conn.RemoteReducers);
178185

179186
Thread.Sleep(100);
180187
}
@@ -207,18 +214,18 @@ void InputLoop()
207214
}
208215
}
209216

210-
void ProcessCommands()
217+
void ProcessCommands(RemoteReducers reducers)
211218
{
212219
// process input queue commands
213220
while (input_queue.TryDequeue(out var command))
214221
{
215222
switch (command.Command)
216223
{
217224
case "message":
218-
conn.RemoteReducers.SendMessage(command.Args);
225+
reducers.SendMessage(command.Args);
219226
break;
220227
case "name":
221-
conn.RemoteReducers.SetName(command.Args);
228+
reducers.SetName(command.Args);
222229
break;
223230
}
224231
}

examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,5 +158,7 @@ protected override bool Dispatch(IEventContext context, Reducer reducer) {
158158
_ => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}")
159159
};
160160
}
161+
162+
public SubscriptionBuilder<EventContext> SubscriptionBuilder() => new(this);
161163
}
162164
}

src/Event.cs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,62 @@ public record UnsubscribeApplied : Event<R>;
3535
public record SubscribeError(Exception Exception) : Event<R>;
3636
public record UnknownTransaction : Event<R>;
3737
}
38-
}
38+
39+
// TODO: Move those classes into EventContext, so that we wouldn't need repetitive generics.
40+
public sealed class SubscriptionBuilder<EventContext>
41+
where EventContext : IEventContext
42+
{
43+
private readonly IDbConnection conn;
44+
private event Action<EventContext>? Applied;
45+
private event Action<EventContext>? Error;
46+
47+
public SubscriptionBuilder(IDbConnection conn)
48+
{
49+
this.conn = conn;
50+
}
51+
52+
public SubscriptionBuilder<EventContext> OnApplied(Action<EventContext> callback)
53+
{
54+
Applied += callback;
55+
return this;
56+
}
57+
58+
public SubscriptionBuilder<EventContext> OnError(Action<EventContext> callback)
59+
{
60+
Error += callback;
61+
return this;
62+
}
63+
64+
public SubscriptionHandle<EventContext> Subscribe(string querySql) => new(conn, Applied, Error, querySql);
65+
}
66+
67+
public interface ISubscriptionHandle
68+
{
69+
void OnApplied(IEventContext ctx);
70+
}
71+
72+
public class SubscriptionHandle<EventContext> : ISubscriptionHandle
73+
where EventContext : IEventContext
74+
{
75+
private readonly Action<EventContext>? onApplied;
76+
77+
void ISubscriptionHandle.OnApplied(IEventContext ctx)
78+
{
79+
IsActive = true;
80+
onApplied?.Invoke((EventContext)ctx);
81+
}
82+
83+
internal SubscriptionHandle(IDbConnection conn, Action<EventContext>? onApplied, Action<EventContext>? onError, string querySql)
84+
{
85+
this.onApplied = onApplied;
86+
conn.Subscribe(this, querySql);
87+
}
88+
89+
public void Unsubscribe() => throw new NotImplementedException();
90+
91+
public void UnsuscribeThen(Action<EventContext> onEnd) => throw new NotImplementedException();
92+
93+
public bool IsEnded => false;
94+
public bool IsActive { get; private set; }
95+
}
96+
}

0 commit comments

Comments
 (0)