Skip to content

Commit bab3255

Browse files
committed
New tests
1 parent 54ab01b commit bab3255

File tree

8 files changed

+114
-58
lines changed

8 files changed

+114
-58
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package com.powersync.sync
33
import app.cash.turbine.ReceiveTurbine
44
import app.cash.turbine.turbineScope
55
import com.powersync.bucket.BucketChecksum
6+
import com.powersync.bucket.BucketPriority
67
import com.powersync.bucket.Checkpoint
78
import com.powersync.bucket.OpType
89
import com.powersync.bucket.OplogEntry
910
import com.powersync.bucket.StreamPriority
1011
import com.powersync.testutils.ActiveDatabaseTest
11-
import com.powersync.testutils.bucket
1212
import com.powersync.testutils.databaseTest
1313
import com.powersync.testutils.waitFor
1414
import io.kotest.assertions.withClue
@@ -33,6 +33,18 @@ abstract class BaseSyncProgressTest(
3333
lastOpId = 0
3434
}
3535

36+
private fun bucket(
37+
name: String,
38+
count: Int,
39+
priority: StreamPriority = StreamPriority(3),
40+
): BucketChecksum =
41+
BucketChecksum(
42+
bucket = name,
43+
priority = priority,
44+
checksum = 0,
45+
count = count,
46+
)
47+
3648
private suspend fun ActiveDatabaseTest.addDataLine(
3749
bucket: String,
3850
amount: Int,

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 94 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package com.powersync.sync
22

33
import app.cash.turbine.turbineScope
44
import com.powersync.ExperimentalPowerSyncAPI
5-
import com.powersync.bucket.Checkpoint
65
import com.powersync.bucket.StreamPriority
7-
import com.powersync.testutils.bucket
86
import com.powersync.testutils.databaseTest
97
import com.powersync.testutils.waitFor
108
import com.powersync.utils.JsonParam
@@ -15,6 +13,7 @@ import io.kotest.matchers.nulls.shouldNotBeNull
1513
import io.kotest.matchers.shouldBe
1614
import kotlinx.coroutines.DelicateCoroutinesApi
1715
import kotlinx.coroutines.delay
16+
import kotlinx.serialization.ExperimentalSerializationApi
1817
import kotlinx.serialization.json.JsonArray
1918
import kotlinx.serialization.json.JsonObject
2019
import kotlinx.serialization.json.buildJsonArray
@@ -80,31 +79,26 @@ class SyncStreamTest : AbstractSyncTest(true) {
8079
}
8180

8281
syncLines.send(
83-
SyncLine.FullCheckpoint(
84-
Checkpoint(
85-
lastOpId = "1",
86-
checksums =
87-
listOf(
88-
bucket(
89-
"a",
90-
0,
91-
subscriptions =
92-
buildJsonArray {
93-
add(defaultSubscription(0))
94-
},
95-
),
96-
bucket(
97-
"b",
98-
0,
99-
priority = StreamPriority(1),
100-
subscriptions =
101-
buildJsonArray {
102-
add(defaultSubscription(1))
103-
},
104-
),
105-
),
106-
streams = listOf(stream("stream", false)),
82+
checkpointLine(
83+
listOf(
84+
bucket(
85+
"a",
86+
3,
87+
subscriptions =
88+
buildJsonArray {
89+
add(defaultSubscription(0))
90+
},
91+
),
92+
bucket(
93+
"b",
94+
1,
95+
subscriptions =
96+
buildJsonArray {
97+
add(defaultSubscription(1))
98+
},
99+
),
107100
),
101+
listOf(stream("stream", false)),
108102
),
109103
)
110104

@@ -143,15 +137,7 @@ class SyncStreamTest : AbstractSyncTest(true) {
143137
val turbine = database.currentStatus.asFlow().testIn(this)
144138
turbine.waitFor { it.connected && !it.downloading }
145139

146-
syncLines.send(
147-
SyncLine.FullCheckpoint(
148-
Checkpoint(
149-
lastOpId = "1",
150-
checksums = listOf(),
151-
streams = listOf(stream("default_stream", true)),
152-
),
153-
),
154-
)
140+
syncLines.send(checkpointLine(listOf(), listOf(stream("default_stream", true))))
155141

156142
val status = turbine.awaitItem()
157143
status.syncStreams!! shouldHaveSingleElement {
@@ -214,8 +200,81 @@ class SyncStreamTest : AbstractSyncTest(true) {
214200
turbine.cancelAndIgnoreRemainingEvents()
215201
}
216202
}
203+
204+
@Test
205+
fun `unsubscribing multiple times has no effect`() =
206+
databaseTest {
207+
val a = database.syncStream("a").subscribe()
208+
val aAgain = database.syncStream("a").subscribe()
209+
a.unsubscribe()
210+
a.unsubscribe()
211+
212+
// Pretend the streams are expired - they should still be requested because the core
213+
// extension extends the lifetime of streams currently referenced before connecting.
214+
database.execute("UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000")
215+
216+
database.connect(connector, options = getOptions())
217+
database.waitForStatusMatching { it.connected }
218+
requestedSyncStreams shouldHaveSingleElement {
219+
val streams = it.jsonObject["streams"]!!.jsonObject
220+
val subscriptions = streams["subscriptions"]!!.jsonArray
221+
subscriptions shouldHaveSize 1
222+
true
223+
}
224+
aAgain.unsubscribe()
225+
}
226+
227+
@Test
228+
fun unsubscribeAll() =
229+
databaseTest {
230+
val a = database.syncStream("a").subscribe()
231+
database.syncStream("a").unsubscribeAll()
232+
233+
// Despite a being active, it should not be requested.
234+
database.connect(connector, options = getOptions())
235+
database.waitForStatusMatching { it.connected }
236+
requestedSyncStreams shouldHaveSingleElement {
237+
val streams = it.jsonObject["streams"]!!.jsonObject
238+
val subscriptions = streams["subscriptions"]!!.jsonArray
239+
subscriptions shouldHaveSize 0
240+
true
241+
}
242+
a.unsubscribe()
243+
}
217244
}
218245

246+
@OptIn(ExperimentalSerializationApi::class)
247+
private fun checkpointLine(
248+
buckets: List<JsonObject>,
249+
streams: List<JsonObject>,
250+
): JsonObject =
251+
buildJsonObject {
252+
put("checkpoint", checkpoint(buckets, streams))
253+
}
254+
255+
@OptIn(ExperimentalSerializationApi::class)
256+
private fun checkpoint(
257+
buckets: List<JsonObject>,
258+
streams: List<JsonObject>,
259+
): JsonObject =
260+
buildJsonObject {
261+
put("last_op_id", "0")
262+
put("buckets", buildJsonArray { addAll(buckets) })
263+
put("streams", buildJsonArray { addAll(streams) })
264+
}
265+
266+
private fun bucket(
267+
name: String,
268+
priority: Int,
269+
subscriptions: JsonArray? = null,
270+
): JsonObject =
271+
buildJsonObject {
272+
put("bucket", name)
273+
put("priority", priority)
274+
put("checksum", 0)
275+
subscriptions?.let { put("subscriptions", it) }
276+
}
277+
219278
private fun stream(
220279
name: String,
221280
isDefault: Boolean,

core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import com.powersync.DatabaseDriverFactory
1111
import com.powersync.ExperimentalPowerSyncAPI
1212
import com.powersync.PowerSyncTestLogWriter
1313
import com.powersync.TestConnector
14-
import com.powersync.bucket.BucketChecksum
15-
import com.powersync.bucket.StreamPriority
1614
import com.powersync.bucket.WriteCheckpointData
1715
import com.powersync.bucket.WriteCheckpointResponse
1816
import com.powersync.createPowerSyncDatabaseImpl
@@ -28,7 +26,6 @@ import kotlinx.coroutines.channels.Channel
2826
import kotlinx.coroutines.test.TestScope
2927
import kotlinx.coroutines.test.runTest
3028
import kotlinx.io.files.Path
31-
import kotlinx.serialization.json.JsonArray
3229
import kotlinx.serialization.json.JsonElement
3330

3431
expect val factory: DatabaseDriverFactory
@@ -156,18 +153,3 @@ internal class ActiveDatabaseTest(
156153
cleanup(path)
157154
}
158155
}
159-
160-
internal fun bucket(
161-
name: String,
162-
count: Int,
163-
priority: StreamPriority = StreamPriority(3),
164-
checksum: Int = 0,
165-
subscriptions: JsonArray? = null,
166-
): BucketChecksum =
167-
BucketChecksum(
168-
bucket = name,
169-
priority = priority,
170-
checksum = checksum,
171-
count = count,
172-
subscriptions = subscriptions,
173-
)

core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package com.powersync.bucket
33
import com.powersync.sync.LegacySyncImplementation
44
import kotlinx.serialization.SerialName
55
import kotlinx.serialization.Serializable
6-
import kotlinx.serialization.json.JsonArray
76

87
@LegacySyncImplementation
98
@Serializable
@@ -13,5 +12,4 @@ internal data class BucketChecksum(
1312
val checksum: Int,
1413
val count: Int? = null,
1514
@SerialName("last_op_id") val lastOpId: String? = null,
16-
val subscriptions: JsonArray? = null,
1715
)

core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ internal data class Checkpoint(
1111
@SerialName("last_op_id") val lastOpId: String,
1212
@SerialName("buckets") val checksums: List<BucketChecksum>,
1313
@SerialName("write_checkpoint") val writeCheckpoint: String? = null,
14-
val streams: List<JsonElement>? = null,
1514
) {
1615
fun clone(): Checkpoint = Checkpoint(lastOpId, checksums, writeCheckpoint)
1716
}

core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ internal class StreamTracker(
2121
val streamGroups = mutableMapOf<StreamKey, SubscriptionGroup>()
2222
val currentlyReferencedStreams = MutableStateFlow(listOf<SubscriptionGroup>())
2323

24-
private suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) {
24+
suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) {
2525
db.writeTransaction { tx ->
2626
tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command)))
2727
}
@@ -101,6 +101,7 @@ internal class PendingStream(
101101
override suspend fun unsubscribeAll() {
102102
tracker.groupMutex.withLock {
103103
tracker.removeStreamGroup(key)
104+
tracker.subscriptionsCommand(RustSubscriptionChangeRequest(unsubscribe = key))
104105
}
105106
}
106107
}

core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.ktor.utils.io.writer
2727
import kotlinx.coroutines.CoroutineScope
2828
import kotlinx.coroutines.channels.ReceiveChannel
2929
import kotlinx.coroutines.channels.consume
30+
import kotlinx.serialization.json.JsonElement
3031

3132
/**
3233
* A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel].
@@ -69,6 +70,10 @@ internal class MockSyncService(
6970
val serializedLine = JsonUtil.json.encodeToString(line)
7071
channel.writeStringUtf8("$serializedLine\n")
7172
}
73+
is JsonElement -> {
74+
val serializedLine = JsonUtil.json.encodeToString(line)
75+
channel.writeStringUtf8("$serializedLine\n")
76+
}
7277
is ByteArray -> {
7378
channel.writeByteArray(line)
7479
}

0 commit comments

Comments
 (0)