1818 */ 
1919package  org .neo4j .driver .stress ;
2020
21+ import  org .reactivestreams .Publisher ;
2122import  reactor .core .publisher .Flux ;
23+ import  reactor .core .publisher .Mono ;
2224
2325import  java .util .concurrent .CompletableFuture ;
2426import  java .util .concurrent .CompletionStage ;
27+ import  java .util .concurrent .atomic .AtomicInteger ;
28+ import  java .util .function .Function ;
2529
26- import  org .neo4j .driver .AccessMode ;
2730import  org .neo4j .driver .Driver ;
2831import  org .neo4j .driver .internal .util .Futures ;
2932import  org .neo4j .driver .reactive .RxSession ;
3033import  org .neo4j .driver .reactive .RxTransaction ;
34+ import  org .neo4j .driver .summary .ResultSummary ;
3135
3236import  static  org .junit .jupiter .api .Assertions .assertEquals ;
3337
@@ -45,17 +49,32 @@ public RxWriteQueryInTx( AbstractStressTestBase<C> stressTest, Driver driver, bo
4549    public  CompletionStage <Void > execute ( C  context  )
4650    {
4751        CompletableFuture <Void > queryFinished  = new  CompletableFuture <>();
48-         RxSession  session  = newSession ( AccessMode .WRITE , context  );
49-         Flux .usingWhen ( session .beginTransaction (), tx  -> tx .run ( "CREATE ()"  ).consume (),
50-                 RxTransaction ::commit , ( tx , error  ) -> tx .rollback (), null  ).subscribe (
51-                 summary  -> {
52-                     context .setBookmark ( session .lastBookmark () );
53-                     assertEquals ( 1 , summary .counters ().nodesCreated () );
52+ 
53+         Function <RxSession ,Publisher <ResultSummary >> sessionToResultSummaryPublisher  = ( RxSession  session  ) -> Flux .usingWhen (
54+                 Mono .from ( session .beginTransaction () ),
55+                 tx  -> tx .run ( "CREATE ()"  ).consume (),
56+                 RxTransaction ::commit ,
57+                 ( tx , error  ) -> tx .rollback (),
58+                 RxTransaction ::rollback 
59+         );
60+ 
61+         AtomicInteger  createdNodesNum  = new  AtomicInteger ();
62+         Flux .usingWhen (
63+                 Mono .fromSupplier ( driver ::rxSession  ),
64+                 sessionToResultSummaryPublisher ,
65+                 session  -> Mono .empty (),
66+                 ( session , error  ) -> session .close (),
67+                 RxSession ::close 
68+         ).subscribe (
69+                 resultSummary  -> createdNodesNum .addAndGet ( resultSummary .counters ().nodesCreated () ),
70+                 error  -> handleError ( Futures .completionExceptionCause ( error  ), context , queryFinished  ),
71+                 () ->
72+                 {
73+                     assertEquals ( 1 , createdNodesNum .get () );
5474                    context .nodeCreated ();
5575                    queryFinished .complete ( null  );
56-                 }, error  -> {
57-                     handleError ( Futures .completionExceptionCause ( error  ), context , queryFinished  );
58-                 } );
76+                 }
77+         );
5978
6079        return  queryFinished ;
6180    }
0 commit comments