1313import org .opensearch .client .Response ;
1414import org .opensearch .client .StreamingRequest ;
1515import org .opensearch .client .StreamingResponse ;
16+ import org .opensearch .common .settings .Settings ;
1617import org .opensearch .test .rest .OpenSearchRestTestCase ;
1718import org .junit .After ;
1819
2223import java .nio .ByteBuffer ;
2324import java .nio .charset .StandardCharsets ;
2425import java .time .Duration ;
26+ import java .util .concurrent .TimeoutException ;
2527import java .util .concurrent .atomic .AtomicInteger ;
2628import java .util .stream .Stream ;
2729
@@ -44,6 +46,11 @@ public void tearDown() throws Exception {
4446 super .tearDown ();
4547 }
4648
49+ @ Override
50+ protected Settings restClientSettings () {
51+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s" ).build ();
52+ }
53+
4754 public void testCloseClientStreamingRequest () throws Exception {
4855 final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
4956
@@ -66,17 +73,19 @@ public void testCloseClientStreamingRequest() throws Exception {
6673 final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
6774 scheduler .advanceTimeBy (delay ); /* emit first element */
6875
69- StepVerifier .create (Flux . from ( streamingResponse . getBody ()). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 )))
70- . expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " ))
71- .then (() -> {
72- try {
73- client ().close ();
74- } catch (final IOException ex ) {
75- throw new UncheckedIOException (ex );
76- }
77- })
76+ StepVerifier .create (
77+ Flux . from ( streamingResponse . getBody ()). timeout ( Duration . ofSeconds ( 5 )). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 ))
78+ ). expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " )) .then (() -> {
79+ try {
80+ client ().close ();
81+ } catch (final IOException ex ) {
82+ throw new UncheckedIOException (ex );
83+ }
84+ })
7885 .then (() -> scheduler .advanceTimeBy (delay ))
79- .expectErrorMatches (t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException )
86+ .expectErrorMatches (
87+ t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof TimeoutException
88+ )
8089 .verify (Duration .ofSeconds (10 ));
8190 }
8291}
0 commit comments