@@ -64,6 +64,7 @@ all_tests() ->
6464 scenario32 ,
6565 upgrade ,
6666 messages_total ,
67+ ra_indexes ,
6768 simple_prefetch ,
6869 simple_prefetch_without_checkout_cancel ,
6970 simple_prefetch_01 ,
@@ -910,6 +911,30 @@ messages_total(_Config) ->
910911 end )
911912 end , [], Size ).
912913
914+ ra_indexes (_Config ) ->
915+ meck :expect (rabbit_feature_flags , is_enabled , fun (_ ) -> false end ),
916+ Size = 256 ,
917+ run_proper (
918+ fun () ->
919+ ? FORALL ({Length , Bytes , DeliveryLimit , SingleActive },
920+ frequency ([{5 , {undefined , undefined , undefined , false }},
921+ {5 , {oneof ([range (1 , 10 ), undefined ]),
922+ oneof ([range (1 , 1000 ), undefined ]),
923+ oneof ([range (1 , 3 ), undefined ]),
924+ oneof ([true , false ])
925+ }}]),
926+ begin
927+ Config = config (? FUNCTION_NAME ,
928+ Length ,
929+ Bytes ,
930+ SingleActive ,
931+ DeliveryLimit ),
932+ ? FORALL (O , ? LET (Ops , log_gen (Size ), expand (Ops , Config )),
933+ collect ({log_size , length (O )},
934+ ra_indexes_prop (Config , O )))
935+ end )
936+ end , [], Size ).
937+
913938simple_prefetch (_Config ) ->
914939 Size = 500 ,
915940 meck :expect (rabbit_feature_flags , is_enabled , fun (_ ) -> true end ),
@@ -1464,6 +1489,38 @@ messages_total_invariant() ->
14641489 end
14651490 end .
14661491
1492+ ra_indexes_prop (Conf0 , Commands ) ->
1493+ Conf = Conf0 #{release_cursor_interval => 100 },
1494+ Indexes = lists :seq (1 , length (Commands )),
1495+ Entries = lists :zip (Indexes , Commands ),
1496+ InitState = test_init (Conf ),
1497+ run_log (InitState , Entries , ra_indexes_invariant ()),
1498+ true .
1499+
1500+ ra_indexes_invariant () ->
1501+ % % The raft indexes contained in the `ra_indexes` `rabbit_fifo_index` must
1502+ % % be the same as all indexes checked out by consumers plus those in the
1503+ % % `returns` queue.
1504+ fun (# rabbit_fifo {ra_indexes = Index ,
1505+ consumers = C ,
1506+ returns = R }) ->
1507+ RIdxs = lqueue :fold (fun (? MSG (I , _ ), Acc ) -> [I | Acc ] end , [], R ),
1508+ CIdxs = maps :fold (fun (_ , # consumer {checked_out = Ch }, Acc0 ) ->
1509+ maps :fold (fun (_ , ? MSG (I , _ ), Acc ) ->
1510+ [I | Acc ]
1511+ end , Acc0 , Ch )
1512+ end , [], C ),
1513+ ActualIdxs = lists :sort (RIdxs ++ CIdxs ),
1514+ IndexIdxs = lists :sort (rabbit_fifo_index :to_list (Index )),
1515+ case ActualIdxs == IndexIdxs of
1516+ true -> true ;
1517+ false ->
1518+ ct :pal (" ra_indexes invariant failed Expected ~b Got ~b " ,
1519+ [ActualIdxs , IndexIdxs ]),
1520+ false
1521+ end
1522+ end .
1523+
14671524simple_prefetch_prop (Conf0 , Commands , WithCheckoutCancel ) ->
14681525 Conf = Conf0 #{release_cursor_interval => 100 },
14691526 Indexes = lists :seq (1 , length (Commands )),
0 commit comments