4141
4242namespace RabbitMQ . Client . Impl
4343{
44- internal abstract class AsyncRpcContinuation < T > : IRpcContinuation , IDisposable
44+ internal abstract class AsyncRpcContinuation < T > : IRpcContinuation
4545 {
4646 private readonly CancellationTokenSource _cancellationTokenSource ;
4747 private readonly CancellationTokenRegistration _cancellationTokenRegistration ;
@@ -101,7 +101,7 @@ public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
101101 return _tcsConfiguredTaskAwaitable . GetAwaiter ( ) ;
102102 }
103103
104- public abstract void HandleCommand ( in IncomingCommand cmd ) ;
104+ public abstract Task HandleCommandAsync ( IncomingCommand cmd ) ;
105105
106106 public virtual void HandleChannelShutdown ( ShutdownEventArgs reason )
107107 {
@@ -135,7 +135,7 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout)
135135 {
136136 }
137137
138- public override void HandleCommand ( in IncomingCommand cmd )
138+ public override Task HandleCommandAsync ( IncomingCommand cmd )
139139 {
140140 try
141141 {
@@ -156,6 +156,8 @@ public override void HandleCommand(in IncomingCommand cmd)
156156 {
157157 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
158158 }
159+
160+ return Task . CompletedTask ;
159161 }
160162 finally
161163 {
@@ -173,7 +175,7 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan
173175 _expectedCommandId = expectedCommandId ;
174176 }
175177
176- public override void HandleCommand ( in IncomingCommand cmd )
178+ public override Task HandleCommandAsync ( IncomingCommand cmd )
177179 {
178180 try
179181 {
@@ -185,6 +187,8 @@ public override void HandleCommand(in IncomingCommand cmd)
185187 {
186188 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
187189 }
190+
191+ return Task . CompletedTask ;
188192 }
189193 finally
190194 {
@@ -205,7 +209,7 @@ public BasicCancelAsyncRpcContinuation(string consumerTag, IConsumerDispatcher c
205209 _consumerDispatcher = consumerDispatcher ;
206210 }
207211
208- public override void HandleCommand ( in IncomingCommand cmd )
212+ public override async Task HandleCommandAsync ( IncomingCommand cmd )
209213 {
210214 try
211215 {
@@ -214,7 +218,8 @@ public override void HandleCommand(in IncomingCommand cmd)
214218 var method = new Client . Framing . Impl . BasicCancelOk ( cmd . MethodSpan ) ;
215219 _tcs . TrySetResult ( true ) ;
216220 Debug . Assert ( _consumerTag == method . _consumerTag ) ;
217- _consumerDispatcher . HandleBasicCancelOk ( _consumerTag ) ;
221+ await _consumerDispatcher . HandleBasicCancelOkAsync ( _consumerTag , CancellationToken )
222+ . ConfigureAwait ( false ) ;
218223 }
219224 else
220225 {
@@ -240,15 +245,16 @@ public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispat
240245 _consumerDispatcher = consumerDispatcher ;
241246 }
242247
243- public override void HandleCommand ( in IncomingCommand cmd )
248+ public override async Task HandleCommandAsync ( IncomingCommand cmd )
244249 {
245250 try
246251 {
247252 if ( cmd . CommandId == ProtocolCommandId . BasicConsumeOk )
248253 {
249254 var method = new Client . Framing . Impl . BasicConsumeOk ( cmd . MethodSpan ) ;
250255 _tcs . TrySetResult ( method . _consumerTag ) ;
251- _consumerDispatcher . HandleBasicConsumeOk ( _consumer , method . _consumerTag ) ;
256+ await _consumerDispatcher . HandleBasicConsumeOkAsync ( _consumer , method . _consumerTag , CancellationToken )
257+ . ConfigureAwait ( false ) ;
252258 }
253259 else
254260 {
@@ -272,7 +278,7 @@ public BasicGetAsyncRpcContinuation(Func<ulong, ulong> adjustDeliveryTag, TimeSp
272278 _adjustDeliveryTag = adjustDeliveryTag ;
273279 }
274280
275- public override void HandleCommand ( in IncomingCommand cmd )
281+ public override Task HandleCommandAsync ( IncomingCommand cmd )
276282 {
277283 try
278284 {
@@ -300,6 +306,8 @@ public override void HandleCommand(in IncomingCommand cmd)
300306 {
301307 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
302308 }
309+
310+ return Task . CompletedTask ;
303311 }
304312 finally
305313 {
@@ -389,7 +397,7 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(con
389397 {
390398 }
391399
392- public override void HandleCommand ( in IncomingCommand cmd )
400+ public override Task HandleCommandAsync ( IncomingCommand cmd )
393401 {
394402 try
395403 {
@@ -403,6 +411,8 @@ public override void HandleCommand(in IncomingCommand cmd)
403411 {
404412 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
405413 }
414+
415+ return Task . CompletedTask ;
406416 }
407417 finally
408418 {
@@ -433,7 +443,7 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(cont
433443 {
434444 }
435445
436- public override void HandleCommand ( in IncomingCommand cmd )
446+ public override Task HandleCommandAsync ( IncomingCommand cmd )
437447 {
438448 try
439449 {
@@ -446,6 +456,8 @@ public override void HandleCommand(in IncomingCommand cmd)
446456 {
447457 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
448458 }
459+
460+ return Task . CompletedTask ;
449461 }
450462 finally
451463 {
@@ -460,7 +472,7 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(conti
460472 {
461473 }
462474
463- public override void HandleCommand ( in IncomingCommand cmd )
475+ public override Task HandleCommandAsync ( IncomingCommand cmd )
464476 {
465477 try
466478 {
@@ -473,6 +485,8 @@ public override void HandleCommand(in IncomingCommand cmd)
473485 {
474486 _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
475487 }
488+
489+ return Task . CompletedTask ;
476490 }
477491 finally
478492 {
0 commit comments