@@ -38,8 +38,8 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
3838 private let logger : ( any SupabaseLogger ) ?
3939
4040 struct MutableState {
41- var task : URLSessionWebSocketTask ?
4241 var continuation : AsyncStream < ConnectionStatus > . Continuation ?
42+ var stream : SocketStream ?
4343 }
4444
4545 let mutableState = LockIsolated ( MutableState ( ) )
@@ -56,8 +56,9 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
5656 func connect( ) -> AsyncStream < ConnectionStatus > {
5757 mutableState. withValue { state in
5858 let session = URLSession ( configuration: configuration, delegate: self , delegateQueue: nil )
59- state. task = session. webSocketTask ( with: realtimeURL)
60- state. task? . resume ( )
59+ let task = session. webSocketTask ( with: realtimeURL)
60+ state. stream = SocketStream ( task: task)
61+ task. resume ( )
6162
6263 let ( stream, continuation) = AsyncStream< ConnectionStatus> . makeStream( )
6364 state. continuation = continuation
@@ -67,49 +68,48 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
6768
6869 func disconnect( closeCode: URLSessionWebSocketTask . CloseCode ) {
6970 mutableState. withValue { state in
70- state. task ? . cancel ( with: closeCode, reason : nil )
71+ state. stream ? . cancel ( with: closeCode)
7172 }
7273 }
7374
7475 func receive( ) -> AsyncThrowingStream < RealtimeMessageV2 , any Error > {
75- let ( stream, continuation) = AsyncThrowingStream < RealtimeMessageV2 , any Error > . makeStream ( )
76-
77- Task {
78- while let message = try await mutableState. task? . receive ( ) {
79- do {
80- switch message {
81- case let . string( stringMessage) :
82- logger? . verbose ( " Received message: \( stringMessage) " )
83-
84- guard let data = stringMessage. data ( using: . utf8) else {
85- throw RealtimeError ( " Expected a UTF8 encoded message. " )
86- }
87-
88- let message = try JSONDecoder ( ) . decode ( RealtimeMessageV2 . self, from: data)
89- continuation. yield ( message)
90-
91- case . data:
92- fallthrough
93- default :
94- throw RealtimeError ( " Unsupported message type. " )
76+ mutableState. withValue { mutableState in
77+ guard let stream = mutableState. stream else {
78+ return . finished(
79+ throwing: RealtimeError (
80+ " receive() called before connect(). Make sure to call `connect()` before calling `receive()`. "
81+ )
82+ )
83+ }
84+
85+ return stream. map { message in
86+ switch message {
87+ case let . string( stringMessage) :
88+ self . logger? . verbose ( " Received message: \( stringMessage) " )
89+
90+ guard let data = stringMessage. data ( using: . utf8) else {
91+ throw RealtimeError ( " Expected a UTF8 encoded message. " )
9592 }
96- } catch {
97- continuation. finish ( throwing: error)
93+
94+ let message = try JSONDecoder ( ) . decode ( RealtimeMessageV2 . self, from: data)
95+ return message
96+
97+ case . data:
98+ fallthrough
99+ default :
100+ throw RealtimeError ( " Unsupported message type. " )
98101 }
99102 }
100-
101- continuation. finish ( )
103+ . eraseToThrowingStream ( )
102104 }
103-
104- return stream
105105 }
106106
107107 func send( _ message: RealtimeMessageV2 ) async throws {
108108 let data = try JSONEncoder ( ) . encode ( message)
109109 let string = String ( decoding: data, as: UTF8 . self)
110110
111111 logger? . verbose ( " Sending message: \( string) " )
112- try await mutableState. task ? . send ( . string( string) )
112+ try await mutableState. stream ? . send ( . string( string) )
113113 }
114114
115115 // MARK: - URLSessionWebSocketDelegate
@@ -144,3 +144,74 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
144144 mutableState. continuation? . yield ( . error( error) )
145145 }
146146}
147+
148+ typealias WebSocketStream = AsyncThrowingStream < URLSessionWebSocketTask . Message , any Error >
149+
150+ final class SocketStream : AsyncSequence , Sendable {
151+ typealias AsyncIterator = WebSocketStream . Iterator
152+ typealias Element = URLSessionWebSocketTask . Message
153+
154+ struct MutableState {
155+ var continuation : WebSocketStream . Continuation ?
156+ var stream : WebSocketStream ?
157+ }
158+
159+ private let task : URLSessionWebSocketTask
160+ private let mutableState = LockIsolated ( MutableState ( ) )
161+
162+ private func makeStreamIfNeeded( ) -> WebSocketStream {
163+ mutableState. withValue { state in
164+ if let stream = state. stream {
165+ return stream
166+ }
167+
168+ let stream = WebSocketStream { continuation in
169+ state. continuation = continuation
170+ waitForNextValue ( )
171+ }
172+
173+ state. stream = stream
174+ return stream
175+ }
176+ }
177+
178+ private func waitForNextValue( ) {
179+ guard task. closeCode == . invalid else {
180+ mutableState. continuation? . finish ( )
181+ return
182+ }
183+
184+ task. receive { [ weak self] result in
185+ guard let continuation = self ? . mutableState. continuation else { return }
186+
187+ do {
188+ let message = try result. get ( )
189+ continuation. yield ( message)
190+ self ? . waitForNextValue ( )
191+ } catch {
192+ continuation. finish ( throwing: error)
193+ }
194+ }
195+ }
196+
197+ init ( task: URLSessionWebSocketTask ) {
198+ self . task = task
199+ }
200+
201+ deinit {
202+ mutableState. continuation? . finish ( )
203+ }
204+
205+ func makeAsyncIterator( ) -> WebSocketStream . Iterator {
206+ makeStreamIfNeeded ( ) . makeAsyncIterator ( )
207+ }
208+
209+ func cancel( with closeCode: URLSessionWebSocketTask . CloseCode = . goingAway) {
210+ task. cancel ( with: closeCode, reason: nil )
211+ mutableState. continuation? . finish ( )
212+ }
213+
214+ func send( _ message: URLSessionWebSocketTask . Message ) async throws {
215+ try await task. send ( message)
216+ }
217+ }
0 commit comments