Skip to content

Commit 3618ede

Browse files
Merge pull request #108 from ably-labs/47-implement-discontinuities
[ECO-4982] Implement `Messages` discontinuities (CHA-M7)
2 parents 37b00b9 + b1860c2 commit 3618ede

File tree

6 files changed

+101
-24
lines changed

6 files changed

+101
-24
lines changed

Sources/AblyChat/DefaultMessages.swift

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ private struct MessageSubscriptionWrapper {
1313
@MainActor
1414
internal final class DefaultMessages: Messages, EmitsDiscontinuities {
1515
private let roomID: String
16-
public nonisolated let channel: RealtimeChannelProtocol
16+
public nonisolated let featureChannel: FeatureChannel
1717
private let chatAPI: ChatAPI
1818
private let clientID: String
1919

2020
// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
2121
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the timeserial of when it was attached or resumed.
2222
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]
2323

24-
internal nonisolated init(channel: RealtimeChannelProtocol, chatAPI: ChatAPI, roomID: String, clientID: String) async {
25-
self.channel = channel
24+
internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String) async {
25+
self.featureChannel = featureChannel
2626
self.chatAPI = chatAPI
2727
self.roomID = roomID
2828
self.clientID = clientID
@@ -32,6 +32,10 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
3232
await handleChannelEvents(roomId: roomID)
3333
}
3434

35+
internal nonisolated var channel: any RealtimeChannelProtocol {
36+
featureChannel.channel
37+
}
38+
3539
// (CHA-M4) Messages can be received via a subscription in realtime.
3640
internal func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription {
3741
let uuid = UUID()
@@ -99,9 +103,9 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
99103
try await chatAPI.sendMessage(roomId: roomID, params: params)
100104
}
101105

102-
// TODO: (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle. - https://github.com/ably-labs/ably-chat-swift/issues/47
103-
internal nonisolated func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
104-
fatalError("not implemented")
106+
// (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle.
107+
internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
108+
await featureChannel.subscribeToDiscontinuities()
105109
}
106110

107111
private func getBeforeSubscriptionStart(_ uuid: UUID, params: QueryOptions) async throws -> any PaginatedResult<Message> {

Sources/AblyChat/DefaultRoomLifecycleContributor.swift

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import Ably
22

3-
internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {
3+
internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities {
44
internal let channel: DefaultRoomLifecycleContributorChannel
55
internal let feature: RoomFeature
6+
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []
67

78
internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) {
89
self.channel = channel
@@ -11,8 +12,17 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {
1112

1213
// MARK: - Discontinuities
1314

14-
internal func emitDiscontinuity(_: ARTErrorInfo) {
15-
// TODO: https://github.com/ably-labs/ably-chat-swift/issues/47
15+
internal func emitDiscontinuity(_ error: ARTErrorInfo) {
16+
for subscription in discontinuitySubscriptions {
17+
subscription.emit(error)
18+
}
19+
}
20+
21+
internal func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
22+
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
23+
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
24+
discontinuitySubscriptions.append(subscription)
25+
return subscription
1626
}
1727
}
1828

Sources/AblyChat/Room.swift

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,37 +81,32 @@ internal actor DefaultRoom<LifecycleManagerFactory: RoomLifecycleManagerFactory>
8181
throw ARTErrorInfo.create(withCode: 40000, message: "Ensure your Realtime instance is initialized with a clientId.")
8282
}
8383

84-
channels = Self.createChannels(roomID: roomID, realtime: realtime)
85-
let contributors = Self.createContributors(channels: channels)
84+
let featureChannels = Self.createFeatureChannels(roomID: roomID, realtime: realtime)
85+
channels = featureChannels.mapValues(\.channel)
86+
let contributors = featureChannels.values.map(\.contributor)
8687

8788
lifecycleManager = await lifecycleManagerFactory.createManager(
8889
contributors: contributors,
8990
logger: logger
9091
)
9192

9293
messages = await DefaultMessages(
93-
channel: channels[.messages]!,
94+
featureChannel: featureChannels[.messages]!,
9495
chatAPI: chatAPI,
9596
roomID: roomID,
9697
clientID: clientId
9798
)
9899
}
99100

100-
private static func createChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: RealtimeChannelProtocol] {
101+
private static func createFeatureChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: DefaultFeatureChannel] {
101102
.init(uniqueKeysWithValues: [RoomFeature.messages].map { feature in
102103
let channel = realtime.getChannel(feature.channelNameForRoomID(roomID))
104+
let contributor = DefaultRoomLifecycleContributor(channel: .init(underlyingChannel: channel), feature: feature)
103105

104-
return (feature, channel)
106+
return (feature, .init(channel: channel, contributor: contributor))
105107
})
106108
}
107109

108-
private static func createContributors(channels: [RoomFeature: RealtimeChannelProtocol]) -> [DefaultRoomLifecycleContributor] {
109-
channels.map { entry in
110-
let (feature, channel) = entry
111-
return .init(channel: .init(underlyingChannel: channel), feature: feature)
112-
}
113-
}
114-
115110
public nonisolated var presence: any Presence {
116111
fatalError("Not yet implemented")
117112
}

Sources/AblyChat/RoomFeature.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import Ably
2+
13
/// The features offered by a chat room.
24
internal enum RoomFeature {
35
case messages
@@ -21,3 +23,22 @@ internal enum RoomFeature {
2123
}
2224
}
2325
}
26+
27+
/// Provides all of the channel-related functionality that a room feature (e.g. an implementation of ``Messages``) needs.
28+
///
29+
/// This mishmash exists to give a room feature access to both:
30+
///
31+
/// - a `RealtimeChannelProtocol` object (this is the interface that our features are currently written against, as opposed to, say, `RoomLifecycleContributorChannel`)
32+
/// - the discontinuities emitted by the room lifecycle
33+
internal protocol FeatureChannel: Sendable, EmitsDiscontinuities {
34+
var channel: RealtimeChannelProtocol { get }
35+
}
36+
37+
internal struct DefaultFeatureChannel: FeatureChannel {
38+
internal var channel: RealtimeChannelProtocol
39+
internal var contributor: DefaultRoomLifecycleContributor
40+
41+
internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
42+
await contributor.subscribeToDiscontinuities()
43+
}
44+
}

Tests/AblyChatTests/DefaultMessagesTests.swift

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ struct DefaultMessagesTests {
1111
let realtime = MockRealtime.create()
1212
let chatAPI = ChatAPI(realtime: realtime)
1313
let channel = MockRealtimeChannel()
14-
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
14+
let featureChannel = MockFeatureChannel(channel: channel)
15+
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
1516

1617
// Then
1718
await #expect(throws: ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined"), performing: {
@@ -28,7 +29,8 @@ struct DefaultMessagesTests {
2829
let realtime = MockRealtime.create { (MockHTTPPaginatedResponse.successGetMessagesWithNoItems, nil) }
2930
let chatAPI = ChatAPI(realtime: realtime)
3031
let channel = MockRealtimeChannel()
31-
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
32+
let featureChannel = MockFeatureChannel(channel: channel)
33+
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
3234

3335
// Then
3436
await #expect(throws: Never.self, performing: {
@@ -52,7 +54,8 @@ struct DefaultMessagesTests {
5254
channelSerial: "001"
5355
)
5456
)
55-
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
57+
let featureChannel = MockFeatureChannel(channel: channel)
58+
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
5659
let subscription = try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
5760
let expectedPaginatedResult = PaginatedResultWrapper<Message>(
5861
paginatedResponse: MockHTTPPaginatedResponse.successGetMessagesWithNoItems,
@@ -65,4 +68,24 @@ struct DefaultMessagesTests {
6568
// Then
6669
#expect(previousMessages == expectedPaginatedResult)
6770
}
71+
72+
// @spec CHA-M7
73+
@Test
74+
func subscribeToDiscontinuities() async throws {
75+
// Given: A DefaultMessages instance
76+
let realtime = MockRealtime.create()
77+
let chatAPI = ChatAPI(realtime: realtime)
78+
let channel = MockRealtimeChannel()
79+
let featureChannel = MockFeatureChannel(channel: channel)
80+
let messages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
81+
82+
// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
83+
let featureChannelDiscontinuity = ARTErrorInfo.createUnknownError() // arbitrary
84+
let messagesDiscontinuitySubscription = await messages.subscribeToDiscontinuities()
85+
await featureChannel.emitDiscontinuity(featureChannelDiscontinuity)
86+
87+
// Then: The DefaultMessages instance emits this discontinuity through `subscribeToDiscontinuities`
88+
let messagesDiscontinuity = try #require(await messagesDiscontinuitySubscription.first { _ in true })
89+
#expect(messagesDiscontinuity === featureChannelDiscontinuity)
90+
}
6891
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import Ably
2+
@testable import AblyChat
3+
4+
final actor MockFeatureChannel: FeatureChannel {
5+
let channel: RealtimeChannelProtocol
6+
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
7+
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []
8+
9+
init(channel: RealtimeChannelProtocol) {
10+
self.channel = channel
11+
}
12+
13+
func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
14+
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
15+
discontinuitySubscriptions.append(subscription)
16+
return subscription
17+
}
18+
19+
func emitDiscontinuity(_ discontinuity: ARTErrorInfo) {
20+
for subscription in discontinuitySubscriptions {
21+
subscription.emit(discontinuity)
22+
}
23+
}
24+
}

0 commit comments

Comments
 (0)