Skip to content

Commit 54ab01b

Browse files
committed
Support sync streams
1 parent 7650661 commit 54ab01b

File tree

25 files changed

+850
-168
lines changed

25 files changed

+850
-168
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase
77
import com.powersync.PowerSyncException
88
import com.powersync.TestConnector
99
import com.powersync.bucket.BucketChecksum
10-
import com.powersync.bucket.BucketPriority
1110
import com.powersync.bucket.Checkpoint
1211
import com.powersync.bucket.OpType
1312
import com.powersync.bucket.OplogEntry
13+
import com.powersync.bucket.StreamPriority
1414
import com.powersync.bucket.WriteCheckpointData
1515
import com.powersync.bucket.WriteCheckpointResponse
1616
import com.powersync.connectors.PowerSyncBackendConnector
@@ -164,7 +164,7 @@ abstract class BaseSyncIntegrationTest(
164164
add(
165165
BucketChecksum(
166166
bucket = "bucket$prio",
167-
priority = BucketPriority(prio),
167+
priority = StreamPriority(prio),
168168
checksum = 10 + prio,
169169
),
170170
)
@@ -217,7 +217,7 @@ abstract class BaseSyncIntegrationTest(
217217

218218
// Emit a partial sync complete for each priority but the last.
219219
for (priorityNo in 0..<3) {
220-
val priority = BucketPriority(priorityNo)
220+
val priority = StreamPriority(priorityNo)
221221
pushData(priorityNo)
222222
syncLines.send(
223223
SyncLine.CheckpointPartiallyComplete(
@@ -257,7 +257,7 @@ abstract class BaseSyncIntegrationTest(
257257
listOf(
258258
BucketChecksum(
259259
bucket = "bkt",
260-
priority = BucketPriority(1),
260+
priority = StreamPriority(1),
261261
checksum = 0,
262262
),
263263
),
@@ -267,17 +267,17 @@ abstract class BaseSyncIntegrationTest(
267267
syncLines.send(
268268
SyncLine.CheckpointPartiallyComplete(
269269
lastOpId = "0",
270-
priority = BucketPriority(1),
270+
priority = StreamPriority(1),
271271
),
272272
)
273273

274-
database.waitForFirstSync(BucketPriority(1))
274+
database.waitForFirstSync(StreamPriority(1))
275275
database.close()
276276

277277
// Connect to the same database again
278278
database = openDatabaseAndInitialize()
279279
database.currentStatus.hasSynced shouldBe false
280-
database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true
280+
database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true
281281
}
282282

283283
@Test

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package com.powersync.sync
33
import app.cash.turbine.ReceiveTurbine
44
import app.cash.turbine.turbineScope
55
import com.powersync.bucket.BucketChecksum
6-
import com.powersync.bucket.BucketPriority
76
import com.powersync.bucket.Checkpoint
87
import com.powersync.bucket.OpType
98
import com.powersync.bucket.OplogEntry
9+
import com.powersync.bucket.StreamPriority
1010
import com.powersync.testutils.ActiveDatabaseTest
11+
import com.powersync.testutils.bucket
1112
import com.powersync.testutils.databaseTest
1213
import com.powersync.testutils.waitFor
1314
import io.kotest.assertions.withClue
@@ -32,18 +33,6 @@ abstract class BaseSyncProgressTest(
3233
lastOpId = 0
3334
}
3435

35-
private fun bucket(
36-
name: String,
37-
count: Int,
38-
priority: BucketPriority = BucketPriority(3),
39-
): BucketChecksum =
40-
BucketChecksum(
41-
bucket = name,
42-
priority = priority,
43-
checksum = 0,
44-
count = count,
45-
)
46-
4736
private suspend fun ActiveDatabaseTest.addDataLine(
4837
bucket: String,
4938
amount: Int,
@@ -68,7 +57,7 @@ abstract class BaseSyncProgressTest(
6857
)
6958
}
7059

71-
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {
60+
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) {
7261
if (priority != null) {
7362
syncLines.send(
7463
SyncLine.CheckpointPartiallyComplete(
@@ -93,7 +82,7 @@ abstract class BaseSyncProgressTest(
9382

9483
private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(
9584
total: Pair<Int, Int>,
96-
priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),
85+
priorities: Map<StreamPriority, Pair<Int, Int>> = emptyMap(),
9786
) {
9887
val item = awaitItem()
9988
val progress = item.downloadProgress ?: error("Expected download progress on $item")
@@ -357,7 +346,7 @@ abstract class BaseSyncProgressTest(
357346
) {
358347
turbine.expectProgress(
359348
prio2,
360-
mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2),
349+
mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2),
361350
)
362351
}
363352

@@ -367,8 +356,8 @@ abstract class BaseSyncProgressTest(
367356
lastOpId = "10",
368357
checksums =
369358
listOf(
370-
bucket("a", 5, BucketPriority(0)),
371-
bucket("b", 5, BucketPriority(2)),
359+
bucket("a", 5, StreamPriority(0)),
360+
bucket("b", 5, StreamPriority(2)),
372361
),
373362
),
374363
),
@@ -378,7 +367,7 @@ abstract class BaseSyncProgressTest(
378367
addDataLine("a", 5)
379368
expectProgress(5 to 5, 5 to 10)
380369

381-
addCheckpointComplete(BucketPriority(0))
370+
addCheckpointComplete(StreamPriority(0))
382371
expectProgress(5 to 5, 5 to 10)
383372

384373
addDataLine("b", 2)
@@ -390,8 +379,8 @@ abstract class BaseSyncProgressTest(
390379
lastOpId = "14",
391380
updatedBuckets =
392381
listOf(
393-
bucket("a", 8, BucketPriority(0)),
394-
bucket("b", 6, BucketPriority(2)),
382+
bucket("a", 8, StreamPriority(0)),
383+
bucket("b", 6, StreamPriority(2)),
395384
),
396385
removedBuckets = emptyList(),
397386
),
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package com.powersync.sync
2+
3+
import app.cash.turbine.turbineScope
4+
import com.powersync.ExperimentalPowerSyncAPI
5+
import com.powersync.bucket.Checkpoint
6+
import com.powersync.bucket.StreamPriority
7+
import com.powersync.testutils.bucket
8+
import com.powersync.testutils.databaseTest
9+
import com.powersync.testutils.waitFor
10+
import com.powersync.utils.JsonParam
11+
import com.powersync.utils.JsonUtil
12+
import io.kotest.matchers.collections.shouldHaveSingleElement
13+
import io.kotest.matchers.collections.shouldHaveSize
14+
import io.kotest.matchers.nulls.shouldNotBeNull
15+
import io.kotest.matchers.shouldBe
16+
import kotlinx.coroutines.DelicateCoroutinesApi
17+
import kotlinx.coroutines.delay
18+
import kotlinx.serialization.json.JsonArray
19+
import kotlinx.serialization.json.JsonObject
20+
import kotlinx.serialization.json.buildJsonArray
21+
import kotlinx.serialization.json.buildJsonObject
22+
import kotlinx.serialization.json.jsonArray
23+
import kotlinx.serialization.json.jsonObject
24+
import kotlinx.serialization.json.jsonPrimitive
25+
import kotlinx.serialization.json.put
26+
import kotlin.test.Test
27+
28+
@OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class)
29+
class SyncStreamTest : AbstractSyncTest(true) {
30+
@Test
31+
fun `can disable default streams`() =
32+
databaseTest {
33+
database.connect(
34+
connector,
35+
options =
36+
SyncOptions(
37+
newClientImplementation = true,
38+
includeDefaultStreams = false,
39+
clientConfiguration = SyncClientConfiguration.ExistingClient(createSyncClient()),
40+
),
41+
)
42+
43+
turbineScope {
44+
val turbine = database.currentStatus.asFlow().testIn(this)
45+
turbine.waitFor { it.connected && !it.downloading }
46+
47+
requestedSyncStreams shouldHaveSingleElement {
48+
val streams = it.jsonObject["streams"]!!.jsonObject
49+
streams["include_defaults"]!!.jsonPrimitive.content shouldBe "false"
50+
51+
true
52+
}
53+
54+
turbine.cancelAndIgnoreRemainingEvents()
55+
}
56+
}
57+
58+
@Test
59+
fun `subscribes with streams`() =
60+
databaseTest {
61+
val a = database.syncStream("stream", mapOf("foo" to JsonParam.String("a"))).subscribe()
62+
val b = database.syncStream("stream", mapOf("foo" to JsonParam.String("b"))).subscribe(priority = StreamPriority(1))
63+
64+
database.connect(connector, options = getOptions())
65+
turbineScope {
66+
val turbine = database.currentStatus.asFlow().testIn(this)
67+
turbine.waitFor { it.connected && !it.downloading }
68+
69+
// Should request subscriptions
70+
requestedSyncStreams shouldHaveSingleElement {
71+
val streams = it.jsonObject["streams"]!!.jsonObject
72+
val subscriptions = streams["subscriptions"]!!.jsonArray
73+
74+
subscriptions shouldHaveSize 2
75+
JsonUtil.json.encodeToString(subscriptions[0]) shouldBe
76+
"""{"stream":"stream","parameters":{"foo":"a"},"override_priority":null}"""
77+
JsonUtil.json.encodeToString(subscriptions[1]) shouldBe
78+
"""{"stream":"stream","parameters":{"foo":"b"},"override_priority":1}"""
79+
true
80+
}
81+
82+
syncLines.send(
83+
SyncLine.FullCheckpoint(
84+
Checkpoint(
85+
lastOpId = "1",
86+
checksums =
87+
listOf(
88+
bucket(
89+
"a",
90+
0,
91+
subscriptions =
92+
buildJsonArray {
93+
add(defaultSubscription(0))
94+
},
95+
),
96+
bucket(
97+
"b",
98+
0,
99+
priority = StreamPriority(1),
100+
subscriptions =
101+
buildJsonArray {
102+
add(defaultSubscription(1))
103+
},
104+
),
105+
),
106+
streams = listOf(stream("stream", false)),
107+
),
108+
),
109+
)
110+
111+
// Subscriptions should be active now, but not marked as synced.
112+
var status = turbine.awaitItem()
113+
for (subscription in listOf(a, b)) {
114+
val subscriptionStatus = status.forStream(subscription)!!
115+
subscriptionStatus.subscription.active shouldBe true
116+
subscriptionStatus.subscription.lastSyncedAt shouldBe null
117+
subscriptionStatus.subscription.hasExplicitSubscription shouldBe true
118+
}
119+
120+
syncLines.send(
121+
SyncLine.CheckpointPartiallyComplete(
122+
lastOpId = "0",
123+
priority = StreamPriority(1),
124+
),
125+
)
126+
status = turbine.awaitItem()
127+
status.forStream(a)!!.subscription.lastSyncedAt shouldBe null
128+
status.forStream(b)!!.subscription.lastSyncedAt shouldNotBeNull {}
129+
b.waitForFirstSync()
130+
131+
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "0"))
132+
a.waitForFirstSync()
133+
134+
turbine.cancelAndIgnoreRemainingEvents()
135+
}
136+
}
137+
138+
@Test
139+
fun `reports default streams`() =
140+
databaseTest {
141+
database.connect(connector, options = getOptions())
142+
turbineScope {
143+
val turbine = database.currentStatus.asFlow().testIn(this)
144+
turbine.waitFor { it.connected && !it.downloading }
145+
146+
syncLines.send(
147+
SyncLine.FullCheckpoint(
148+
Checkpoint(
149+
lastOpId = "1",
150+
checksums = listOf(),
151+
streams = listOf(stream("default_stream", true)),
152+
),
153+
),
154+
)
155+
156+
val status = turbine.awaitItem()
157+
status.syncStreams!! shouldHaveSingleElement {
158+
it.subscription.name shouldBe "default_stream"
159+
it.subscription.parameters shouldBe null
160+
it.subscription.isDefault shouldBe true
161+
it.subscription.hasExplicitSubscription shouldBe false
162+
true
163+
}
164+
165+
turbine.cancelAndIgnoreRemainingEvents()
166+
}
167+
}
168+
169+
@OptIn(DelicateCoroutinesApi::class)
170+
@Test
171+
fun `changes subscriptions dynamically`() =
172+
databaseTest {
173+
database.connect(connector, options = getOptions())
174+
turbineScope {
175+
val turbine = database.currentStatus.asFlow().testIn(this)
176+
turbine.waitFor { it.connected && !it.downloading }
177+
requestedSyncStreams.clear()
178+
179+
val subscription = database.syncStream("a").subscribe()
180+
181+
// Adding the subscription should reconnect
182+
turbine.waitFor { it.connected && !it.downloading }
183+
requestedSyncStreams shouldHaveSingleElement {
184+
val streams = it.jsonObject["streams"]!!.jsonObject
185+
val subscriptions = streams["subscriptions"]!!.jsonArray
186+
187+
subscriptions shouldHaveSize 1
188+
JsonUtil.json.encodeToString(subscriptions[0]) shouldBe """{"stream":"a","parameters":null,"override_priority":null}"""
189+
true
190+
}
191+
192+
// Given that the subscription has a default TTL, unsubscribing should not re-subscribe.
193+
subscription.unsubscribe()
194+
delay(100)
195+
turbine.expectNoEvents()
196+
197+
turbine.cancelAndIgnoreRemainingEvents()
198+
}
199+
}
200+
201+
@Test
202+
fun `subscriptions update while offline`() =
203+
databaseTest {
204+
turbineScope {
205+
val turbine = database.currentStatus.asFlow().testIn(this)
206+
turbine.awaitItem() // Ignore initial
207+
208+
// Subscribing while offline should add the stream to the subscriptions reported in the
209+
// status.
210+
val subscription = database.syncStream("foo").subscribe()
211+
val status = turbine.awaitItem()
212+
status.forStream(subscription) shouldNotBeNull {}
213+
214+
turbine.cancelAndIgnoreRemainingEvents()
215+
}
216+
}
217+
}
218+
219+
private fun stream(
220+
name: String,
221+
isDefault: Boolean,
222+
): JsonObject =
223+
buildJsonObject {
224+
put("name", name)
225+
put("is_default", isDefault)
226+
put("errors", JsonArray(emptyList()))
227+
}
228+
229+
private fun defaultSubscription(index: Int): JsonObject = buildJsonObject { put("sub", index) }

0 commit comments

Comments
 (0)