3232import reactor .test .scheduler .VirtualTimeScheduler ;
3333
3434import static org .hamcrest .CoreMatchers .equalTo ;
35+ import static org .junit .Assume .assumeThat ;
3536
3637public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
3738 @ After
@@ -48,12 +49,13 @@ public void tearDown() throws Exception {
4849
4950 @ Override
5051 protected Settings restClientSettings () {
51- return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s " ).build ();
52+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "10s " ).build ();
5253 }
5354
5455 public void testCloseClientStreamingRequest () throws Exception {
55- final VirtualTimeScheduler scheduler = VirtualTimeScheduler . create ( true );
56+ assumeThat ( "The OpenSearch is not ready" , isServiceReady (), equalTo ( true ) );
5657
58+ final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
5759 final AtomicInteger id = new AtomicInteger (0 );
5860 final Stream <String > stream = Stream .generate (
5961 () -> "{ \" index\" : { \" _index\" : \" test-stress-streaming\" , \" _id\" : \" "
@@ -74,7 +76,7 @@ public void testCloseClientStreamingRequest() throws Exception {
7476 scheduler .advanceTimeBy (delay ); /* emit first element */
7577
7678 StepVerifier .create (
77- Flux .from (streamingResponse .getBody ()).timeout (Duration .ofSeconds (5 )).map (b -> new String (b .array (), StandardCharsets .UTF_8 ))
79+ Flux .from (streamingResponse .getBody ()).timeout (Duration .ofSeconds (10 )).map (b -> new String (b .array (), StandardCharsets .UTF_8 ))
7880 ).expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 1\" " )).then (() -> {
7981 try {
8082 client ().close ();
@@ -88,4 +90,14 @@ public void testCloseClientStreamingRequest() throws Exception {
8890 )
8991 .verify (Duration .ofSeconds (10 ));
9092 }
93+
94+ private boolean isServiceReady () {
95+ try {
96+ final Response reponse = client ().performRequest (new Request ("GET" , "/" ));
97+ return reponse .getStatusLine ().getStatusCode () == 200 ;
98+ } catch (final IOException ex ) {
99+ return false ;
100+ }
101+ }
102+
91103}
0 commit comments