Skip to content

Commit a8d8eab

Browse files
committed
Add unit for "simple" SAC group
"simple" meaning not part of a super stream. References #3753
1 parent 473c722 commit a8d8eab

File tree

2 files changed

+85
-14
lines changed

2 files changed

+85
-14
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,7 @@ apply({register_consumer,
5858
ConnectionPid,
5959
SubscriptionId},
6060
#?MODULE{groups = StreamGroups0} = State) ->
61-
%% TODO monitor connection PID to remove consumers when their connection dies
62-
%% this could require some index to avoid crawling the whole data structure
63-
%% this is necessary to fail over to another consumer when one dies abruptly
64-
%% also, check the liveliness of each consumer whenever there's a change in the group,
65-
%% to make sure to get rid of zombies
66-
%%
67-
%% TODO monitor streams and virtual hosts as well
61+
%% FIXME monitor virtual hosts as well?
6862
rabbit_log:debug("New consumer ~p ~p in group ~p, partition index "
6963
"is ~p",
7064
[ConnectionPid,

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,76 @@ init_per_testcase(_TestCase, Config) ->
5757
end_per_testcase(_TestCase, _Config) ->
5858
ok.
5959

60+
simple_sac_test(_) ->
61+
Stream = <<"stream">>,
62+
ConsumerName = <<"app">>,
63+
ConnectionPid = self(),
64+
GroupId = {<<"/">>, Stream, ConsumerName},
65+
Command0 =
66+
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
67+
State0 = state(),
68+
{#?STATE{groups = #{GroupId := #group{consumers = Consumers1}}} =
69+
State1,
70+
{ok, Active1}, Effects1} =
71+
rabbit_stream_sac_coordinator:apply(Command0, State0),
72+
?assert(Active1),
73+
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
74+
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
75+
76+
Command1 =
77+
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
78+
{#?STATE{groups = #{GroupId := #group{consumers = Consumers2}}} =
79+
State2,
80+
{ok, Active2}, Effects2} =
81+
rabbit_stream_sac_coordinator:apply(Command1, State1),
82+
?assertNot(Active2),
83+
?assertEqual([consumer(ConnectionPid, 0, true),
84+
consumer(ConnectionPid, 1, false)],
85+
Consumers2),
86+
assertEmpty(Effects2),
87+
88+
Command2 =
89+
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 2),
90+
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
91+
State3,
92+
{ok, Active3}, Effects3} =
93+
rabbit_stream_sac_coordinator:apply(Command2, State2),
94+
?assertNot(Active3),
95+
?assertEqual([consumer(ConnectionPid, 0, true),
96+
consumer(ConnectionPid, 1, false),
97+
consumer(ConnectionPid, 2, false)],
98+
Consumers3),
99+
assertEmpty(Effects3),
100+
101+
Command3 =
102+
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 0),
103+
{#?STATE{groups = #{GroupId := #group{consumers = Consumers4}}} =
104+
State4,
105+
ok, Effects4} =
106+
rabbit_stream_sac_coordinator:apply(Command3, State3),
107+
?assertEqual([consumer(ConnectionPid, 1, true),
108+
consumer(ConnectionPid, 2, false)],
109+
Consumers4),
110+
assertSendMessageEffect(ConnectionPid, 1, true, Effects4),
111+
112+
Command4 =
113+
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
114+
{#?STATE{groups = #{GroupId := #group{consumers = Consumers5}}} =
115+
State5,
116+
ok, Effects5} =
117+
rabbit_stream_sac_coordinator:apply(Command4, State4),
118+
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
119+
assertSendMessageEffect(ConnectionPid, 2, true, Effects5),
120+
121+
Command5 =
122+
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
123+
{#?STATE{groups = Groups6}, ok, Effects6} =
124+
rabbit_stream_sac_coordinator:apply(Command5, State5),
125+
assertEmpty(Groups6),
126+
assertEmpty(Effects6),
127+
128+
ok.
129+
60130
ensure_monitors_test(_) ->
61131
GroupId = {<<"/">>, <<"stream">>, <<"app">>},
62132
Group =
@@ -139,13 +209,7 @@ handle_connection_down_test(_) ->
139209
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
140210
assertSize(1, PidsGroups1),
141211
assertSize(1, maps:get(Pid1, PidsGroups1)),
142-
?assertEqual([{mod_call,
143-
rabbit_stream_sac_coordinator,
144-
send_message,
145-
[Pid1,
146-
{sac,
147-
{{subscription_id, 1}, {active, true}, {extra, []}}}]}],
148-
Effects1),
212+
assertSendMessageEffect(Pid1, 1, true, Effects1),
149213
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
150214
Groups1),
151215
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
@@ -183,6 +247,9 @@ cgroup(Consumers) ->
183247
cgroup(PartitionIndex, Consumers) ->
184248
#group{partition_index = PartitionIndex, consumers = Consumers}.
185249

250+
state() ->
251+
state(#{}).
252+
186253
state(Groups) ->
187254
state(Groups, #{}).
188255

@@ -212,3 +279,13 @@ unregister_consumer_command(Stream,
212279
ConsumerName,
213280
ConnectionPid,
214281
SubId}.
282+
283+
assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
284+
?assertEqual({mod_call,
285+
rabbit_stream_sac_coordinator,
286+
send_message,
287+
[Pid,
288+
{sac,
289+
{{subscription_id, SubId}, {active, Active},
290+
{extra, []}}}]},
291+
Effect).

0 commit comments

Comments
 (0)