@@ -180,7 +180,7 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
180180 }
181181
182182
183- private static void RegisterApmTaskTimeout ( IAsyncResult result , WebRequest request , RequestData requestData ) =>
183+ private static RegisteredWaitHandle RegisterApmTaskTimeout ( IAsyncResult result , WebRequest request , RequestData requestData ) =>
184184 ThreadPool . RegisterWaitForSingleObject ( result . AsyncWaitHandle , TimeoutCallback , request , requestData . RequestTimeout , true ) ;
185185
186186 private static void TimeoutCallback ( object state , bool timedOut )
@@ -189,69 +189,82 @@ private static void TimeoutCallback(object state, bool timedOut)
189189 ( state as WebRequest ) ? . Abort ( ) ;
190190 }
191191
192- public virtual async Task < ElasticsearchResponse < TReturn > > RequestAsync < TReturn > ( RequestData requestData , CancellationToken cancellationToken ) where TReturn : class
192+ public virtual async Task < ElasticsearchResponse < TReturn > > RequestAsync < TReturn > ( RequestData requestData ,
193+ CancellationToken cancellationToken ) where TReturn : class
193194 {
194195 var builder = new ResponseBuilder < TReturn > ( requestData , cancellationToken ) ;
196+ WaitHandle apmWaitHandle = null ;
197+ RegisteredWaitHandle apmTaskTimeout = null ;
195198 try
196199 {
197- var request = this . CreateHttpWebRequest ( requestData ) ;
198- cancellationToken . Register ( ( ) => request . Abort ( ) ) ;
199200 var data = requestData . PostData ;
200-
201- if ( data != null )
201+ var request = this . CreateHttpWebRequest ( requestData ) ;
202+ using ( cancellationToken . Register ( ( ) => request . Abort ( ) ) )
202203 {
203- var apmGetRequestStreamTask = Task . Factory . FromAsync ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
204- RegisterApmTaskTimeout ( apmGetRequestStreamTask , request , requestData ) ;
205-
206- using ( var stream = await apmGetRequestStreamTask . ConfigureAwait ( false ) )
207- {
208- if ( requestData . HttpCompression )
209- using ( var zipStream = new GZipStream ( stream , CompressionMode . Compress ) )
210- await data . WriteAsync ( zipStream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
211- else
212- await data . WriteAsync ( stream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
213- }
204+ if ( data != null )
205+ await PostRequestAsync ( requestData , cancellationToken , request , data ) ;
206+ requestData . MadeItToResponse = true ;
207+ //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
208+ //Either the stream or the response object needs to be closed but not both although it won't
209+ //throw any errors if both are closed atleast one of them has to be Closed.
210+ //Since we expose the stream we let closing the stream determining when to close the connection
211+
212+ var apmGetResponseTask = Task . Factory . FromAsync ( request . BeginGetResponse , request . EndGetResponse , null ) ;
213+ apmWaitHandle = ( ( IAsyncResult ) apmGetResponseTask ) . AsyncWaitHandle ;
214+ apmTaskTimeout = RegisterApmTaskTimeout ( apmGetResponseTask , request , requestData ) ;
215+
216+ var response = ( HttpWebResponse ) ( await apmGetResponseTask . ConfigureAwait ( false ) ) ;
217+ builder . StatusCode = ( int ) response . StatusCode ;
218+ builder . Stream = response . GetResponseStream ( ) ;
219+ if ( response . SupportsHeaders && response . Headers . HasKeys ( ) && response . Headers . AllKeys . Contains ( "Warning" ) )
220+ builder . DeprecationWarnings = response . Headers . GetValues ( "Warning" ) ;
221+ // https://github.com/elastic/elasticsearch-net/issues/2311
222+ // if stream is null call dispose on response instead.
223+ if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
224+ if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
214225 }
215- requestData . MadeItToResponse = true ;
216-
217- //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
218- //Either the stream or the response object needs to be closed but not both although it won't
219- //throw any errors if both are closed atleast one of them has to be Closed.
220- //Since we expose the stream we let closing the stream determining when to close the connection
221-
222- var apmGetResponseTask = Task . Factory . FromAsync ( request . BeginGetResponse , request . EndGetResponse , null ) ;
223- RegisterApmTaskTimeout ( apmGetResponseTask , request , requestData ) ;
224-
225- var response = ( HttpWebResponse ) ( await apmGetResponseTask . ConfigureAwait ( false ) ) ;
226- builder . StatusCode = ( int ) response . StatusCode ;
227- builder . Stream = response . GetResponseStream ( ) ;
228- if ( response . SupportsHeaders && response . Headers . HasKeys ( ) && response . Headers . AllKeys . Contains ( "Warning" ) )
229- builder . DeprecationWarnings = response . Headers . GetValues ( "Warning" ) ;
230- // https://github.com/elastic/elasticsearch-net/issues/2311
231- // if stream is null call dispose on response instead.
232- if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
233226 }
234227 catch ( WebException e )
235228 {
229+ if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
236230 HandleException ( builder , e ) ;
237231 }
238-
232+ catch
233+ {
234+ if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
235+ throw ;
236+ }
239237 return await builder . ToResponseAsync ( ) . ConfigureAwait ( false ) ;
240238 }
241239
240+ private static async Task PostRequestAsync ( RequestData requestData , CancellationToken cancellationToken , HttpWebRequest request ,
241+ PostData < object > data )
242+ {
243+ var apmGetRequestStreamTask = Task . Factory . FromAsync ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
244+ var getRequestStreamCancellationHandle = RegisterApmTaskTimeout ( apmGetRequestStreamTask , request , requestData ) ;
245+
246+ using ( var stream = await apmGetRequestStreamTask . ConfigureAwait ( false ) )
247+ {
248+ if ( requestData . HttpCompression )
249+ using ( var zipStream = new GZipStream ( stream , CompressionMode . Compress ) )
250+ await data . WriteAsync ( zipStream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
251+ else
252+ await data . WriteAsync ( stream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
253+ }
254+ getRequestStreamCancellationHandle . Unregister ( ( ( IAsyncResult ) apmGetRequestStreamTask ) . AsyncWaitHandle ) ;
255+ }
256+
242257 private void HandleException < TReturn > ( ResponseBuilder < TReturn > builder , WebException exception )
243258 where TReturn : class
244259 {
245260 builder . Exception = exception ;
246261 var response = exception . Response as HttpWebResponse ;
247- if ( response != null )
248- {
249- builder . StatusCode = ( int ) response . StatusCode ;
250- builder . Stream = response . GetResponseStream ( ) ;
251- // https://github.com/elastic/elasticsearch-net/issues/2311
252- // if stream is null call dispose on response instead.
253- if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
254- }
262+ if ( response == null ) return ;
263+ builder . StatusCode = ( int ) response . StatusCode ;
264+ builder . Stream = response . GetResponseStream ( ) ;
265+ // https://github.com/elastic/elasticsearch-net/issues/2311
266+ // if stream is null call dispose on response instead.
267+ if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
255268 }
256269
257270 void IDisposable . Dispose ( ) => this . DisposeManagedResources ( ) ;
0 commit comments