diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index ea972a0d9a4b..cefd487fa557 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -794,6 +794,12 @@ suites = [ "//deps/rabbit_common:erlang_app", ], ), + rabbitmq_suite( + name = "rabbit_stream_sac_coordinator_SUITE", + deps = [ + "//deps/rabbit_common:erlang_app", + ], + ), rabbitmq_integration_suite( PACKAGE, name = "rabbit_stream_queue_SUITE", diff --git a/deps/rabbit/rebar.config b/deps/rabbit/rebar.config new file mode 100644 index 000000000000..345cf9e342c4 --- /dev/null +++ b/deps/rabbit/rebar.config @@ -0,0 +1,13 @@ +{plugins, [rebar3_format]}. + +{format, [ + {files, ["src/rabbit_stream_sac_coordinator.*", + "test/rabbit_stream_sac_coordinator_SUITE.erl"]}, + {formatter, default_formatter}, + {options, #{ + paper => 80, + ribbon => 70, + inline_attributes => {when_under, 1}, + inline_items => {when_under, 4} + }} +]}. \ No newline at end of file diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 2eb34de8321f..05d5048d4f88 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -13,7 +13,8 @@ implicit_default_bindings_migration/3, virtual_host_metadata_migration/3, maintenance_mode_status_migration/3, - user_limits_migration/3]). + user_limits_migration/3, + stream_single_active_consumer_migration/3]). -rabbit_feature_flag( {classic_mirrored_queue_version, @@ -68,6 +69,15 @@ migration_fun => {?MODULE, user_limits_migration} }}). +-rabbit_feature_flag( + {stream_single_active_consumer, + #{desc => "Single active consumer for streams", + doc_url => "https://www.rabbitmq.com/stream.html", + stability => stable, + depends_on => [stream_queue], + migration_fun => {?MODULE, stream_single_active_consumer_migration} + }}). + classic_mirrored_queue_version_migration(_FeatureName, _FeatureProps, _Enable) -> ok. @@ -188,3 +198,12 @@ user_limits_migration(_FeatureName, _FeatureProps, enable) -> end; user_limits_migration(_FeatureName, _FeatureProps, is_enabled) -> mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2). + +%% ------------------------------------------------------------------- +%% Stream single active consumer. +%% ------------------------------------------------------------------- + +stream_single_active_consumer_migration(_FeatureName, _FeatureProps, enable) -> + ok; +stream_single_active_consumer_migration(_FeatureName, _FeatureProps, is_enabled) -> + undefined. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 38c5d9687eb9..a36d4cc92879 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -40,6 +40,10 @@ -export([log_overview/1]). -export([replay/1]). +%% for SAC coordinator +-export([process_command/1, + sac_state/1]). + %% for testing and debugging -export([eval_listeners/3, state/0]). @@ -88,6 +92,7 @@ {member_stopped, stream_id(), args()} | {retention_updated, stream_id(), args()} | {mnesia_updated, stream_id(), args()} | + {sac, rabbit_stream_sac_coordinator:command()} | ra_machine:effect(). -export_type([command/0]). @@ -171,6 +176,9 @@ policy_changed(Q) when ?is_amqqueue(Q) -> StreamId = maps:get(name, amqqueue:get_type_state(Q)), process_command({policy_changed, StreamId, #{queue => Q}}). +sac_state(#?MODULE{single_active_consumer = SacState}) -> + SacState. + %% for debugging state() -> case ra:local_query({?MODULE, node()}, fun(State) -> State end) of @@ -338,15 +346,15 @@ all_coord_members() -> Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], [{?MODULE, Node} || Node <- [node() | Nodes]]. -version() -> 2. +version() -> 3. which_module(_) -> ?MODULE. init(_Conf) -> - #?MODULE{}. + #?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}. --spec apply(map(), command(), state()) -> +-spec apply(ra_machine:command_meta_data(), command(), state()) -> {state(), term(), ra_machine:effects()}. apply(#{index := _Idx, machine_version := MachineVersion} = Meta0, {_CmdTag, StreamId, #{}} = Cmd, @@ -380,11 +388,18 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0, Reply -> return(Meta, State0, Reply, []) end; +apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0, + monitors = Monitors0} = State0) -> + {SacState1, Reply, Effects0} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0), + {SacState2, Monitors1, Effects1} = + rabbit_stream_sac_coordinator:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0), + return(Meta, State0#?MODULE{single_active_consumer = SacState2, + monitors = Monitors1}, Reply, Effects1); apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd, #?MODULE{streams = Streams0, monitors = Monitors0, - listeners = StateListeners0} = State) -> - + listeners = StateListeners0, + single_active_consumer = SacState0 } = State) -> Effects0 = case Reason of noconnection -> [{monitor, node, node(Pid)}]; @@ -438,6 +453,10 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd, return(Meta, State#?MODULE{streams = Streams0, monitors = Monitors1}, ok, Effects0) end; + {sac, Monitors1} -> + {SacState1, Effects} = rabbit_stream_sac_coordinator:handle_connection_down(Pid, SacState0), + return(Meta, State#?MODULE{single_active_consumer = SacState1, + monitors = Monitors1}, ok, Effects); error -> return(Meta, State, ok, Effects0) end; @@ -1784,6 +1803,9 @@ machine_version(1, 2, State = #?MODULE{streams = Streams0, {State#?MODULE{streams = Streams1, monitors = Monitors2, listeners = undefined}, Effects}; +machine_version(2, 3, State) -> + rabbit_log:info("Stream coordinator machine version changes from 2 to 3, updating state."), + {State#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}, []}; machine_version(From, To, State) -> rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.", [From, To]), diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index 5d72ad9633c5..65f44d6febfc 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -59,10 +59,11 @@ -record(rabbit_stream_coordinator, {streams = #{} :: #{stream_id() => #stream{}}, monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1 #{stream_id() => ok}, %% v2 - monitor_role()}}, + monitor_role()} | + sac}, %% not used as of v2 listeners = #{} :: undefined | #{stream_id() => #{pid() := queue_ref()}}, + single_active_consumer = undefined :: undefined | rabbit_stream_sac_coordinator:state(), %% future extensibility - reserved_1, reserved_2}). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl new file mode 100644 index 000000000000..a102aabc43fb --- /dev/null +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -0,0 +1,779 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_sac_coordinator). + +-include("rabbit_stream_sac_coordinator.hrl"). + +-opaque command() :: + #command_register_consumer{} | #command_unregister_consumer{} | + #command_activate_consumer{}. +-opaque state() :: #?MODULE{}. + +-export_type([state/0, + command/0]). + +%% Single Active Consumer API +-export([register_consumer/7, + unregister_consumer/5, + activate_consumer/3, + consumer_groups/2, + group_consumers/4]). +-export([apply/2, + init_state/0, + send_message/2, + ensure_monitors/4, + handle_connection_down/2, + consumer_groups/3, + group_consumers/5, + is_ff_enabled/0]). + +%% Single Active Consumer API +-spec register_consumer(binary(), + binary(), + integer(), + binary(), + pid(), + binary(), + integer()) -> + {ok, boolean()} | {error, feature_flag_disabled} | + {error, term()}. +register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + Owner, + SubscriptionId) -> + maybe_sac_execute(fun() -> + process_command({sac, + #command_register_consumer{vhost = + VirtualHost, + stream = + Stream, + partition_index + = + PartitionIndex, + consumer_name + = + ConsumerName, + connection_pid + = + ConnectionPid, + owner = + Owner, + subscription_id + = + SubscriptionId}}) + end). + +-spec unregister_consumer(binary(), + binary(), + binary(), + pid(), + integer()) -> + ok | {error, feature_flag_disabled} | + {error, term()}. +unregister_consumer(VirtualHost, + Stream, + ConsumerName, + ConnectionPid, + SubscriptionId) -> + maybe_sac_execute(fun() -> + process_command({sac, + #command_unregister_consumer{vhost = + VirtualHost, + stream = + Stream, + consumer_name + = + ConsumerName, + connection_pid + = + ConnectionPid, + subscription_id + = + SubscriptionId}}) + end). + +-spec activate_consumer(binary(), binary(), binary()) -> + ok | {error, feature_flag_disabled}. +activate_consumer(VirtualHost, Stream, ConsumerName) -> + maybe_sac_execute(fun() -> + process_command({sac, + #command_activate_consumer{vhost = + VirtualHost, + stream = + Stream, + consumer_name + = + ConsumerName}}) + end). + +process_command(Cmd) -> + case rabbit_stream_coordinator:process_command(Cmd) of + {ok, Res, _} -> + Res; + {error, _} = Err -> + rabbit_log:warning("SAC coordinator command ~p returned error ~p", + [Cmd, Err]), + Err + end. + +%% return the current groups for a given virtual host +-spec consumer_groups(binary(), [atom()]) -> + {ok, + [term()] | {error, feature_flag_disabled | atom()}}. +consumer_groups(VirtualHost, InfoKeys) -> + maybe_sac_execute(fun() -> + case ra:local_query({rabbit_stream_coordinator, + node()}, + fun(State) -> + SacState = + rabbit_stream_coordinator:sac_state(State), + consumer_groups(VirtualHost, + InfoKeys, + SacState) + end) + of + {ok, {_, Result}, _} -> Result; + {error, noproc} -> + %% not started yet, so no groups + {ok, []}; + {error, _} = Err -> Err; + {timeout, _} -> {error, timeout} + end + end). + +%% get the consumers of a given group in a given virtual host +-spec group_consumers(binary(), binary(), binary(), [atom()]) -> + {ok, [term()]} | + {error, feature_flag_disabled | atom()}. +group_consumers(VirtualHost, Stream, Reference, InfoKeys) -> + maybe_sac_execute(fun() -> + case ra:local_query({rabbit_stream_coordinator, + node()}, + fun(State) -> + SacState = + rabbit_stream_coordinator:sac_state(State), + group_consumers(VirtualHost, + Stream, + Reference, + InfoKeys, + SacState) + end) + of + {ok, {_, {ok, _} = Result}, _} -> Result; + {ok, {_, {error, _} = Err}, _} -> Err; + {error, noproc} -> + %% not started yet, so the group cannot exist + {error, not_found}; + {error, _} = Err -> Err; + {timeout, _} -> {error, timeout} + end + end). + +maybe_sac_execute(Fun) -> + case rabbit_stream_sac_coordinator:is_ff_enabled() of + true -> + Fun(); + false -> + {error, feature_flag_disabled} + end. + +-spec init_state() -> state(). +init_state() -> + #?MODULE{groups = #{}, pids_groups = #{}}. + +-spec apply(command(), state()) -> + {state(), term(), ra_machine:effects()}. +apply(#command_register_consumer{vhost = VirtualHost, + stream = Stream, + partition_index = PartitionIndex, + consumer_name = ConsumerName, + connection_pid = ConnectionPid, + owner = Owner, + subscription_id = SubscriptionId}, + #?MODULE{groups = StreamGroups0} = State) -> + rabbit_log:debug("New consumer ~p ~p in group ~p, partition index " + "is ~p", + [ConnectionPid, + SubscriptionId, + {VirtualHost, Stream, ConsumerName}, + PartitionIndex]), + StreamGroups1 = + maybe_create_group(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + StreamGroups0), + + do_register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + Owner, + SubscriptionId, + State#?MODULE{groups = StreamGroups1}); +apply(#command_unregister_consumer{vhost = VirtualHost, + stream = Stream, + consumer_name = ConsumerName, + connection_pid = ConnectionPid, + subscription_id = SubscriptionId}, + #?MODULE{groups = StreamGroups0} = State0) -> + {State1, Effects1} = + case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + undefined -> + {State0, []}; + Group0 -> + {Group1, Effects} = + case lookup_consumer(ConnectionPid, SubscriptionId, Group0) + of + {value, Consumer} -> + rabbit_log:debug("Unregistering consumer ~p from group", + [Consumer]), + G1 = remove_from_group(Consumer, Group0), + rabbit_log:debug("Consumer removed from group: ~p", + [G1]), + handle_consumer_removal(G1, Consumer); + false -> + rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p", + [ConnectionPid, + SubscriptionId, + VirtualHost, + Stream, + ConsumerName]), + {Group0, []} + end, + SGS = update_groups(VirtualHost, + Stream, + ConsumerName, + Group1, + StreamGroups0), + {State0#?MODULE{groups = SGS}, Effects} + end, + {State1, ok, Effects1}; +apply(#command_activate_consumer{vhost = VirtualHost, + stream = Stream, + consumer_name = ConsumerName}, + #?MODULE{groups = StreamGroups0} = State0) -> + {G, Eff} = + case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + undefined -> + rabbit_log:warning("trying to activate consumer in group ~p, but " + "the group does not longer exist", + [{VirtualHost, Stream, ConsumerName}]), + {undefined, []}; + Group -> + #consumer{pid = Pid, subscription_id = SubId} = + evaluate_active_consumer(Group), + Group1 = + update_consumer_state_in_group(Group, Pid, SubId, true), + {Group1, [notify_consumer_effect(Pid, SubId, true)]} + end, + StreamGroups1 = + update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0), + {State0#?MODULE{groups = StreamGroups1}, ok, Eff}. + +-spec consumer_groups(binary(), [atom()], state()) -> {ok, [term()]}. +consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups}) -> + Res = maps:fold(fun ({VH, Stream, Reference}, + #group{consumers = Consumers, + partition_index = PartitionIndex}, + Acc) + when VH == VirtualHost -> + Record = + lists:foldr(fun (stream, RecAcc) -> + [{stream, Stream} | RecAcc]; + (reference, RecAcc) -> + [{reference, Reference} + | RecAcc]; + (partition_index, RecAcc) -> + [{partition_index, + PartitionIndex} + | RecAcc]; + (consumers, RecAcc) -> + [{consumers, + length(Consumers)} + | RecAcc]; + (Unknown, RecAcc) -> + [{Unknown, unknown_field} + | RecAcc] + end, + [], InfoKeys), + [Record | Acc]; + (_GroupId, _Group, Acc) -> + Acc + end, + [], Groups), + {ok, lists:reverse(Res)}. + +-spec group_consumers(binary(), + binary(), + binary(), + [atom()], + state()) -> + {ok, [term()]} | {error, not_found}. +group_consumers(VirtualHost, + Stream, + Reference, + InfoKeys, + #?MODULE{groups = Groups}) -> + GroupId = {VirtualHost, Stream, Reference}, + case Groups of + #{GroupId := #group{consumers = Consumers}} -> + Cs = lists:foldr(fun(#consumer{subscription_id = SubId, + owner = Owner, + active = Active}, + Acc) -> + Record = + lists:foldr(fun (subscription_id, RecAcc) -> + [{subscription_id, + SubId} + | RecAcc]; + (connection_name, RecAcc) -> + [{connection_name, + Owner} + | RecAcc]; + (state, RecAcc) + when Active -> + [{state, active} + | RecAcc]; + (state, RecAcc) -> + [{state, inactive} + | RecAcc]; + (Unknown, RecAcc) -> + [{Unknown, + unknown_field} + | RecAcc] + end, + [], InfoKeys), + [Record | Acc] + end, + [], Consumers), + {ok, Cs}; + _ -> + {error, not_found} + end. + +-spec ensure_monitors(command(), + state(), + map(), + ra_machine:effects()) -> + {state(), map(), ra_machine:effects()}. +ensure_monitors(#command_register_consumer{vhost = VirtualHost, + stream = Stream, + consumer_name = ConsumerName, + connection_pid = Pid}, + #?MODULE{pids_groups = PidsGroups0} = State0, + Monitors0, + Effects) -> + GroupId = {VirtualHost, Stream, ConsumerName}, + Groups0 = maps:get(Pid, PidsGroups0, #{}), + PidsGroups1 = + maps:put(Pid, maps:put(GroupId, true, Groups0), PidsGroups0), + {State0#?MODULE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac}, + [{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]}; +ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, + stream = Stream, + consumer_name = ConsumerName, + connection_pid = Pid}, + #?MODULE{groups = StreamGroups0, pids_groups = PidsGroups0} = + State0, + Monitors, + Effects) -> + GroupId = {VirtualHost, Stream, ConsumerName}, + #{Pid := PidGroup0} = PidsGroups0, + PidGroup1 = + case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + undefined -> + %% group is gone, can be removed from the PID map + maps:remove(GroupId, PidGroup0); + Group -> + %% group still exists, check if other consumers are from this PID + %% if yes, don't change the PID set + %% if no, remove group from PID set + case has_consumers_from_pid(Group, Pid) of + true -> + %% the group still depends on this PID, keep the group entry in the set + PidGroup0; + false -> + %% the group does not depend on the PID anymore, remove the group entry from the map + maps:remove(GroupId, PidGroup0) + end + end, + case maps:size(PidGroup1) == 0 of + true -> + %% no more groups depend on the PID + %% remove PID from data structure and demonitor it + {State0#?MODULE{pids_groups = maps:remove(Pid, PidsGroups0)}, + maps:remove(Pid, Monitors), [{demonitor, process, Pid} | Effects]}; + false -> + %% one or more groups still depend on the PID + {State0#?MODULE{pids_groups = + maps:put(Pid, PidGroup1, PidsGroups0)}, + Monitors, Effects} + end; +ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) -> + {State0, Monitors, Effects}. + +-spec handle_connection_down(connection_pid(), state()) -> + {state(), ra_machine:effects()}. +handle_connection_down(Pid, + #?MODULE{pids_groups = PidsGroups0} = State0) -> + case maps:take(Pid, PidsGroups0) of + error -> + {State0, []}; + {Groups, PidsGroups1} -> + State1 = State0#?MODULE{pids_groups = PidsGroups1}, + %% iterate other the groups that this PID affects + maps:fold(fun({VirtualHost, Stream, ConsumerName}, _, + {#?MODULE{groups = ConsumerGroups} = S0, Eff0}) -> + case lookup_group(VirtualHost, + Stream, + ConsumerName, + ConsumerGroups) + of + undefined -> {S0, Eff0}; + #group{consumers = Consumers} -> + %% iterate over the consumers of the group + %% and unregister the ones from this PID. + %% It may not be optimal, computing the new active consumer + %% from the purged group and notifying the remaining consumers + %% appropriately should avoid unwanted notifications and even rebalancing. + lists:foldl(fun (#consumer{pid = P, + subscription_id = + SubId}, + {StateSub0, EffSub0}) + when P == Pid -> + {StateSub1, ok, E} = + ?MODULE:apply(#command_unregister_consumer{vhost + = + VirtualHost, + stream + = + Stream, + consumer_name + = + ConsumerName, + connection_pid + = + Pid, + subscription_id + = + SubId}, + StateSub0), + {StateSub1, EffSub0 ++ E}; + (_Consumer, Acc) -> Acc + end, + {S0, Eff0}, Consumers) + end + end, + {State1, []}, Groups) + end. + +do_register_consumer(VirtualHost, + Stream, + -1 = _PartitionIndex, + ConsumerName, + ConnectionPid, + Owner, + SubscriptionId, + #?MODULE{groups = StreamGroups0} = State) -> + Group0 = + lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), + + rabbit_log:debug("Group: ~p", [Group0]), + Consumer = + case lookup_active_consumer(Group0) of + {value, _} -> + #consumer{pid = ConnectionPid, + owner = Owner, + subscription_id = SubscriptionId, + active = false}; + false -> + #consumer{pid = ConnectionPid, + subscription_id = SubscriptionId, + owner = Owner, + active = true} + end, + Group1 = add_to_group(Consumer, Group0), + rabbit_log:debug("Consumer added to group: ~p", [Group1]), + StreamGroups1 = + update_groups(VirtualHost, + Stream, + ConsumerName, + Group1, + StreamGroups0), + + #consumer{active = Active} = Consumer, + Effects = + case Active of + true -> + [notify_consumer_effect(ConnectionPid, SubscriptionId, Active)]; + _ -> + [] + end, + + {State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}; +do_register_consumer(VirtualHost, + Stream, + _PartitionIndex, + ConsumerName, + ConnectionPid, + Owner, + SubscriptionId, + #?MODULE{groups = StreamGroups0} = State) -> + Group0 = + lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), + + rabbit_log:debug("Group: ~p", [Group0]), + {Group1, Effects} = + case Group0 of + #group{consumers = []} -> + %% first consumer in the group, it's the active one + Consumer0 = + #consumer{pid = ConnectionPid, + owner = Owner, + subscription_id = SubscriptionId, + active = true}, + G1 = add_to_group(Consumer0, Group0), + {G1, + [notify_consumer_effect(ConnectionPid, SubscriptionId, true)]}; + _G -> + %% whatever the current state is, the newcomer will be passive + Consumer0 = + #consumer{pid = ConnectionPid, + owner = Owner, + subscription_id = SubscriptionId, + active = false}, + G1 = add_to_group(Consumer0, Group0), + + case lookup_active_consumer(G1) of + {value, + #consumer{pid = ActPid, subscription_id = ActSubId} = + CurrentActive} -> + case evaluate_active_consumer(G1) of + CurrentActive -> + %% the current active stays the same + {G1, []}; + _ -> + %% there's a change, telling the active it's not longer active + {update_consumer_state_in_group(G1, + ActPid, + ActSubId, + false), + [notify_consumer_effect(ActPid, + ActSubId, + false, + true)]} + end; + false -> + %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + {G1, []} + end + end, + StreamGroups1 = + update_groups(VirtualHost, + Stream, + ConsumerName, + Group1, + StreamGroups0), + {value, #consumer{active = Active}} = + lookup_consumer(ConnectionPid, SubscriptionId, Group1), + {State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}. + +handle_consumer_removal(#group{consumers = []} = G, _) -> + {G, []}; +handle_consumer_removal(#group{partition_index = -1} = Group0, + Consumer) -> + case Consumer of + #consumer{active = true} -> + Group1 = compute_active_consumer(Group0), + rabbit_log:debug("This is the active consumer, group after active " + "consumer calculation: ~p", + [Group1]), + case lookup_active_consumer(Group1) of + {value, #consumer{pid = Pid, subscription_id = SubId} = C} -> + rabbit_log:debug("Creating side effect to notify new active consumer ~p", + [C]), + {Group1, [notify_consumer_effect(Pid, SubId, true)]}; + _ -> + rabbit_log:debug("No active consumer found in the group, nothing " + "to do"), + {Group1, []} + end; + #consumer{active = false} -> + rabbit_log:debug("Not the active consumer, nothing to do."), + {Group0, []} + end; +handle_consumer_removal(Group0, Consumer) -> + case lookup_active_consumer(Group0) of + {value, + #consumer{pid = ActPid, subscription_id = ActSubId} = + CurrentActive} -> + case evaluate_active_consumer(Group0) of + CurrentActive -> + %% the current active stays the same + {Group0, []}; + _ -> + %% there's a change, telling the active it's not longer active + {update_consumer_state_in_group(Group0, + ActPid, + ActSubId, + false), + [notify_consumer_effect(ActPid, ActSubId, false, true)]} + end; + false -> + case Consumer#consumer.active of + true -> + %% the active one is going away, picking a new one + #consumer{pid = P, subscription_id = SID} = + evaluate_active_consumer(Group0), + {update_consumer_state_in_group(Group0, P, SID, true), + [notify_consumer_effect(P, SID, true)]}; + false -> + %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + {Group0, []} + end + end. + +notify_consumer_effect(Pid, SubId, Active) -> + notify_consumer_effect(Pid, SubId, Active, false). + +notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) -> + mod_call_effect(Pid, + {sac, + {{subscription_id, SubId}, {active, Active}, + {extra, []}}}); +notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) -> + mod_call_effect(Pid, + {sac, + {{subscription_id, SubId}, {active, Active}, + {extra, [{stepping_down, true}]}}}). + +maybe_create_group(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + StreamGroups) -> + case StreamGroups of + #{{VirtualHost, Stream, ConsumerName} := _Group} -> + StreamGroups; + SGS -> + maps:put({VirtualHost, Stream, ConsumerName}, + #group{consumers = [], partition_index = PartitionIndex}, + SGS) + end. + +lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) -> + maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups, + undefined). + +add_to_group(Consumer, #group{consumers = Consumers} = Group) -> + Group#group{consumers = Consumers ++ [Consumer]}. + +remove_from_group(Consumer, #group{consumers = Consumers} = Group) -> + Group#group{consumers = lists:delete(Consumer, Consumers)}. + +has_consumers_from_pid(#group{consumers = Consumers}, Pid) -> + lists:any(fun (#consumer{pid = P}) when P == Pid -> + true; + (_) -> + false + end, + Consumers). + +compute_active_consumer(#group{consumers = Crs, + partition_index = -1} = + Group) + when length(Crs) == 0 -> + Group; +compute_active_consumer(#group{partition_index = -1, + consumers = [Consumer0]} = + Group0) -> + Consumer1 = Consumer0#consumer{active = true}, + Group0#group{consumers = [Consumer1]}; +compute_active_consumer(#group{partition_index = -1, + consumers = [Consumer0 | T]} = + Group0) -> + Consumer1 = Consumer0#consumer{active = true}, + Consumers = lists:map(fun(C) -> C#consumer{active = false} end, T), + Group0#group{consumers = [Consumer1] ++ Consumers}. + +evaluate_active_consumer(#group{partition_index = PartitionIndex, + consumers = Consumers}) + when PartitionIndex >= 0 -> + ActiveConsumerIndex = PartitionIndex rem length(Consumers), + lists:nth(ActiveConsumerIndex + 1, Consumers). + +lookup_consumer(ConnectionPid, SubscriptionId, + #group{consumers = Consumers}) -> + lists:search(fun(#consumer{pid = ConnPid, subscription_id = SubId}) -> + ConnPid == ConnectionPid andalso SubId == SubscriptionId + end, + Consumers). + +lookup_active_consumer(#group{consumers = Consumers}) -> + lists:search(fun(#consumer{active = Active}) -> Active end, + Consumers). + +update_groups(_VirtualHost, + _Stream, + _ConsumerName, + undefined, + StreamGroups) -> + StreamGroups; +update_groups(VirtualHost, + Stream, + ConsumerName, + #group{consumers = []}, + StreamGroups) -> + rabbit_log:debug("Group ~p ~p ~p is now empty, removing it", + [VirtualHost, Stream, ConsumerName]), + maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups); +update_groups(VirtualHost, + Stream, + ConsumerName, + Group, + StreamGroups) -> + maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups). + +update_consumer_state_in_group(#group{consumers = Consumers0} = G, + Pid, + SubId, + NewState) -> + CS1 = lists:map(fun(C0) -> + case C0 of + #consumer{pid = Pid, subscription_id = SubId} -> + C0#consumer{active = NewState}; + C -> C + end + end, + Consumers0), + G#group{consumers = CS1}. + +mod_call_effect(Pid, Msg) -> + {mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}. + +-spec send_message(pid(), term()) -> ok. +send_message(ConnectionPid, Msg) -> + ConnectionPid ! Msg, + ok. + +is_ff_enabled() -> + rabbit_feature_flags:is_enabled(stream_single_active_consumer). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.hrl b/deps/rabbit/src/rabbit_stream_sac_coordinator.hrl new file mode 100644 index 000000000000..a0a2eacfb45f --- /dev/null +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.hrl @@ -0,0 +1,58 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-type vhost() :: binary(). +-type partition_index() :: integer(). +-type stream() :: binary(). +-type consumer_name() :: binary(). +-type connection_pid() :: pid(). +-type subscription_id() :: byte(). +-type group_id() :: {vhost(), stream(), consumer_name()}. +-type owner() :: binary(). + +-record(consumer, + {pid :: pid(), + subscription_id :: subscription_id(), + owner :: owner(), %% just a label + active :: boolean()}). +-record(group, + {consumers :: [#consumer{}], partition_index :: integer()}). +-record(rabbit_stream_sac_coordinator, + {groups :: #{group_id() => #group{}}, + pids_groups :: + #{connection_pid() => + #{group_id() => true}}, %% inner map acts as a set + %% future extensibility + reserved_1, + reserved_2}). +%% commands +-record(command_register_consumer, + {vhost :: vhost(), + stream :: stream(), + partition_index :: partition_index(), + consumer_name :: consumer_name(), + connection_pid :: connection_pid(), + owner :: owner(), + subscription_id :: subscription_id()}). +-record(command_unregister_consumer, + {vhost :: vhost(), + stream :: stream(), + consumer_name :: consumer_name(), + connection_pid :: connection_pid(), + subscription_id :: subscription_id()}). +-record(command_activate_consumer, + {vhost :: vhost(), stream :: stream(), + consumer_name :: consumer_name()}). diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 361938e67edb..e5ee529a63f5 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -26,7 +26,8 @@ all() -> all_tests() -> [ listeners, - machine_version_upgrade, + machine_version_upgrade_to_2, + machine_version_upgrade_to_3, new_stream, leader_down, leader_down_scenario_1, @@ -197,12 +198,12 @@ listeners(_) -> ok. -machine_version_upgrade(_) -> - machine_version(0, 2), - machine_version(1, 2), +machine_version_upgrade_to_2(_) -> + machine_version_to_2(0), + machine_version_to_2(1), ok. -machine_version(From, To) -> +machine_version_to_2(From) -> S = <<"stream">>, LeaderPid = spawn(fun() -> ok end), ListPid = spawn(fun() -> ok end), %% simulate a dead listener (not cleaned up) @@ -212,7 +213,7 @@ machine_version(From, To) -> DeadListPid => LeaderPid}}}, monitors = #{ListPid => {S, listener}}}, - {State1, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, From, To}, State0), + {State1, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, From, 2}, State0), Stream1 = maps:get(S, State1#?STATE.streams), ?assertEqual( @@ -232,6 +233,24 @@ machine_version(From, To) -> ), ok. +machine_version_upgrade_to_3(_) -> + machine_version_to_3(0), + machine_version_to_3(1), + machine_version_to_3(2), + ok. + +machine_version_to_3(From) -> + State0 = #?STATE{}, + #?STATE{single_active_consumer = Sac0} = State0, + + ?assert(Sac0 == undefined), + + {#?STATE{single_active_consumer = Sac1}, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, From, 3}, State0), + + ?assertNot(Sac1 == undefined), + ?assertEqual(Effects, []), + ok. + new_stream(_) -> [N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3], StreamId = atom_to_list(?FUNCTION_NAME), diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl new file mode 100644 index 000000000000..1282800dbc52 --- /dev/null +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -0,0 +1,402 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_sac_coordinator_SUITE). + +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit/src/rabbit_stream_sac_coordinator.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +-define(STATE, rabbit_stream_sac_coordinator). + +all() -> + [{group, tests}]. + +%% replicate eunit like test resolution +all_tests() -> + [F + || {F, _} <- ?MODULE:module_info(functions), + re:run(atom_to_list(F), "_test$") /= nomatch]. + +groups() -> + [{tests, [], all_tests()}]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +simple_sac_test(_) -> + Stream = <<"stream">>, + ConsumerName = <<"app">>, + ConnectionPid = self(), + GroupId = {<<"/">>, Stream, ConsumerName}, + Command0 = + register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0), + State0 = state(), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers1}}} = + State1, + {ok, Active1}, Effects1} = + rabbit_stream_sac_coordinator:apply(Command0, State0), + ?assert(Active1), + ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1), + assertSendMessageEffect(ConnectionPid, 0, true, Effects1), + + Command1 = + register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers2}}} = + State2, + {ok, Active2}, Effects2} = + rabbit_stream_sac_coordinator:apply(Command1, State1), + ?assertNot(Active2), + ?assertEqual([consumer(ConnectionPid, 0, true), + consumer(ConnectionPid, 1, false)], + Consumers2), + assertEmpty(Effects2), + + Command2 = + register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 2), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} = + State3, + {ok, Active3}, Effects3} = + rabbit_stream_sac_coordinator:apply(Command2, State2), + ?assertNot(Active3), + ?assertEqual([consumer(ConnectionPid, 0, true), + consumer(ConnectionPid, 1, false), + consumer(ConnectionPid, 2, false)], + Consumers3), + assertEmpty(Effects3), + + Command3 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 0), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers4}}} = + State4, + ok, Effects4} = + rabbit_stream_sac_coordinator:apply(Command3, State3), + ?assertEqual([consumer(ConnectionPid, 1, true), + consumer(ConnectionPid, 2, false)], + Consumers4), + assertSendMessageEffect(ConnectionPid, 1, true, Effects4), + + Command4 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers5}}} = + State5, + ok, Effects5} = + rabbit_stream_sac_coordinator:apply(Command4, State4), + ?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5), + assertSendMessageEffect(ConnectionPid, 2, true, Effects5), + + Command5 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2), + {#?STATE{groups = Groups6}, ok, Effects6} = + rabbit_stream_sac_coordinator:apply(Command5, State5), + assertEmpty(Groups6), + assertEmpty(Effects6), + + ok. + +super_stream_partition_sac_test(_) -> + Stream = <<"stream">>, + ConsumerName = <<"app">>, + ConnectionPid = self(), + GroupId = {<<"/">>, Stream, ConsumerName}, + Command0 = + register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 0), + State0 = state(), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers1}}} = + State1, + {ok, Active1}, Effects1} = + rabbit_stream_sac_coordinator:apply(Command0, State0), + ?assert(Active1), + ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1), + assertSendMessageEffect(ConnectionPid, 0, true, Effects1), + + Command1 = + register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers2}}} = + State2, + {ok, Active2}, Effects2} = + rabbit_stream_sac_coordinator:apply(Command1, State1), + %% never active on registration + ?assertNot(Active2), + %% all consumers inactive, until the former active one steps down and activates the new consumer + ?assertEqual([consumer(ConnectionPid, 0, false), + consumer(ConnectionPid, 1, false)], + Consumers2), + assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2), + + Command2 = activate_consumer_command(Stream, ConsumerName), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} = + State3, + ok, Effects3} = + rabbit_stream_sac_coordinator:apply(Command2, State2), + + %% 1 (partition index) % 2 (consumer count) = 1 (active consumer index) + ?assertEqual([consumer(ConnectionPid, 0, false), + consumer(ConnectionPid, 1, true)], + Consumers3), + assertSendMessageEffect(ConnectionPid, 1, true, Effects3), + + Command3 = + register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers4}}} = + State4, + {ok, Active4}, Effects4} = + rabbit_stream_sac_coordinator:apply(Command3, State3), + %% never active on registration + ?assertNot(Active4), + %% 1 (partition index) % 3 (consumer count) = 1 (active consumer index) + %% the active consumer stays the same + ?assertEqual([consumer(ConnectionPid, 0, false), + consumer(ConnectionPid, 1, true), + consumer(ConnectionPid, 2, false)], + Consumers4), + assertEmpty(Effects4), + + Command4 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 0), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers5}}} = + State5, + ok, Effects5} = + rabbit_stream_sac_coordinator:apply(Command4, State4), + %% 1 (partition index) % 2 (consumer count) = 1 (active consumer index) + %% the active consumer will move from sub 1 to sub 2 + ?assertEqual([consumer(ConnectionPid, 1, false), + consumer(ConnectionPid, 2, false)], + Consumers5), + + assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5), + + Command5 = activate_consumer_command(Stream, ConsumerName), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} = + State6, + ok, Effects6} = + rabbit_stream_sac_coordinator:apply(Command5, State5), + + ?assertEqual([consumer(ConnectionPid, 1, false), + consumer(ConnectionPid, 2, true)], + Consumers6), + assertSendMessageEffect(ConnectionPid, 2, true, Effects6), + + Command6 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1), + {#?STATE{groups = #{GroupId := #group{consumers = Consumers7}}} = + State7, + ok, Effects7} = + rabbit_stream_sac_coordinator:apply(Command6, State6), + ?assertEqual([consumer(ConnectionPid, 2, true)], Consumers7), + assertEmpty(Effects7), + + Command7 = + unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2), + {#?STATE{groups = Groups8}, ok, Effects8} = + rabbit_stream_sac_coordinator:apply(Command7, State7), + assertEmpty(Groups8), + assertEmpty(Effects8), + + ok. + +ensure_monitors_test(_) -> + GroupId = {<<"/">>, <<"stream">>, <<"app">>}, + Group = + cgroup([consumer(self(), 0, true), consumer(self(), 1, false)]), + State0 = state(#{GroupId => Group}), + Monitors0 = #{}, + Command0 = + register_consumer_command(<<"stream">>, -1, <<"app">>, self(), 0), + {#?STATE{pids_groups = PidsGroups1} = State1, Monitors1, Effects1} = + rabbit_stream_sac_coordinator:ensure_monitors(Command0, + State0, + Monitors0, + []), + assertSize(1, PidsGroups1), + assertSize(1, maps:get(self(), PidsGroups1)), + ?assertEqual(#{self() => sac}, Monitors1), + ?assertEqual([{monitor, process, self()}, {monitor, node, node()}], + Effects1), + + Command1 = + register_consumer_command(<<"stream">>, -1, <<"app">>, self(), 1), + + {#?STATE{pids_groups = PidsGroups2} = State2, Monitors2, Effects2} = + rabbit_stream_sac_coordinator:ensure_monitors(Command1, + State1, + Monitors1, + []), + assertSize(1, PidsGroups2), + assertSize(1, maps:get(self(), PidsGroups2)), + ?assertEqual(#{self() => sac}, Monitors2), + ?assertEqual([{monitor, process, self()}, {monitor, node, node()}], + Effects2), + Group2 = cgroup([consumer(self(), 1, true)]), + + Command2 = + unregister_consumer_command(<<"stream">>, <<"app">>, self(), 0), + + {#?STATE{pids_groups = PidsGroups3} = State3, Monitors3, Effects3} = + rabbit_stream_sac_coordinator:ensure_monitors(Command2, + State2#?STATE{groups = + #{GroupId + => + Group2}}, + Monitors2, + []), + assertSize(1, PidsGroups3), + assertSize(1, maps:get(self(), PidsGroups3)), + ?assertEqual(#{self() => sac}, Monitors3), + ?assertEqual([], Effects3), + + Command3 = + unregister_consumer_command(<<"stream">>, <<"app">>, self(), 1), + + {#?STATE{pids_groups = PidsGroups4} = _State4, Monitors4, Effects4} = + rabbit_stream_sac_coordinator:ensure_monitors(Command3, + State3#?STATE{groups = + #{}}, + Monitors3, + []), + assertEmpty(PidsGroups4), + assertEmpty(Monitors4), + ?assertEqual([{demonitor, process, self()}], Effects4), + + ok. + +handle_connection_down_test(_) -> + GroupId = {<<"/">>, <<"stream">>, <<"app">>}, + Pid0 = self(), + Pid1 = spawn(fun() -> ok end), + Group = + cgroup([consumer(Pid0, 0, true), consumer(Pid1, 1, false), + consumer(Pid0, 2, false)]), + State0 = + state(#{GroupId => Group}, + #{Pid0 => maps:from_list([{GroupId, true}]), + Pid1 => maps:from_list([{GroupId, true}])}), + + {#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1, + Effects1} = + rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0), + assertSize(1, PidsGroups1), + assertSize(1, maps:get(Pid1, PidsGroups1)), + assertSendMessageEffect(Pid1, 1, true, Effects1), + ?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])}, + Groups1), + {#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2, + Effects2} = + rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State1), + assertEmpty(PidsGroups2), + assertEmpty(Effects2), + assertEmpty(Groups2), + + ok. + +assertSize(Expected, []) -> + ?assertEqual(Expected, 0); +assertSize(Expected, Map) when is_map(Map) -> + ?assertEqual(Expected, maps:size(Map)); +assertSize(Expected, List) when is_list(List) -> + ?assertEqual(Expected, length(List)). + +assertEmpty(Data) -> + assertSize(0, Data). + +consumer(Pid, SubId, Active) -> + #consumer{pid = Pid, + subscription_id = SubId, + owner = <<"owning connection label">>, + active = Active}. + +cgroup(Consumers) -> + cgroup(-1, Consumers). + +cgroup(PartitionIndex, Consumers) -> + #group{partition_index = PartitionIndex, consumers = Consumers}. + +state() -> + state(#{}). + +state(Groups) -> + state(Groups, #{}). + +state(Groups, PidsGroups) -> + #?STATE{groups = Groups, pids_groups = PidsGroups}. + +register_consumer_command(Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + SubId) -> + #command_register_consumer{vhost = <<"/">>, + stream = Stream, + partition_index = PartitionIndex, + consumer_name = ConsumerName, + connection_pid = ConnectionPid, + owner = <<"owning connection label">>, + subscription_id = SubId}. + +unregister_consumer_command(Stream, + ConsumerName, + ConnectionPid, + SubId) -> + #command_unregister_consumer{vhost = <<"/">>, + stream = Stream, + consumer_name = ConsumerName, + connection_pid = ConnectionPid, + subscription_id = SubId}. + +activate_consumer_command(Stream, ConsumerName) -> + #command_activate_consumer{vhost = <<"/">>, + stream = Stream, + consumer_name = ConsumerName}. + +assertSendMessageEffect(Pid, SubId, Active, [Effect]) -> + ?assertEqual({mod_call, + rabbit_stream_sac_coordinator, + send_message, + [Pid, + {sac, + {{subscription_id, SubId}, {active, Active}, + {extra, []}}}]}, + Effect). + +assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) -> + ?assertEqual({mod_call, + rabbit_stream_sac_coordinator, + send_message, + [Pid, + {sac, + {{subscription_id, SubId}, {active, false}, + {extra, [{stepping_down, true}]}}}]}, + Effect). diff --git a/deps/rabbitmq_management/priv/www/js/formatters.js b/deps/rabbitmq_management/priv/www/js/formatters.js index ca5ca58ecf9e..9ef81a8ac7d1 100644 --- a/deps/rabbitmq_management/priv/www/js/formatters.js +++ b/deps/rabbitmq_management/priv/www/js/formatters.js @@ -150,7 +150,9 @@ function fmt_features_short(obj) { return res; } -function fmt_activity_status(obj) { +function fmt_activity_status(obj, unknown) { + if (unknown == undefined) unknown = UNKNOWN_REPR; + if (obj == undefined) return unknown; return obj.replace('_', ' '); } diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 532c86a2bc24..d87975823f3e 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -215,6 +215,12 @@ used to make the difference between a request (0) and a response (1). Example fo |Client |0x0019 |Yes + +|<> (experimental) +|Server +|0x0020 +|Yes + |=== === DeclarePublisher @@ -597,10 +603,11 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream RoutingKey => string SuperStream => string -RouteResponse => Key Version CorrelationId [Stream] +RouteResponse => Key Version CorrelationId ResponseCode [Stream] Key => uint16 // 0x8018 Version => uint16 CorrelationId => uint32 + ResponseCode => uint16 Stream => string ``` @@ -615,13 +622,34 @@ PartitionsQuery => Key Version CorrelationId SuperStream CorrelationId => uint32 SuperStream => string -PartitionsResponse => Key Version CorrelationId [Stream] +PartitionsResponse => Key Version CorrelationId ResponseCode [Stream] Key => uint16 // 0x8019 Version => uint16 CorrelationId => uint32 + ResponseCode => uint16 Stream => string ``` +=== Consumer Update (experimental) + +``` +ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active + Key => uint16 // 0x001a + Version => uint16 + CorrelationId => uint32 + SubscriptionId => uint8 + Active => uint8 (boolean, 0 = false, 1 = true) + +ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification + Key => uint16 // 0x801a + Version => uint16 + CorrelationId => uint32 + ResponseCode => uint16 + OffsetSpecification => OffsetType Offset + OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp) + Offset => uint64 (for offset) | int64 (for timestamp) +``` + == Authentication Once a client is connected to the server, it initiates an authentication diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand.erl new file mode 100644 index 000000000000..5a0f8b5dcbe3 --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand.erl @@ -0,0 +1,119 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1}, + {'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1}, + {'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2}, + {'Elixir.Enum', join, 2}]). + +-export([formatter/0, + scopes/0, + switches/0, + aliases/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.PrettyTable'. + +scopes() -> + [ctl, diagnostics, streams]. + +switches() -> + [{verbose, boolean}]. + +aliases() -> + [{'V', verbose}]. + +description() -> + <<"Lists groups of stream single active consumers " + "for a vhost">>. + +help_section() -> + {plugin, stream}. + +validate(Args, _) -> + case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, + ?CONSUMER_GROUP_INFO_ITEMS) + of + {ok, _} -> + ok; + Error -> + Error + end. + +merge_defaults([], Opts) -> + merge_defaults([rabbit_data_coercion:to_binary(Item) + || Item <- ?CONSUMER_GROUP_INFO_ITEMS], + Opts); +merge_defaults(Args, Opts) -> + {Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}. + +usage() -> + <<"list_stream_consumer_groups [--vhost " + "[ ...]">>. + +usage_additional() -> + Prefix = <<" must be one of ">>, + InfoItems = + 'Elixir.Enum':join( + lists:usort(?CONSUMER_GROUP_INFO_ITEMS), <<", ">>), + [{<<"">>, <>}]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run(Args, + #{node := NodeName, + vhost := VHost, + timeout := Timeout, + verbose := Verbose}) -> + InfoKeys = + case Verbose of + true -> + ?CONSUMER_GROUP_INFO_ITEMS; + false -> + 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) + end, + + rabbit_misc:rpc_call(NodeName, + rabbit_stream_sac_coordinator, + consumer_groups, + [VHost, InfoKeys], + Timeout). + +banner(_, _) -> + <<"Listing stream consumer groups ...">>. + +output({ok, []}, _Opts) -> + ok; +output([], _Opts) -> + ok; +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl new file mode 100644 index 000000000000..390b1fc7e2b6 --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl @@ -0,0 +1,127 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1}, + {'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1}, + {'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2}, + {'Elixir.Enum', join, 2}]). + +-export([formatter/0, + scopes/0, + switches/0, + aliases/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.PrettyTable'. + +scopes() -> + [ctl, diagnostics, streams]. + +switches() -> + [{verbose, boolean}, {stream, string}, {reference, string}]. + +aliases() -> + [{'V', verbose}]. + +description() -> + <<"Lists consumers of a stream consumer group in " + "a vhost">>. + +help_section() -> + {plugin, stream}. + +validate(Args, #{stream := _, reference := _}) -> + case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, + ?GROUP_CONSUMER_INFO_ITEMS) + of + {ok, _} -> + ok; + Error -> + Error + end; +validate(_, _) -> + {validation_failure, not_enough_args}. + +merge_defaults([], Opts) -> + merge_defaults([rabbit_data_coercion:to_binary(Item) + || Item <- ?GROUP_CONSUMER_INFO_ITEMS], + Opts); +merge_defaults(Args, Opts) -> + {Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}. + +usage() -> + <<"list_stream_consumer_groups --stream " + "--reference [--vhost [ " + "...]">>. + +usage_additional() -> + Prefix = <<" must be one of ">>, + InfoItems = + 'Elixir.Enum':join( + lists:usort(?GROUP_CONSUMER_INFO_ITEMS), <<", ">>), + [{<<"">>, <>}]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run(Args, + #{node := NodeName, + vhost := VHost, + stream := Stream, + reference := Reference, + timeout := Timeout, + verbose := Verbose}) -> + InfoKeys = + case Verbose of + true -> + ?GROUP_CONSUMER_INFO_ITEMS; + false -> + 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) + end, + + rabbit_misc:rpc_call(NodeName, + rabbit_stream_sac_coordinator, + group_consumers, + [VHost, Stream, Reference, InfoKeys], + Timeout). + +banner(_, _) -> + <<"Listing group consumers ...">>. + +output({ok, []}, _Opts) -> + ok; +output([], _Opts) -> + ok; +output({error, not_found}, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({error_string, + <<"The group does not exist">>}); +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 6f8758ce8de8..19ca6c368a0f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -35,7 +35,8 @@ lookup_local_member/2, topology/2, route/3, - partitions/2]). + partitions/2, + partition_index/3]). -record(state, {configuration}). @@ -118,6 +119,12 @@ route(RoutingKey, VirtualHost, SuperStream) -> partitions(VirtualHost, SuperStream) -> gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}). +-spec partition_index(binary(), binary(), binary()) -> + {ok, integer()} | {error, stream_not_found}. +partition_index(VirtualHost, SuperStream, Stream) -> + gen_server:call(?MODULE, + {partition_index, VirtualHost, SuperStream, Stream}). + stream_queue_arguments(Arguments) -> stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). @@ -418,13 +425,67 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, catch exit:Error -> rabbit_log:error("Error while looking up exchange ~p, ~p", - [ExchangeName, Error]), + [rabbit_misc:rs(ExchangeName), Error]), {error, stream_not_found} end, {reply, Res, State}; handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> Res = super_stream_partitions(VirtualHost, SuperStream), {reply, Res, State}; +handle_call({partition_index, VirtualHost, SuperStream, Stream}, + _From, State) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + rabbit_log:debug("Looking for partition index of stream ~p in super " + "stream ~p (virtual host ~p)", + [Stream, SuperStream, VirtualHost]), + Res = try + rabbit_exchange:lookup_or_die(ExchangeName), + UnorderedBindings = + [Binding + || Binding = #binding{destination = #resource{name = Q} = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D), Q == Stream], + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_log:debug("Bindings: ~p", [OrderedBindings]), + case OrderedBindings of + [] -> + {error, stream_not_found}; + Bindings -> + Binding = lists:nth(1, Bindings), + #binding{args = Args} = Binding, + case rabbit_misc:table_lookup(Args, + <<"x-stream-partition-order">>) + of + {_, Order} -> + Index = rabbit_data_coercion:to_integer(Order), + {ok, Index}; + _ -> + Pattern = <<"-">>, + Size = byte_size(Pattern), + case string:find(Stream, Pattern, trailing) of + nomatch -> + {ok, -1}; + <> -> + try + Index = binary_to_integer(Rest), + {ok, Index} + catch + error:_ -> + {ok, -1} + end; + _ -> + {ok, -1} + end + end + end + catch + exit:Error -> + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), + {error, stream_not_found} + end, + {reply, Res, State}; handle_call(which_children, _From, State) -> {reply, [], State}. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index 5096f6500d08..b19eff6a7c23 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -20,8 +20,8 @@ %% API -export([init/0]). --export([consumer_created/8, - consumer_updated/8, +-export([consumer_created/9, + consumer_updated/9, consumer_cancelled/3]). -export([publisher_created/4, publisher_updated/7, @@ -41,12 +41,16 @@ consumer_created(Connection, MessageCount, Offset, OffsetLag, + Active, Properties) -> Values = [{credits, Credits}, {consumed, MessageCount}, {offset, Offset}, {offset_lag, OffsetLag}, + {active, Active}, + {activity_status, + rabbit_stream_utils:consumer_activity_status(Active, Properties)}, {properties, Properties}], ets:insert(?TABLE_CONSUMER, {{StreamResource, Connection, SubscriptionId}, Values}), @@ -57,8 +61,9 @@ consumer_created(Connection, false, StreamResource, 0, - true, - up, + Active, + rabbit_stream_utils:consumer_activity_status(Active, + Properties), rabbit_misc:to_amqp_table(Properties)), ok. @@ -73,15 +78,30 @@ consumer_updated(Connection, MessageCount, Offset, OffsetLag, + Active, Properties) -> Values = [{credits, Credits}, {consumed, MessageCount}, {offset, Offset}, {offset_lag, OffsetLag}, + {active, Active}, + {activity_status, + rabbit_stream_utils:consumer_activity_status(Active, Properties)}, {properties, Properties}], ets:insert(?TABLE_CONSUMER, {{StreamResource, Connection, SubscriptionId}, Values}), + rabbit_core_metrics:consumer_updated(Connection, + consumer_tag(SubscriptionId), + false, + false, + StreamResource, + 0, + Active, + rabbit_stream_utils:consumer_activity_status(Active, + Properties), + rabbit_misc:to_amqp_table(Properties)), + ok. consumer_cancelled(Connection, StreamResource, SubscriptionId) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 09bd692fa7eb..610eca7528cf 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -39,11 +39,12 @@ stream :: stream(), offset :: osiris:offset(), counters :: atomics:atomics_ref(), - properties :: map()}). + properties :: map(), + active :: boolean()}). -record(consumer, {configuration :: #consumer_configuration{}, credit :: non_neg_integer(), - log :: osiris_log:state(), + log :: undefined | osiris_log:state(), last_listener_offset = undefined :: undefined | osiris:offset()}). -record(stream_connection_state, {data :: rabbit_stream_core:state(), blocked :: boolean(), @@ -85,7 +86,9 @@ send_file_oct :: atomics:atomics_ref(), % number of bytes sent with send_file (for metrics) transport :: tcp | ssl, - proxy_socket :: undefined | ranch_proxy:proxy_socket()}). + proxy_socket :: undefined | ranch_proxy:proxy_socket(), + correlation_id_sequence :: integer(), + outstanding_requests :: #{integer() => term()}}). -record(configuration, {initial_credits :: integer(), credits_required_for_unblocking :: integer(), @@ -155,7 +158,8 @@ consumers_info/2, publishers_info/2, in_vhost/2]). --export([resource_alarm/3]). +-export([resource_alarm/3, + single_active_consumer/1]). %% gen_statem callbacks -export([callback_mode/0, terminate/3, @@ -177,10 +181,10 @@ callback_mode() -> terminate(Reason, State, #statem_data{transport = Transport, - connection = #stream_connection{socket = Socket}, + connection = Connection, connection_state = ConnectionState} = StatemData) -> - close(Transport, Socket, ConnectionState), + close(Transport, Connection, ConnectionState), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(StatemData), rabbit_log:debug("~s terminating in state '~s' with reason '~W'", @@ -237,7 +241,9 @@ init([KeepaliveSup, send_file_oct = SendFileOct, transport = ConnTransport, proxy_socket = - rabbit_net:maybe_get_proxy_socket(Sock)}, + rabbit_net:maybe_get_proxy_socket(Sock), + correlation_id_sequence = 0, + outstanding_requests = #{}}, State = #stream_connection_state{consumers = #{}, blocked = false, @@ -641,10 +647,24 @@ augment_infos_with_user_provided_connection_name(Infos, Infos end. -close(Transport, S, +close(Transport, + #stream_connection{socket = S, virtual_host = VirtualHost}, #stream_connection_state{consumers = Consumers}) -> - [osiris_log:close(Log) - || #consumer{log = Log} <- maps:values(Consumers)], + [begin + maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Properties)), + case Log of + undefined -> + ok; %% segment may not be defined on subscription (single active consumer) + L -> + osiris_log:close(L) + end + end + || #consumer{log = Log, + configuration = + #consumer_configuration{properties = Properties}} = + Consumer + <- maps:values(Consumers)], Transport:shutdown(S, write), Transport:close(S). @@ -757,6 +777,70 @@ open(info, {OK, S, Data}, StatemData#statem_data{connection = Connection1, connection_state = State2}} end; +open(info, + {sac, {{subscription_id, SubId}, {active, Active}, {extra, Extra}}}, + #statem_data{transport = Transport, + connection = Connection0, + connection_state = ConnState0} = + State) -> + rabbit_log:debug("Subscription ~p instructed to become active: ~p", + [SubId, Active]), + #stream_connection_state{consumers = Consumers0} = ConnState0, + {Connection1, ConnState1} = + case Consumers0 of + #{SubId := + #consumer{configuration = + #consumer_configuration{properties = + Properties} = + Conf0, + log = Log0} = + Consumer0} -> + case single_active_consumer(Properties) of + true -> + Log1 = + case {Active, Log0} of + {false, undefined} -> + undefined; + {false, L} -> + rabbit_log:debug("Closing Osiris segment of subscription ~p for " + "now", + [SubId]), + osiris_log:close(L), + undefined; + _ -> + Log0 + end, + Consumer1 = + Consumer0#consumer{configuration = + Conf0#consumer_configuration{active + = + Active}, + log = Log1}, + + Conn1 = + maybe_send_consumer_update(Transport, + Connection0, + SubId, + Active, + true, + Extra), + {Conn1, + ConnState0#stream_connection_state{consumers = + Consumers0#{SubId + => + Consumer1}}}; + false -> + rabbit_log:warning("Received SAC event for subscription ~p, which " + "is not a SAC. Not doing anything.", + [SubId]), + {Connection0, ConnState0} + end; + _ -> + {Connection0, ConnState0} + end, + {keep_state, + State#statem_data{connection = Connection1, + connection_state = ConnState1}}; open(info, {Closed, Socket}, #statem_data{connection = Connection}) when Closed =:= tcp_closed; Closed =:= ssl_closed -> demonitor_all_streams(Connection), @@ -995,15 +1079,18 @@ open(cast, maps:remove(StreamName, StreamSubscriptions)}, State}; - CorrelationIds when is_list(CorrelationIds) -> + SubscriptionIds when is_list(SubscriptionIds) -> Consumers1 = - lists:foldl(fun(CorrelationId, ConsumersAcc) -> - #{CorrelationId := Consumer} = ConsumersAcc, - #consumer{credit = Credit} = Consumer, + lists:foldl(fun(SubscriptionId, ConsumersAcc) -> + #{SubscriptionId := Consumer} = ConsumersAcc, + #consumer{credit = Credit, log = Log} = + Consumer, Consumer1 = - case Credit of - 0 -> Consumer; - _ -> + case {Credit, Log} of + {_, undefined} -> + Consumer; %% SAC not active + {0, _} -> Consumer; + {_, _} -> case send_chunks(Transport, Consumer, SendFileOct) @@ -1021,9 +1108,9 @@ open(cast, {ok, Csmr} -> Csmr end end, - ConsumersAcc#{CorrelationId => Consumer1} + ConsumersAcc#{SubscriptionId => Consumer1} end, - Consumers, CorrelationIds), + Consumers, SubscriptionIds), {Connection, State#stream_connection_state{consumers = Consumers1}} end, @@ -1721,7 +1808,8 @@ handle_frame_post_auth(Transport, {Connection0, State} end; handle_frame_post_auth(Transport, - #stream_connection{socket = Socket, + #stream_connection{name = ConnName, + socket = Socket, stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, @@ -1789,70 +1877,106 @@ handle_frame_post_auth(Transport, Stream, OffsetSpec, Properties]), - CounterSpec = - {{?MODULE, - QueueResource, - SubscriptionId, - self()}, - []}, - Options = - #{transport => ConnTransport, - chunk_selector => - get_chunk_selector(Properties)}, - {ok, Log} = - osiris:init_reader(LocalMemberPid, - OffsetSpec, - CounterSpec, - Options), - rabbit_log:debug("Next offset for subscription ~p is ~p", - [SubscriptionId, - osiris_log:next_offset(Log)]), - ConsumerCounters = - atomics:new(2, [{signed, false}]), - ConsumerConfiguration = - #consumer_configuration{member_pid = - LocalMemberPid, - subscription_id = - SubscriptionId, - socket = Socket, - stream = Stream, - offset = OffsetSpec, - counters = - ConsumerCounters, - properties = - Properties}, - ConsumerState = - #consumer{configuration = ConsumerConfiguration, - log = Log, - credit = Credit}, - - Connection1 = - maybe_monitor_stream(LocalMemberPid, Stream, - Connection), - - response_ok(Transport, - Connection, - subscribe, - CorrelationId), - - rabbit_log:debug("Distributing existing messages to subscription ~p", - [SubscriptionId]), - - case send_chunks(Transport, ConsumerState, - SendFileOct) + Sac = single_active_consumer(Properties), + ConsumerName = consumer_name(Properties), + %% TODO check consumer name is defined when SAC + case {Sac, rabbit_stream_utils:is_sac_ff_enabled(), + ConsumerName} of - {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by " - "peer", - []), - throw({stop, normal}); - {ok, - #consumer{log = Log1, credit = Credit1} = - ConsumerState1} -> - Consumers1 = - Consumers#{SubscriptionId => - ConsumerState1}, - + {true, false, _} -> + rabbit_log:warning("Cannot create subcription ~p, stream single active " + "consumer feature flag is not enabled", + [SubscriptionId]), + response(Transport, + Connection, + subscribe, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State}; + {true, _, undefined} -> + rabbit_log:warning("Cannot create subcription ~p, a single active " + "consumer must have a name", + [SubscriptionId]), + response(Transport, + Connection, + subscribe, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State}; + _ -> + Log = case Sac of + true -> + undefined; + false -> + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) + end, + + ConsumerCounters = + atomics:new(2, [{signed, false}]), + + response_ok(Transport, + Connection, + subscribe, + CorrelationId), + + Active = + maybe_register_consumer(VirtualHost, + Stream, + ConsumerName, + ConnName, + SubscriptionId, + Properties, + Sac), + + ConsumerConfiguration = + #consumer_configuration{member_pid = + LocalMemberPid, + subscription_id + = + SubscriptionId, + socket = Socket, + stream = Stream, + offset = + OffsetSpec, + counters = + ConsumerCounters, + properties = + Properties, + active = + Active}, + ConsumerState = + #consumer{configuration = + ConsumerConfiguration, + log = Log, + credit = Credit}, + + Connection1 = + maybe_monitor_stream(LocalMemberPid, + Stream, + Connection), + + State1 = + maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + Connection1, + Consumers, + Stream, + SubscriptionId, + Properties, + SendFileOct, + Sac), StreamSubscriptions1 = case StreamSubscriptions of #{Stream := SubscriptionIds} -> @@ -1863,38 +1987,10 @@ handle_frame_post_auth(Transport, StreamSubscriptions#{Stream => [SubscriptionId]} end, - - #consumer{configuration = - #consumer_configuration{counters - = - ConsumerCounters1}} = - ConsumerState1, - - ConsumerOffset = - osiris_log:next_offset(Log1), - ConsumerOffsetLag = - consumer_i(offset_lag, ConsumerState1), - - rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " - "distributed after subscription", - [SubscriptionId, - ConsumerOffset, - messages_consumed(ConsumerCounters1)]), - - rabbit_stream_metrics:consumer_created(self(), - stream_r(Stream, - Connection1), - SubscriptionId, - Credit1, - messages_consumed(ConsumerCounters1), - ConsumerOffset, - ConsumerOffsetLag, - Properties), {Connection1#stream_connection{stream_subscriptions = StreamSubscriptions1}, - State#stream_connection_state{consumers = - Consumers1}} + State1} end end end; @@ -1916,6 +2012,21 @@ handle_frame_post_auth(Transport, #stream_connection_state{consumers = Consumers} = State, {credit, SubscriptionId, Credit}) -> case Consumers of + #{SubscriptionId := #consumer{log = undefined}} -> + %% the consumer is not active, it's likely to be credit leftovers + %% from a formerly active consumer, just logging and send an error + rabbit_log:debug("Giving credit to an inactive consumer: ~p", + [SubscriptionId]), + + Code = ?RESPONSE_CODE_PRECONDITION_FAILED, + Frame = + rabbit_stream_core:frame({response, 1, + {credit, Code, SubscriptionId}}), + send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State}; #{SubscriptionId := Consumer} -> #consumer{credit = AvailableCredit, last_listener_offset = LLO} = Consumer, @@ -2376,6 +2487,137 @@ handle_frame_post_auth(Transport, FrameSize = byte_size(Frame), Transport:send(S, <>), {Connection, State}; +handle_frame_post_auth(Transport, + #stream_connection{transport = ConnTransport, + outstanding_requests = Requests0, + send_file_oct = SendFileOct, + virtual_host = VirtualHost} = + Connection, + #stream_connection_state{consumers = Consumers} = State, + {response, CorrelationId, + {consumer_update, ResponseCode, ResponseOffsetSpec}}) -> + case ResponseCode of + ?RESPONSE_CODE_OK -> + ok; + RC -> + rabbit_log:info("Unexpected consumer update response code: ~p", + [RC]) + end, + case maps:take(CorrelationId, Requests0) of + {{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} -> + rabbit_log:debug("Received consumer update response for subscription ~p", + [SubscriptionId]), + Consumers1 = + case Consumers of + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = + true}} = + Consumer} -> + %% active, dispatch messages + #consumer{configuration = + #consumer_configuration{properties = + Properties, + member_pid = + LocalMemberPid, + offset = + SubscriptionOffsetSpec, + stream = + Stream}} = + Consumer, + + OffsetSpec = + case ResponseOffsetSpec of + none -> + SubscriptionOffsetSpec; + ROS -> + ROS + end, + + rabbit_log:debug("Initializing reader for active consumer, offset " + "spec is ~p", + [OffsetSpec]), + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + + Segment = + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec), + Consumer1 = Consumer#consumer{log = Segment}, + Consumer2 = + case send_chunks(Transport, Consumer1, SendFileOct) + of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {error, Reason} -> + rabbit_log_connection:info("Error while sending chunks: ~p", + [Reason]), + %% likely a connection problem + Consumer; + {ok, Csmr} -> + Csmr + end, + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters}, + log = Log2} = + Consumer2, + ConsumerOffset = osiris_log:next_offset(Log2), + + rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters)]), + + Consumers#{SubscriptionId => Consumer2}; + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = false, + stream = Stream, + properties = + Properties}}} -> + rabbit_log:debug("Not an active consumer"), + + case Extra of + [{stepping_down, true}] -> + ConsumerName = consumer_name(Properties), + rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + Stream, + ConsumerName); + _ -> + ok + end, + + Consumers; + _ -> + rabbit_log:debug("No consumer found for subscription ~p", + [SubscriptionId]), + Consumers + end, + + {Connection#stream_connection{outstanding_requests = Rs}, + State#stream_connection_state{consumers = Consumers1}}; + {V, _Rs} -> + rabbit_log:warning("Unexpected outstanding requests for correlation " + "ID ~p: ~p", + [CorrelationId, V]), + {Connection, State}; + error -> + rabbit_log:warning("Could not find outstanding consumer update request " + "with correlation ID ~p. No actions taken for " + "the subscription.", + [CorrelationId]), + {Connection, State} + end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, @@ -2409,6 +2651,188 @@ handle_frame_post_auth(Transport, ?UNKNOWN_FRAME, 1), {Connection#stream_connection{connection_step = close_sent}, State}. +init_reader(ConnectionTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) -> + CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, + Options = + #{transport => ConnectionTransport, + chunk_selector => get_chunk_selector(Properties)}, + {ok, Segment} = + osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), + rabbit_log:debug("Next offset for subscription ~p is ~p", + [SubscriptionId, osiris_log:next_offset(Segment)]), + Segment. + +single_active_consumer(#{<<"single-active-consumer">> := + <<"true">>}) -> + true; +single_active_consumer(_Properties) -> + false. + +consumer_name(#{<<"name">> := Name}) -> + Name; +consumer_name(_Properties) -> + undefined. + +maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + SendFileOct, + false = _Sac) -> + rabbit_log:debug("Distributing existing messages to subscription ~p", + [SubscriptionId]), + case send_chunks(Transport, ConsumerState, SendFileOct) of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {ok, #consumer{log = Log1, credit = Credit1} = ConsumerState1} -> + Consumers1 = Consumers#{SubscriptionId => ConsumerState1}, + + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters1}} = + ConsumerState1, + + ConsumerOffset = osiris_log:next_offset(Log1), + ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1), + + rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters1)]), + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit1, + messages_consumed(ConsumerCounters1), + ConsumerOffset, + ConsumerOffsetLag, + true, + SubscriptionProperties), + State#stream_connection_state{consumers = Consumers1} + end; +maybe_dispatch_on_subscription(_Transport, + State, + ConsumerState, + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + _SendFileOct, + true = _Sac) -> + rabbit_log:debug("No initial dispatch for subscription ~p for now, " + "waiting for consumer update response from client " + "(single active consumer)", + [SubscriptionId]), + #consumer{credit = Credit, + configuration = + #consumer_configuration{offset = Offset, active = Active}} = + ConsumerState, + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit, + 0, %% messages consumed + Offset, + 0, %% offset lag + Active, + SubscriptionProperties), + Consumers1 = Consumers#{SubscriptionId => ConsumerState}, + State#stream_connection_state{consumers = Consumers1}. + +maybe_register_consumer(_, _, _, _, _, _, false = _Sac) -> + true; +maybe_register_consumer(VirtualHost, + Stream, + ConsumerName, + ConnectionName, + SubscriptionId, + Properties, + true) -> + PartitionIndex = partition_index(VirtualHost, Stream, Properties), + {ok, Active} = + rabbit_stream_sac_coordinator:register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + self(), + ConnectionName, + SubscriptionId), + Active. + +maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) -> + Connection; +maybe_send_consumer_update(Transport, + #stream_connection{socket = S, + correlation_id_sequence = CorrIdSeq, + outstanding_requests = + OutstandingRequests0} = + Connection, + SubscriptionId, + Active, + true = _Sac, + Extra) -> + rabbit_log:debug("SAC subscription ~p, active = ~p", + [SubscriptionId, Active]), + Frame = + rabbit_stream_core:frame({request, CorrIdSeq, + {consumer_update, SubscriptionId, Active}}), + + OutstandingRequests1 = + maps:put(CorrIdSeq, + {{subscription_id, SubscriptionId}, {extra, Extra}}, + OutstandingRequests0), + send(Transport, S, Frame), + Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1, + outstanding_requests = OutstandingRequests1}. + +maybe_unregister_consumer(_, _, false = _Sac) -> + ok; +maybe_unregister_consumer(VirtualHost, + #consumer{configuration = + #consumer_configuration{stream = Stream, + properties = + Properties, + subscription_id + = + SubscriptionId}}, + true = _Sac) -> + ConsumerName = consumer_name(Properties), + rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, + Stream, + ConsumerName, + self(), + SubscriptionId). + +partition_index(VirtualHost, Stream, Properties) -> + case Properties of + #{<<"super-stream">> := SuperStream} -> + case rabbit_stream_manager:partition_index(VirtualHost, SuperStream, + Stream) + of + {ok, Index} -> + Index; + _ -> + -1 + end; + _ -> + -1 + end. + notify_connection_closed(#statem_data{connection = #stream_connection{name = Name, publishers = @@ -2459,7 +2883,9 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) -> virtual_host = VHost}. clean_state_after_stream_deletion_or_failure(Stream, - #stream_connection{stream_subscriptions + #stream_connection{virtual_host = + VirtualHost, + stream_subscriptions = StreamSubscriptions, publishers = @@ -2478,9 +2904,15 @@ clean_state_after_stream_deletion_or_failure(Stream, case stream_has_subscriptions(Stream, C0) of true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - [rabbit_stream_metrics:consumer_cancelled(self(), - stream_r(Stream, C0), - SubId) + [begin + rabbit_stream_metrics:consumer_cancelled(self(), + stream_r(Stream, + C0), + SubId), + #{SubId := Consumer} = Consumers, + maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)) + end || SubId <- SubscriptionIds], {true, C0#stream_connection{stream_subscriptions = @@ -2561,7 +2993,8 @@ lookup_leader_from_manager(VirtualHost, Stream) -> rabbit_stream_manager:lookup_leader(VirtualHost, Stream). remove_subscription(SubscriptionId, - #stream_connection{stream_subscriptions = + #stream_connection{virtual_host = VirtualHost, + stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State) -> @@ -2587,6 +3020,8 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId), + maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)), {Connection2, State#stream_connection_state{consumers = Consumers1}}. maybe_clean_connection_from_stream(Stream, @@ -2811,11 +3246,13 @@ emit_stats(#stream_connection{publishers = Publishers} = Connection, messages_consumed(Counters), consumer_offset(Counters), consumer_i(offset_lag, Consumer), + Active, Properties) || #consumer{configuration = #consumer_configuration{stream = S, subscription_id = Id, counters = Counters, + active = Active, properties = Properties}, credit = Credit} = Consumer @@ -2873,6 +3310,8 @@ consumer_i(offset, #consumer{configuration = #consumer_configuration{counters = Counters}}) -> consumer_offset(Counters); +consumer_i(offset_lag, #consumer{log = undefined}) -> + 0; consumer_i(offset_lag, #consumer{configuration = #consumer_configuration{counters = Counters}, @@ -2888,6 +3327,15 @@ consumer_i(stream, #consumer{configuration = #consumer_configuration{stream = Stream}}) -> Stream; +consumer_i(active, + #consumer{configuration = + #consumer_configuration{active = Active}}) -> + Active; +consumer_i(activity_status, + #consumer{configuration = + #consumer_configuration{active = Active, + properties = Properties}}) -> + rabbit_stream_utils:consumer_activity_status(Active, Properties); consumer_i(_Unknown, _) -> ?UNKNOWN_FIELD. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 67fb15ceffa1..62bee75906ce 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -27,7 +27,9 @@ check_read_permitted/3, extract_stream_list/2, sort_partitions/1, - strip_cr_lf/1]). + strip_cr_lf/1, + is_sac_ff_enabled/0, + consumer_activity_status/2]). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -240,3 +242,17 @@ sort_partitions(Partitions) -> strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). + +is_sac_ff_enabled() -> + rabbit_feature_flags:is_enabled(stream_single_active_consumer). + +consumer_activity_status(Active, Properties) -> + case {rabbit_stream_reader:single_active_consumer(Properties), Active} + of + {false, true} -> + up; + {true, true} -> + single_active; + {true, false} -> + waiting + end. diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index 9df1ea72e00a..b0a5a18f72c5 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2020-2022 VMware, Inc. or its affiliates. All rights reserved. +%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(commands_SUITE). @@ -27,11 +27,17 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). -define(COMMAND_DELETE_SUPER_STREAM, 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). +-define(COMMAND_LIST_CONSUMER_GROUPS, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand'). +-define(COMMAND_LIST_GROUP_CONSUMERS, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand'). all() -> [{group, list_connections}, {group, list_consumers}, {group, list_publishers}, + {group, list_consumer_groups}, + {group, list_group_consumers}, {group, super_streams}]. groups() -> @@ -42,6 +48,11 @@ groups() -> [list_consumers_merge_defaults, list_consumers_run]}, {list_publishers, [], [list_publishers_merge_defaults, list_publishers_run]}, + {list_consumer_groups, [], + [list_consumer_groups_merge_defaults, list_consumer_groups_run]}, + {list_group_consumers, [], + [list_group_consumers_validate, list_group_consumers_merge_defaults, + list_group_consumers_run]}, {super_streams, [], [add_super_stream_merge_defaults, add_super_stream_validate, @@ -234,7 +245,7 @@ list_consumers_run(Config) -> to_list(?COMMAND_LIST_CONSUMERS:run([], Opts#{verbose => true})), ?assertEqual(AllKeys, Verbose), %% There are two consumers - [[First], [_Second]] = AllKeys, + [First, _Second] = AllKeys, %% Keys are info items ?assertEqual(length(InfoItems), length(First)), @@ -302,8 +313,8 @@ list_publishers_run(Config) -> Verbose = to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts#{verbose => true})), ?assertEqual(AllKeys, Verbose), - %% There are two consumers - [[First], [_Second]] = AllKeys, + %% There are two publishers + [First, _Second] = AllKeys, %% Keys are info items ?assertEqual(length(InfoItems), length(First)), @@ -321,6 +332,195 @@ list_publishers_run(Config) -> ?awaitMatch(0, publisher_count(Config), ?WAIT), ok. +list_consumer_groups_merge_defaults(_Config) -> + DefaultItems = + [rabbit_data_coercion:to_binary(Item) + || Item <- ?CONSUMER_GROUP_INFO_ITEMS], + {DefaultItems, #{verbose := false, vhost := <<"/">>}} = + ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} = + ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([<<"other_key">>], + #{verbose => true}), + + {[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} = + ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([<<"other_key">>], + #{verbose => false}). + +list_consumer_groups_run(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = + #{node => Node, + timeout => 10000, + vhost => <<"/">>, + verbose => true}, + + %% No connections, no consumers + {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts), + + StreamPort = rabbit_stream_SUITE:get_stream_port(Config), + {S, C} = start_stream_connection(StreamPort), + ?awaitMatch(1, connection_count(Config), ?WAIT), + + ConsumerReference = <<"foo">>, + SubProperties = + #{<<"single-active-consumer">> => <<"true">>, + <<"name">> => ConsumerReference}, + + Stream1 = <<"list_consumer_groups_run_1">>, + create_stream(S, Stream1, C), + subscribe(S, 0, Stream1, SubProperties, C), + handle_consumer_update(S, C, 0), + subscribe(S, 1, Stream1, SubProperties, C), + subscribe(S, 2, Stream1, SubProperties, C), + + ?awaitMatch(3, consumer_count(Config), ?WAIT), + + {ok, [CG1]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts), + assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1), + + Stream2 = <<"list_consumer_groups_run_2">>, + create_stream(S, Stream2, C), + subscribe(S, 3, Stream2, SubProperties, C), + handle_consumer_update(S, C, 3), + subscribe(S, 4, Stream2, SubProperties, C), + subscribe(S, 5, Stream2, SubProperties, C), + + ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT), + + {ok, [CG1, CG2]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts), + assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1), + assertConsumerGroup(Stream2, ConsumerReference, -1, 3, CG2), + + delete_stream(S, Stream1, C), + delete_stream(S, Stream2, C), + + close(S, C), + {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts), + ok. + +list_group_consumers_validate(_) -> + ValidOpts = + #{vhost => <<"/">>, + stream => <<"s1">>, + reference => <<"foo">>}, + ?assertMatch({validation_failure, not_enough_args}, + ?COMMAND_LIST_GROUP_CONSUMERS:validate([], #{})), + ?assertMatch({validation_failure, not_enough_args}, + ?COMMAND_LIST_GROUP_CONSUMERS:validate([], + #{vhost => + <<"test">>})), + ?assertMatch({validation_failure, {bad_info_key, [foo]}}, + ?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"foo">>], + ValidOpts)), + ?assertMatch(ok, + ?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"subscription_id">>], + ValidOpts)), + ?assertMatch(ok, + ?COMMAND_LIST_GROUP_CONSUMERS:validate([], ValidOpts)). + +list_group_consumers_merge_defaults(_Config) -> + DefaultItems = + [rabbit_data_coercion:to_binary(Item) + || Item <- ?GROUP_CONSUMER_INFO_ITEMS], + {DefaultItems, #{verbose := false, vhost := <<"/">>}} = + ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} = + ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([<<"other_key">>], + #{verbose => true}), + + {[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} = + ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([<<"other_key">>], + #{verbose => false}). + +list_group_consumers_run(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = + #{node => Node, + timeout => 10000, + vhost => <<"/">>, + verbose => false}, + Args = [<<"subscription_id">>, <<"state">>], + + Stream1 = <<"list_group_consumers_run_1">>, + ConsumerReference = <<"foo">>, + OptsGroup1 = + maps:merge(#{stream => Stream1, reference => ConsumerReference}, + Opts), + + %% the group does not exist yet + {error, not_found} = + ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1), + + StreamPort = rabbit_stream_SUITE:get_stream_port(Config), + {S, C} = start_stream_connection(StreamPort), + ?awaitMatch(1, connection_count(Config), ?WAIT), + + SubProperties = + #{<<"single-active-consumer">> => <<"true">>, + <<"name">> => ConsumerReference}, + + create_stream(S, Stream1, C), + subscribe(S, 0, Stream1, SubProperties, C), + handle_consumer_update(S, C, 0), + subscribe(S, 1, Stream1, SubProperties, C), + subscribe(S, 2, Stream1, SubProperties, C), + + ?awaitMatch(3, consumer_count(Config), ?WAIT), + + {ok, Consumers1} = + ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1), + ?assertEqual([[{subscription_id, 0}, {state, active}], + [{subscription_id, 1}, {state, inactive}], + [{subscription_id, 2}, {state, inactive}]], + Consumers1), + + Stream2 = <<"list_group_consumers_run_2">>, + OptsGroup2 = + maps:merge(#{stream => Stream2, reference => ConsumerReference}, + Opts), + + create_stream(S, Stream2, C), + subscribe(S, 3, Stream2, SubProperties, C), + handle_consumer_update(S, C, 3), + subscribe(S, 4, Stream2, SubProperties, C), + subscribe(S, 5, Stream2, SubProperties, C), + + ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT), + + {ok, Consumers2} = + ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2), + ?assertEqual([[{subscription_id, 3}, {state, active}], + [{subscription_id, 4}, {state, inactive}], + [{subscription_id, 5}, {state, inactive}]], + Consumers2), + + delete_stream(S, Stream1, C), + delete_stream(S, Stream2, C), + + {error, not_found} = + ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2), + + close(S, C), + ok. + +handle_consumer_update(S, C0, SubId) -> + {{request, CorrId, {consumer_update, SubId, true}}, C1} = + rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0), + ConsumerUpdateCmd = + {response, CorrId, {consumer_update, ?RESPONSE_CODE_OK, next}}, + ConsumerUpdateFrame = rabbit_stream_core:frame(ConsumerUpdateCmd), + ok = gen_tcp:send(S, ConsumerUpdateFrame), + C1. + +assertConsumerGroup(S, R, PI, Cs, Record) -> + ?assertEqual(S, proplists:get_value(stream, Record)), + ?assertEqual(R, proplists:get_value(reference, Record)), + ?assertEqual(PI, proplists:get_value(partition_index, Record)), + ?assertEqual(Cs, proplists:get_value(consumers, Record)), + ok. + add_super_stream_merge_defaults(_Config) -> ?assertMatch({[<<"super-stream">>], #{partitions := 3, vhost := <<"/">>}}, @@ -492,6 +692,14 @@ partitions(Config, Name) -> create_stream(S, Stream, C0) -> rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0). +subscribe(S, SubId, Stream, SubProperties, C) -> + rabbit_stream_SUITE:test_subscribe(gen_tcp, + S, + SubId, + Stream, + SubProperties, + C). + subscribe(S, SubId, Stream, C) -> rabbit_stream_SUITE:test_subscribe(gen_tcp, S, SubId, Stream, C). @@ -523,8 +731,21 @@ options(Config) -> vhost => <<"/">>}, %% just for list_consumers and list_publishers Opts. +flatten_command_result([], []) -> + []; +flatten_command_result([], Acc) -> + lists:reverse(Acc); +flatten_command_result([[{_K, _V} | _RecordRest] = Record | Rest], + Acc) -> + flatten_command_result(Rest, [Record | Acc]); +flatten_command_result([H | T], Acc) -> + Acc1 = flatten_command_result(H, Acc), + flatten_command_result(T, Acc1). + to_list(CommandRun) -> - 'Elixir.Enum':to_list(CommandRun). + Lists = 'Elixir.Enum':to_list(CommandRun), + %% we can get results from different connections, so we flatten out + flatten_command_result(Lists, []). command_result_count(CommandRun) -> length(to_list(CommandRun)). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 6152dd267493..26d341f0b930 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -16,7 +16,6 @@ -module(rabbit_stream_SUITE). -% -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). @@ -34,7 +33,8 @@ all() -> groups() -> [{single_node, [], - [test_stream, + [sac_ff, %% must stay at the top, stream sac feature flag disabled for this one + test_stream, test_stream_tls, test_gc_consumers, test_gc_publishers, @@ -71,6 +71,26 @@ init_per_group(Group, Config) {rabbitmq_ct_tls_verify, verify_none}), Config3 = rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}), + %% stream sac feature flag disabled for the first test, + %% then enabled in the end_per_testcase function + ExtraSetupSteps = + case Group of + single_node -> + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{forced_feature_flags_on_init, + [classic_mirrored_queue_version, + implicit_default_bindings, + maintenance_mode_status, + user_limits, + virtual_host_metadata, + quorum_queue, + stream_queue]}]}) + end]; + _ -> + [] + end, rabbit_ct_helpers:run_setup_steps(Config3, [fun(StepConfig) -> rabbit_ct_helpers:merge_app_env(StepConfig, @@ -84,6 +104,7 @@ init_per_group(Group, Config) [{connection_negotiation_step_timeout, 500}]}) end] + ++ ExtraSetupSteps ++ rabbit_ct_broker_helpers:setup_steps()); init_per_group(cluster = Group, Config) -> Config1 = @@ -116,6 +137,13 @@ end_per_group(_, Config) -> init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(sac_ff, Config) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_feature_flags, + enable, + [stream_single_active_consumer]), + ok; end_per_testcase(_Test, _Config) -> ok. @@ -172,6 +200,7 @@ test_gc_consumers(Config) -> 0, 0, 0, + true, #{}]), ?awaitMatch(0, consumer_count(Config), ?WAIT), ok. @@ -262,6 +291,41 @@ timeout_close_sent(Config) -> % Now, rabbit_stream_reader is in state close_sent. ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). +sac_ff(Config) -> + Port = get_stream_port(Config), + {ok, S} = + gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), + C = rabbit_stream_core:init(0), + test_peer_properties(gen_tcp, S, C), + test_authenticate(gen_tcp, S, C), + Stream = <<"stream1">>, + test_create_stream(gen_tcp, S, Stream, C), + test_declare_publisher(gen_tcp, S, 1, Stream, C), + ?awaitMatch(#{publishers := 1}, get_global_counters(Config), ?WAIT), + Body = <<"hello">>, + test_publish_confirm(gen_tcp, S, 1, Body, C), + + SubscriptionId = 42, + SubCmd = + {request, 1, + {subscribe, + SubscriptionId, + Stream, + 0, + 10, + #{<<"single-active-consumer">> => <<"true">>, + <<"name">> => <<"foo">>}}}, + SubscribeFrame = rabbit_stream_core:frame(SubCmd), + ok = gen_tcp:send(S, SubscribeFrame), + {Cmd, C} = receive_commands(gen_tcp, S, C), + ?assertMatch({response, 1, + {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}}, + Cmd), + test_delete_stream(gen_tcp, S, Stream, C), + test_close(gen_tcp, S, C), + closed = wait_for_socket_close(gen_tcp, S, 10), + ok. + consumer_count(Config) -> ets_count(Config, ?TABLE_CONSUMER). @@ -449,14 +513,22 @@ test_publish_confirm(Transport, S, PublisherId, Body, C0) -> C. test_subscribe(Transport, S, SubscriptionId, Stream, C0) -> + test_subscribe(Transport, + S, + SubscriptionId, + Stream, + #{<<"random">> => <<"thing">>}, + C0). + +test_subscribe(Transport, + S, + SubscriptionId, + Stream, + SubscriptionProperties, + C0) -> SubCmd = {request, 1, - {subscribe, - SubscriptionId, - Stream, - 0, - 10, - #{<<"random">> => <<"thing">>}}}, + {subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}}, SubscribeFrame = rabbit_stream_core:frame(SubCmd), ok = Transport:send(S, SubscribeFrame), {Cmd, C} = receive_commands(Transport, S, C0), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 529d78220be5..0a5eb2b7d338 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). -compile(export_all). @@ -16,7 +17,8 @@ all() -> [{group, non_parallel_tests}]. groups() -> - [{non_parallel_tests, [], [manage_super_stream, lookup_leader]}]. + [{non_parallel_tests, [], + [manage_super_stream, lookup_leader, partition_index]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -123,6 +125,54 @@ manage_super_stream(Config) -> <<"invoices-2">>], [<<"0">>, <<"1">>, <<"2">>])), + ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), + ok. + +partition_index(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + [?assertEqual({ok, Index}, + partition_index(Config, <<"invoices">>, Stream)) + || {Index, Stream} + <- [{0, <<"invoices-0">>}, {1, <<"invoices-1">>}, + {2, <<"invoices-2">>}]], + + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + C = start_amqp_connection(Config), + {ok, Ch} = amqp_connection:open_channel(C), + + StreamsWithIndexes = + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>], + create_super_stream_topology(<<"invoices">>, StreamsWithIndexes, Ch), + + [?assertEqual({ok, Index}, + partition_index(Config, <<"invoices">>, Stream)) + || {Index, Stream} + <- [{0, <<"invoices-0">>}, {1, <<"invoices-1">>}, + {2, <<"invoices-2">>}]], + + delete_super_stream_topology(<<"invoices">>, StreamsWithIndexes, Ch), + + StreamsWithNoIndexes = + [<<"invoices-amer">>, <<"invoices-emea">>, <<"invoices-apac">>], + create_super_stream_topology(<<"invoices">>, StreamsWithNoIndexes, + Ch), + + [?assertEqual({ok, -1}, + partition_index(Config, <<"invoices">>, Stream)) + || Stream + <- [<<"invoices-amer">>, <<"invoices-emea">>, <<"invoices-apac">>]], + + delete_super_stream_topology(<<"invoices">>, StreamsWithNoIndexes, + Ch), + + amqp_connection:close(C), ok. create_super_stream(Config, Name, Partitions, RKs) -> @@ -178,3 +228,59 @@ route(Config, RoutingKey, SuperStream) -> rabbit_stream_manager, route, [RoutingKey, <<"/">>, SuperStream]). + +partition_index(Config, SuperStream, Stream) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + partition_index, + [<<"/">>, SuperStream, Stream]). + +start_amqp_connection(Config) -> + Port = + rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + {ok, Connection} = + amqp_connection:start(#amqp_params_network{port = Port}), + Connection. + +create_super_stream_topology(SuperStream, Streams, Ch) -> + ExchangeDeclare = + #'exchange.declare'{exchange = SuperStream, + type = <<"direct">>, + passive = false, + durable = true, + auto_delete = false, + internal = false, + nowait = false, + arguments = []}, + #'exchange.declare_ok'{} = amqp_channel:call(Ch, ExchangeDeclare), + + [begin + QueueDeclare = + #'queue.declare'{queue = S, + durable = true, + exclusive = false, + auto_delete = false, + arguments = + [{<<"x-queue-type">>, longstr, + <<"stream">>}]}, + #'queue.declare_ok'{} = amqp_channel:call(Ch, QueueDeclare), + Binding = + #'queue.bind'{queue = S, + exchange = SuperStream, + routing_key = S}, + #'queue.bind_ok'{} = amqp_channel:call(Ch, Binding) + end + || S <- Streams], + ok. + +delete_super_stream_topology(SuperStream, Streams, Ch) -> + DeleteExchange = #'exchange.delete'{exchange = SuperStream}, + #'exchange.delete_ok'{} = amqp_channel:call(Ch, DeleteExchange), + + [begin + DeleteQueue = #'queue.delete'{queue = S}, + #'queue.delete_ok'{} = amqp_channel:call(Ch, DeleteQueue) + end + || S <- Streams], + ok. diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index e552cc7a02d8..38ac963d2a5b 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -23,6 +23,7 @@ -define(COMMAND_HEARTBEAT, 23). -define(COMMAND_ROUTE, 24). -define(COMMAND_PARTITIONS, 25). +-define(COMMAND_CONSUMER_UPDATE, 26). -define(REQUEST, 0). -define(RESPONSE, 1). @@ -50,6 +51,7 @@ -define(RESPONSE_CODE_NO_OFFSET, 19). +-define(OFFSET_TYPE_NONE, 0). -define(OFFSET_TYPE_FIRST, 1). -define(OFFSET_TYPE_LAST, 2). -define(OFFSET_TYPE_NEXT, 3). @@ -96,6 +98,8 @@ offset, offset_lag, credits, + active, + activity_status, properties ]). @@ -109,4 +113,17 @@ messages_errored ]). +-define(CONSUMER_GROUP_INFO_ITEMS, [ + stream, + reference, + partition_index, + consumers + ]). + +-define(GROUP_CONSUMER_INFO_ITEMS, [ + subscription_id, + connection_name, + state +]). + -define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index d16b887df194..5ec5f73e1ce1 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -63,6 +63,7 @@ -type credit() :: non_neg_integer(). -type offset_ref() :: binary(). -type endpoint() :: {Host :: binary(), Port :: non_neg_integer()}. +-type active() :: boolean(). -type command() :: {publish, publisher_id(), @@ -98,7 +99,8 @@ {open, VirtualHost :: binary()} | {close, Code :: non_neg_integer(), Reason :: binary()} | {route, RoutingKey :: binary(), SuperStream :: binary()} | - {partitions, SuperStream :: binary()}} | + {partitions, SuperStream :: binary()} | + {consumer_update, subscription_id(), active()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -124,7 +126,8 @@ HeartBeat :: non_neg_integer()} | {credit, response_code(), subscription_id()} | {route, response_code(), stream_name()} | - {partitions, response_code(), [stream_name()]}} | + {partitions, response_code(), [stream_name()]} | + {consumer_update, response_code(), none | offset_spec()}} | {unknown, binary()}. -spec init(term()) -> state(). @@ -423,7 +426,24 @@ response_body({route = Tag, Code, Stream}) -> {command_id(Tag), <>}; response_body({partitions = Tag, Code, Streams}) -> StreamsBin = [<> || Stream <- Streams], - {command_id(Tag), [<>, StreamsBin]}. + {command_id(Tag), [<>, StreamsBin]}; +response_body({consumer_update = Tag, Code, OffsetSpec}) -> + OffsetSpecBin = + case OffsetSpec of + none -> + <>; + first -> + <>; + last -> + <>; + next -> + <>; + Offset when is_integer(Offset) -> + <>; + {timestamp, Ts} -> + <> + end, + {command_id(Tag), [<>]}. request_body({declare_publisher = Tag, PublisherId, @@ -510,7 +530,16 @@ request_body({close = Tag, Code, Reason}) -> request_body({route = Tag, RoutingKey, SuperStream}) -> {Tag, <>}; request_body({partitions = Tag, SuperStream}) -> - {Tag, <>}. + {Tag, <>}; +request_body({consumer_update = Tag, SubscriptionId, Active}) -> + ActiveBin = + case Active of + true -> + 1; + false -> + 0 + end, + {Tag, <>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -748,6 +777,20 @@ parse_request(<>) -> request(CorrelationId, {partitions, SuperStream}); +parse_request(<>) -> + Active = + case ActiveBin of + 0 -> + false; + 1 -> + true + end, + request(CorrelationId, {consumer_update, SubscriptionId, Active}); parse_request(Bin) -> {unknown, Bin}. @@ -823,7 +866,30 @@ parse_response_body(?COMMAND_ROUTE, parse_response_body(?COMMAND_PARTITIONS, <>) -> Partitions = list_of_strings(PartitionsBin), - {partitions, ResponseCode, Partitions}. + {partitions, ResponseCode, Partitions}; +parse_response_body(?COMMAND_CONSUMER_UPDATE, + <>) -> + OffsetSpec = offset_spec(OffsetType, OffsetValue), + {consumer_update, ResponseCode, OffsetSpec}. + +offset_spec(OffsetType, OffsetValueBin) -> + case OffsetType of + ?OFFSET_TYPE_NONE -> + none; + ?OFFSET_TYPE_FIRST -> + first; + ?OFFSET_TYPE_LAST -> + last; + ?OFFSET_TYPE_NEXT -> + next; + ?OFFSET_TYPE_OFFSET -> + <> = OffsetValueBin, + Offset; + ?OFFSET_TYPE_TIMESTAMP -> + <> = OffsetValueBin, + {timestamp, Timestamp} + end. request(Corr, Cmd) -> {request, Corr, Cmd}. @@ -941,7 +1007,9 @@ command_id(heartbeat) -> command_id(route) -> ?COMMAND_ROUTE; command_id(partitions) -> - ?COMMAND_PARTITIONS. + ?COMMAND_PARTITIONS; +command_id(consumer_update) -> + ?COMMAND_CONSUMER_UPDATE. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -992,7 +1060,9 @@ parse_command_id(?COMMAND_HEARTBEAT) -> parse_command_id(?COMMAND_ROUTE) -> route; parse_command_id(?COMMAND_PARTITIONS) -> - partitions. + partitions; +parse_command_id(?COMMAND_CONSUMER_UPDATE) -> + consumer_update. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index 74b0cc1d6091..40152279c8d1 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -103,6 +103,7 @@ roundtrip(_Config) -> test_roundtrip({request, 99, {close, 99, <<"reason">>}}), test_roundtrip({request, 99, {route, <<"rkey.*">>, <<"exchange">>}}), test_roundtrip({request, 99, {partitions, <<"super stream">>}}), + test_roundtrip({request, 99, {consumer_update, 1, true}}), %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) || Tag @@ -128,10 +129,10 @@ roundtrip(_Config) -> test_roundtrip({response, 0, {tune, 10000, 12345}}), % %% NB: does not write correlation id test_roundtrip({response, 0, {credit, 98, 200}}), - % %% TODO should route return a list of routed streams? test_roundtrip({response, 99, {route, 1, <<"stream_name">>}}), test_roundtrip({response, 99, {partitions, 1, [<<"stream1">>, <<"stream2">>]}}), + test_roundtrip({response, 99, {consumer_update, 1, none}}), ok. roundtrip_metadata(_Config) -> diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConsumersList.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConsumersList.ejs index a91c0a0f0d6c..f0219df857a0 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConsumersList.ejs +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConsumersList.ejs @@ -8,6 +8,9 @@ Offset Offset Lag Credits + Active + Activity status + Properties <% @@ -21,6 +24,9 @@ <%= consumer.offset %> <%= consumer.offset_lag %> <%= consumer.credits %> + <%= fmt_boolean(consumer.active, "●") %> + <%= fmt_activity_status(consumer.activity_status, "up") %> + <%= fmt_table_short(consumer.properties) %> <% } %> diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index 495c831b6ae2..f4448e6e234f 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -572,6 +572,8 @@ void consumers(Map subscriptionProperties) throws Exception { assertThat(((Number) consumer.get("consumed")).intValue()).isEqualTo(0); assertThat(((Number) consumer.get("offset")).intValue()).isEqualTo(0); assertThat(((Number) consumer.get("subscription_id")).intValue()).isEqualTo(0); + assertThat(consumer.get("active")).isEqualTo(true); + assertThat(consumer.get("activity_status")).isEqualTo("up"); assertThat(consumer.get("properties")).isNotNull().isEqualTo(subscriptionProperties); assertThat(connectionDetails(consumer))