@@ -127,6 +127,107 @@ simple_sac_test(_) ->
127127
128128 ok .
129129
130+ super_stream_partition_sac_test (_ ) ->
131+ Stream = <<" stream" >>,
132+ ConsumerName = <<" app" >>,
133+ ConnectionPid = self (),
134+ GroupId = {<<" /" >>, Stream , ConsumerName },
135+ Command0 =
136+ register_consumer_command (Stream , 1 , ConsumerName , ConnectionPid , 0 ),
137+ State0 = state (),
138+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers1 }}} =
139+ State1 ,
140+ {ok , Active1 }, Effects1 } =
141+ rabbit_stream_sac_coordinator :apply (Command0 , State0 ),
142+ ? assert (Active1 ),
143+ ? assertEqual ([consumer (ConnectionPid , 0 , true )], Consumers1 ),
144+ assertSendMessageEffect (ConnectionPid , 0 , true , Effects1 ),
145+
146+ Command1 =
147+ register_consumer_command (Stream , 1 , ConsumerName , ConnectionPid , 1 ),
148+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers2 }}} =
149+ State2 ,
150+ {ok , Active2 }, Effects2 } =
151+ rabbit_stream_sac_coordinator :apply (Command1 , State1 ),
152+ % % never active on registration
153+ ? assertNot (Active2 ),
154+ % % all consumers inactive, until the former active one steps down and activates the new consumer
155+ ? assertEqual ([consumer (ConnectionPid , 0 , false ),
156+ consumer (ConnectionPid , 1 , false )],
157+ Consumers2 ),
158+ assertSendMessageSteppingDownEffect (ConnectionPid , 0 , Effects2 ),
159+
160+ Command2 = activate_consumer_command (Stream , ConsumerName ),
161+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers3 }}} =
162+ State3 ,
163+ ok , Effects3 } =
164+ rabbit_stream_sac_coordinator :apply (Command2 , State2 ),
165+
166+ % % 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
167+ ? assertEqual ([consumer (ConnectionPid , 0 , false ),
168+ consumer (ConnectionPid , 1 , true )],
169+ Consumers3 ),
170+ assertSendMessageEffect (ConnectionPid , 1 , true , Effects3 ),
171+
172+ Command3 =
173+ register_consumer_command (Stream , 1 , ConsumerName , ConnectionPid , 2 ),
174+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers4 }}} =
175+ State4 ,
176+ {ok , Active4 }, Effects4 } =
177+ rabbit_stream_sac_coordinator :apply (Command3 , State3 ),
178+ % % never active on registration
179+ ? assertNot (Active4 ),
180+ % % 1 (partition index) % 3 (consumer count) = 1 (active consumer index)
181+ % % the active consumer stays the same
182+ ? assertEqual ([consumer (ConnectionPid , 0 , false ),
183+ consumer (ConnectionPid , 1 , true ),
184+ consumer (ConnectionPid , 2 , false )],
185+ Consumers4 ),
186+ assertEmpty (Effects4 ),
187+
188+ Command4 =
189+ unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 0 ),
190+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers5 }}} =
191+ State5 ,
192+ ok , Effects5 } =
193+ rabbit_stream_sac_coordinator :apply (Command4 , State4 ),
194+ % % 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
195+ % % the active consumer will move from sub 1 to sub 2
196+ ? assertEqual ([consumer (ConnectionPid , 1 , false ),
197+ consumer (ConnectionPid , 2 , false )],
198+ Consumers5 ),
199+
200+ assertSendMessageSteppingDownEffect (ConnectionPid , 1 , Effects5 ),
201+
202+ Command5 = activate_consumer_command (Stream , ConsumerName ),
203+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers6 }}} =
204+ State6 ,
205+ ok , Effects6 } =
206+ rabbit_stream_sac_coordinator :apply (Command5 , State5 ),
207+
208+ ? assertEqual ([consumer (ConnectionPid , 1 , false ),
209+ consumer (ConnectionPid , 2 , true )],
210+ Consumers6 ),
211+ assertSendMessageEffect (ConnectionPid , 2 , true , Effects6 ),
212+
213+ Command6 =
214+ unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 1 ),
215+ {#? STATE {groups = #{GroupId := # group {consumers = Consumers7 }}} =
216+ State7 ,
217+ ok , Effects7 } =
218+ rabbit_stream_sac_coordinator :apply (Command6 , State6 ),
219+ ? assertEqual ([consumer (ConnectionPid , 2 , true )], Consumers7 ),
220+ assertEmpty (Effects7 ),
221+
222+ Command7 =
223+ unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 2 ),
224+ {#? STATE {groups = Groups8 }, ok , Effects8 } =
225+ rabbit_stream_sac_coordinator :apply (Command7 , State7 ),
226+ assertEmpty (Groups8 ),
227+ assertEmpty (Effects8 ),
228+
229+ ok .
230+
130231ensure_monitors_test (_ ) ->
131232 GroupId = {<<" /" >>, <<" stream" >>, <<" app" >>},
132233 Group =
@@ -280,6 +381,9 @@ unregister_consumer_command(Stream,
280381 ConnectionPid ,
281382 SubId }.
282383
384+ activate_consumer_command (Stream , ConsumerName ) ->
385+ {activate_consumer , <<" /" >>, Stream , ConsumerName }.
386+
283387assertSendMessageEffect (Pid , SubId , Active , [Effect ]) ->
284388 ? assertEqual ({mod_call ,
285389 rabbit_stream_sac_coordinator ,
@@ -289,3 +393,13 @@ assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
289393 {{subscription_id , SubId }, {active , Active },
290394 {extra , []}}}]},
291395 Effect ).
396+
397+ assertSendMessageSteppingDownEffect (Pid , SubId , [Effect ]) ->
398+ ? assertEqual ({mod_call ,
399+ rabbit_stream_sac_coordinator ,
400+ send_message ,
401+ [Pid ,
402+ {sac ,
403+ {{subscription_id , SubId }, {active , false },
404+ {extra , [{stepping_down , true }]}}}]},
405+ Effect ).
0 commit comments