3030//---------------------------------------------------------------------------
3131
3232using System ;
33- using System . Collections . Generic ;
3433using System . Threading ;
3534using System . Threading . Tasks ;
3635using RabbitMQ . Client ;
@@ -49,14 +48,6 @@ public TestConnectionBlockedChannelLeak(ITestOutputHelper output) : base(output)
4948 public override async Task InitializeAsync ( )
5049 {
5150 await UnblockAsync ( ) ;
52- _connFactory = new ConnectionFactory
53- {
54- AutomaticRecoveryEnabled = true ,
55- ClientProvidedName = _testDisplayName ,
56- ContinuationTimeout = TimeSpan . FromSeconds ( 2 )
57- } ;
58- _conn = await _connFactory . CreateConnectionAsync ( ) ;
59- _channel = await _conn . CreateChannelAsync ( ) ;
6051 }
6152
6253 public override async Task DisposeAsync ( )
@@ -68,49 +59,55 @@ public override async Task DisposeAsync()
6859 [ Fact ]
6960 public async Task TestConnectionBlockedChannelLeak_GH1573 ( )
7061 {
71- string exchangeName = GenerateExchangeName ( ) ;
62+ await BlockAsync ( ) ;
7263
73- var tcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
64+ var connectionBlockedTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
65+ var connectionUnblockedTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
7466
7567 using var cts = new CancellationTokenSource ( WaitSpan ) ;
7668 using CancellationTokenRegistration ctr = cts . Token . Register ( ( ) =>
7769 {
78- tcs . TrySetCanceled ( ) ;
70+ connectionBlockedTcs . TrySetCanceled ( ) ;
71+ connectionUnblockedTcs . TrySetCanceled ( ) ;
7972 } ) ;
8073
74+ _connFactory = new ConnectionFactory
75+ {
76+ AutomaticRecoveryEnabled = true ,
77+ ClientProvidedName = _testDisplayName ,
78+ ContinuationTimeout = TimeSpan . FromSeconds ( 2 )
79+ } ;
80+ _conn = await _connFactory . CreateConnectionAsync ( ) ;
81+ _channel = await _conn . CreateChannelAsync ( ) ;
82+
83+ string exchangeName = GenerateExchangeName ( ) ;
84+
8185 _conn . ConnectionBlocked += ( object sender , ConnectionBlockedEventArgs args ) =>
8286 {
83- UnblockAsync ( ) ;
87+ connectionBlockedTcs . SetResult ( true ) ;
8488 } ;
8589
8690 _conn . ConnectionUnblocked += ( object sender , EventArgs ea ) =>
8791 {
88- tcs . SetResult ( true ) ;
92+ connectionUnblockedTcs . SetResult ( true ) ;
8993 } ;
9094
91- await BlockAsync ( _channel ) ;
92-
93- using ( IChannel publishChannel = await _conn . CreateChannelAsync ( ) )
95+ async Task ExchangeDeclareAndPublish ( )
9496 {
95- await publishChannel . ExchangeDeclareAsync ( exchangeName , ExchangeType . Direct , autoDelete : true ) ;
96- await publishChannel . BasicPublishAsync ( exchangeName , exchangeName , GetRandomBody ( ) , mandatory : true ) ;
97- await publishChannel . CloseAsync ( ) ;
97+ using ( IChannel publishChannel = await _conn . CreateChannelAsync ( ) )
98+ {
99+ await publishChannel . ExchangeDeclareAsync ( exchangeName , ExchangeType . Direct , autoDelete : true ) ;
100+ await publishChannel . BasicPublishAsync ( exchangeName , exchangeName , GetRandomBody ( ) , mandatory : true ) ;
101+ await publishChannel . CloseAsync ( ) ;
102+ }
98103 }
104+ await Assert . ThrowsAnyAsync < OperationCanceledException > ( ExchangeDeclareAndPublish ) ;
99105
100- var channels = new List < IChannel > ( ) ;
101106 for ( int i = 1 ; i <= 5 ; i ++ )
102107 {
103- IChannel c = await _conn . CreateChannelAsync ( ) ;
104- channels . Add ( c ) ;
108+ await Assert . ThrowsAnyAsync < OperationCanceledException > ( ( ) => _conn . CreateChannelAsync ( ) ) ;
105109 }
106110
107- /*
108- * Note:
109- * This wait probably isn't necessary, if the above CreateChannelAsync
110- * calls were to timeout, we'd get exceptions on the await
111- */
112- await Task . Delay ( TimeSpan . FromSeconds ( 5 ) ) ;
113-
114111 // Note: debugging
115112 // var rmq = new RabbitMQCtl(_output);
116113 // string output = await rmq.ExecRabbitMQCtlAsync("list_channels");
@@ -121,7 +118,8 @@ public async Task TestConnectionBlockedChannelLeak_GH1573()
121118 // output = await rmq.ExecRabbitMQCtlAsync("list_channels");
122119 // _output.WriteLine("CHANNELS 1: {0}", output);
123120
124- Assert . True ( await tcs . Task , "Unblock notification not received." ) ;
121+ Assert . True ( await connectionBlockedTcs . Task , "Blocked notification not received." ) ;
122+ Assert . True ( await connectionUnblockedTcs . Task , "Unblocked notification not received." ) ;
125123 }
126124 }
127125}
0 commit comments