@@ -91,9 +91,101 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
9191    callbackManager. reset ( ) 
9292  } 
9393
94-   /// Subscribes to the channel
94+   /// Subscribes to the channel.
95+   public  func  subscribeWithError( )  async  throws  { 
96+     logger? . debug ( " Starting subscription to channel ' \( topic) ' (attempt 1/ \( socket. options. maxRetryAttempts) ) " ) 
97+ 
98+     status =  . subscribing
99+ 
100+     defer  { 
101+       // If the subscription fails, we need to set the status to unsubscribed
102+       // to avoid the channel being stuck in a subscribing state.
103+       if  status !=  . subscribed { 
104+         status =  . unsubscribed
105+       } 
106+     } 
107+ 
108+     var  attempts  =  0 
109+ 
110+     while  attempts <  socket. options. maxRetryAttempts { 
111+       attempts +=  1 
112+ 
113+       do  { 
114+         logger? . debug ( 
115+           " Attempting to subscribe to channel ' \( topic) ' (attempt  \( attempts) / \( socket. options. maxRetryAttempts) ) " 
116+         ) 
117+ 
118+         try await  withTimeout ( interval:  socket. options. timeoutInterval)  {  [ self ]  in 
119+           await  _subscribe ( ) 
120+         } 
121+ 
122+         logger? . debug ( " Successfully subscribed to channel ' \( topic) ' " ) 
123+         return 
124+ 
125+       }  catch  is TimeoutError  { 
126+         logger? . debug ( 
127+           " Subscribe timed out for channel ' \( topic) ' (attempt  \( attempts) / \( socket. options. maxRetryAttempts) ) " 
128+         ) 
129+ 
130+         if  attempts <  socket. options. maxRetryAttempts { 
131+           // Add exponential backoff with jitter
132+           let  delay  =  calculateRetryDelay ( for:  attempts) 
133+           logger? . debug ( 
134+             " Retrying subscription to channel ' \( topic) ' in  \( String ( format:  " %.2f " ,  delay) )  seconds... " 
135+           ) 
136+ 
137+           do  { 
138+             try await  _clock. sleep ( for:  delay) 
139+           }  catch  { 
140+             // If sleep is cancelled, break out of retry loop
141+             logger? . debug ( " Subscription retry cancelled for channel ' \( topic) ' " ) 
142+             throw  CancellationError ( ) 
143+           } 
144+         }  else  { 
145+           logger? . error ( 
146+             " Failed to subscribe to channel ' \( topic) ' after  \( socket. options. maxRetryAttempts)  attempts due to timeout " 
147+           ) 
148+         } 
149+       }  catch  is CancellationError  { 
150+         logger? . debug ( " Subscription retry cancelled for channel ' \( topic) ' " ) 
151+         throw  CancellationError ( ) 
152+       }  catch  { 
153+         preconditionFailure ( 
154+           " The only possible error here is TimeoutError or CancellationError, this should never happen. " 
155+         ) 
156+       } 
157+     } 
158+ 
159+     logger? . error ( " Subscription to channel ' \( topic) ' failed after  \( attempts)  attempts " ) 
160+     throw  RealtimeError . maxRetryAttemptsReached
161+   } 
162+ 
163+   /// Subscribes to the channel.
164+   @available ( * ,  deprecated,  message:  " Use `subscribeWithError` instead " )  
95165  @MainActor  
96166  public  func  subscribe( )  async  { 
167+     try ? await  subscribeWithError ( ) 
168+   } 
169+ 
170+   /// Calculates retry delay with exponential backoff and jitter
171+   private  func  calculateRetryDelay( for attempt:  Int )  ->  TimeInterval  { 
172+     let  baseDelay :  TimeInterval  =  1.0 
173+     let  maxDelay :  TimeInterval  =  30.0 
174+     let  backoffMultiplier :  Double  =  2.0 
175+ 
176+     let  exponentialDelay  =  baseDelay *  pow( backoffMultiplier,  Double ( attempt -  1 ) ) 
177+     let  cappedDelay  =  min ( exponentialDelay,  maxDelay) 
178+ 
179+     // Add jitter (±25% random variation) to prevent thundering herd
180+     let  jitterRange  =  cappedDelay *  0.25 
181+     let  jitter  =  Double . random ( in:  - jitterRange... jitterRange) 
182+ 
183+     return  max ( 0.1 ,  cappedDelay +  jitter) 
184+   } 
185+ 
186+   /// Subscribes to the channel
187+   @MainActor  
188+   private  func  _subscribe( )  async  { 
97189    if  socket. status !=  . connected { 
98190      if  socket. options. connectOnSubscribe !=  true  { 
99191        reportIssue ( 
@@ -104,7 +196,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
104196      await  socket. connect ( ) 
105197    } 
106198
107-     status =  . subscribing
108199    logger? . debug ( " Subscribing to channel  \( topic) " ) 
109200
110201    config. presence. enabled =  callbackManager. callbacks. contains ( where:  {  $0. isPresence } ) 
@@ -133,18 +224,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
133224      payload:  try ! JSONObject ( payload) 
134225    ) 
135226
136-     do  { 
137-       try await  withTimeout ( interval:  socket. options. timeoutInterval)  {  [ self ]  in 
138-         _ =  await  statusChange. first  {  @Sendable   in  $0 ==  . subscribed } 
139-       } 
140-     }  catch  { 
141-       if  error is TimeoutError  { 
142-         logger? . debug ( " Subscribe timed out. " ) 
143-         await  subscribe ( ) 
144-       }  else  { 
145-         logger? . error ( " Subscribe failed:  \( error) " ) 
146-       } 
147-     } 
227+     _ =  await  statusChange. first  {  @Sendable   in  $0 ==  . subscribed } 
148228  } 
149229
150230  public  func  unsubscribe( )  async  { 
@@ -183,13 +263,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
183263  @MainActor  
184264  public  func  broadcast( event:  String ,  message:  JSONObject )  async  { 
185265    if  status !=  . subscribed { 
186-       struct  Message :  Encodable  { 
187-         let  topic :  String 
188-         let  event :  String 
189-         let  payload :  JSONObject 
190-         let  `private` :  Bool 
191-       } 
192- 
193266      var  headers :  HTTPFields  =  [ . contentType:  " application/json " ] 
194267      if  let  apiKey =  socket. options. apikey { 
195268        headers [ . apiKey]  =  apiKey
@@ -198,23 +271,34 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
198271        headers [ . authorization]  =  " Bearer  \( accessToken) " 
199272      } 
200273
274+       struct  BroadcastMessagePayload :  Encodable  { 
275+         let  messages :  [ Message ] 
276+ 
277+         struct  Message :  Encodable  { 
278+           let  topic :  String 
279+           let  event :  String 
280+           let  payload :  JSONObject 
281+           let  `private` :  Bool 
282+         } 
283+       } 
284+ 
201285      let  task  =  Task  {  [ headers]  in 
202286        _ =  try ? await  socket. http. send ( 
203287          HTTPRequest ( 
204288            url:  socket. broadcastURL, 
205289            method:  . post, 
206290            headers:  headers, 
207291            body:  JSONEncoder ( ) . encode ( 
208-               [ 
209-                 " messages " :  [ 
210-                   Message ( 
292+               BroadcastMessagePayload ( 
293+                 messages:  [ 
294+                   BroadcastMessagePayload . Message ( 
211295                    topic:  topic, 
212296                    event:  event, 
213297                    payload:  message, 
214298                    private:  config. isPrivate
215299                  ) 
216300                ] 
217-               ] 
301+               ) 
218302            ) 
219303          ) 
220304        ) 
0 commit comments