Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d5ae62b
Handle single active consumer registration
acogoluegnes Nov 17, 2021
c47d83f
Handle SAC (simple) failover
acogoluegnes Nov 18, 2021
da3f11a
Unregister stream SAC consumers on connection closing
acogoluegnes Nov 18, 2021
d98b10f
Compute stream partition index
acogoluegnes Nov 22, 2021
e91009b
Notify stream consumer connection of SAC changes
acogoluegnes Nov 23, 2021
8665184
Refactor SAC coordinator for SAC in partition context
acogoluegnes Nov 24, 2021
3a05cf1
Remove unused function
acogoluegnes Nov 24, 2021
a607f84
Implement basic SAC rollover on stream partition
acogoluegnes Nov 24, 2021
11ef789
Fix stream SAC computation and notification
acogoluegnes Nov 25, 2021
2fe558a
Fix stream SAC computation and notification
acogoluegnes Nov 26, 2021
7059832
Fix stream consumer state change
acogoluegnes Dec 3, 2021
60529aa
Move SAC coordinator to stream coordinator
acogoluegnes Dec 9, 2021
037af8c
Use RA side effects to notify consumer connections
acogoluegnes Dec 14, 2021
4ff2c4a
Refactor stream SAC coordinator
acogoluegnes Dec 15, 2021
7e4ee5e
Re-arrange code in stream SAC coordinator
acogoluegnes Dec 15, 2021
e02aa98
Start to monitor connection PIDs in stream SAC coordinator
acogoluegnes Dec 16, 2021
bd4e440
Fix typo in bazel configuration file
acogoluegnes Dec 16, 2021
5e0089f
Comment out unused variable initialization
acogoluegnes Dec 16, 2021
f1876e2
Fix test
acogoluegnes Dec 16, 2021
eeefd7c
Monitor connection PIDs in stream SAC coordinator
acogoluegnes Jan 4, 2022
d1fea82
Use helpers in stream SAC coordinator test
acogoluegnes Jan 4, 2022
6b6953e
Add unit for "simple" SAC group
acogoluegnes Jan 5, 2022
29b4b3e
Add unit test for SAC in super stream partition
acogoluegnes Jan 5, 2022
de7f5e1
Do not emit consumer lag for inactive consumer
acogoluegnes Jan 20, 2022
1ec49a7
Add list_stream_consumer_groups CLI command
acogoluegnes Jan 27, 2022
e547972
Fix target name in bazel configuration
acogoluegnes Jan 27, 2022
c8fea3a
Indent
acogoluegnes Jan 27, 2022
434d7b5
Add colums argument to list_stream_consumer_groups
acogoluegnes Jan 27, 2022
fb6481c
Add ignore_xref entries
acogoluegnes Jan 27, 2022
5ad9e34
Add list_stream_group_consumers CLI command
acogoluegnes Jan 28, 2022
0623192
Handle "active" field for stream consumers
acogoluegnes Jan 28, 2022
bee4fca
Fix stream SAC coordinator unit test
acogoluegnes Jan 31, 2022
0806fa0
Add stream single active consumer feature flag
acogoluegnes Feb 1, 2022
cd22bef
Use feature flag public API
acogoluegnes Feb 1, 2022
ad29176
Add active and activity_status flag to stream consumer metrics
acogoluegnes Feb 2, 2022
b1d0277
Check active and activity_status metrics field
acogoluegnes Feb 2, 2022
a1e92b8
Add active and activity_status to list_stream_consumers
acogoluegnes Feb 2, 2022
073c96d
Tolerate unknown fields in CLI command requests
acogoluegnes Feb 2, 2022
827fe6f
Add active and activity_status in stream consumer UI list
acogoluegnes Feb 3, 2022
adba97f
Do not dispatch on credit request is consumer is inactive
acogoluegnes Feb 7, 2022
8406e01
Bump stream coordinator machine version to 3
acogoluegnes Mar 23, 2022
4cb814d
Check consumer is actually a SAC on SAC event
acogoluegnes Mar 28, 2022
ee79237
Return error if SAC has no name
acogoluegnes Mar 28, 2022
7415abe
Log message if message update response code is not OK
acogoluegnes Mar 28, 2022
da8c23d
Adapt clause to send_chunk new return type
acogoluegnes Mar 30, 2022
67126a8
Use more specific type
ansd Mar 31, 2022
f4e2a95
Address code review comments for stream SAC
acogoluegnes Apr 1, 2022
8ccdd7b
Move SAC API to SAC coordinator
acogoluegnes Apr 1, 2022
68e8ae8
Rename function for clarity
acogoluegnes Apr 7, 2022
85b0625
Address code review comments for stream SAC
acogoluegnes Apr 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,12 @@ suites = [
"//deps/rabbit_common:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_stream_sac_coordinator_SUITE",
deps = [
"//deps/rabbit_common:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
name = "rabbit_stream_queue_SUITE",
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/rebar.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{plugins, [rebar3_format]}.

{format, [
{files, ["src/rabbit_stream_sac_coordinator.*",
"test/rabbit_stream_sac_coordinator_SUITE.erl"]},
{formatter, default_formatter},
{options, #{
paper => 80,
ribbon => 70,
inline_attributes => {when_under, 1},
inline_items => {when_under, 4}
}}
]}.
21 changes: 20 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
implicit_default_bindings_migration/3,
virtual_host_metadata_migration/3,
maintenance_mode_status_migration/3,
user_limits_migration/3]).
user_limits_migration/3,
stream_single_active_consumer_migration/3]).

-rabbit_feature_flag(
{classic_mirrored_queue_version,
Expand Down Expand Up @@ -68,6 +69,15 @@
migration_fun => {?MODULE, user_limits_migration}
}}).

-rabbit_feature_flag(
{stream_single_active_consumer,
#{desc => "Single active consumer for streams",
doc_url => "https://www.rabbitmq.com/stream.html",
stability => stable,
depends_on => [stream_queue],
migration_fun => {?MODULE, stream_single_active_consumer_migration}
}}).

classic_mirrored_queue_version_migration(_FeatureName, _FeatureProps, _Enable) ->
ok.

Expand Down Expand Up @@ -188,3 +198,12 @@ user_limits_migration(_FeatureName, _FeatureProps, enable) ->
end;
user_limits_migration(_FeatureName, _FeatureProps, is_enabled) ->
mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2).

%% -------------------------------------------------------------------
%% Stream single active consumer.
%% -------------------------------------------------------------------

stream_single_active_consumer_migration(_FeatureName, _FeatureProps, enable) ->
ok;
stream_single_active_consumer_migration(_FeatureName, _FeatureProps, is_enabled) ->
undefined.
32 changes: 27 additions & 5 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
-export([log_overview/1]).
-export([replay/1]).

%% for SAC coordinator
-export([process_command/1,
sac_state/1]).

%% for testing and debugging
-export([eval_listeners/3,
state/0]).
Expand Down Expand Up @@ -88,6 +92,7 @@
{member_stopped, stream_id(), args()} |
{retention_updated, stream_id(), args()} |
{mnesia_updated, stream_id(), args()} |
{sac, rabbit_stream_sac_coordinator:command()} |
ra_machine:effect().

-export_type([command/0]).
Expand Down Expand Up @@ -171,6 +176,9 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
process_command({policy_changed, StreamId, #{queue => Q}}).

sac_state(#?MODULE{single_active_consumer = SacState}) ->
SacState.

%% for debugging
state() ->
case ra:local_query({?MODULE, node()}, fun(State) -> State end) of
Expand Down Expand Up @@ -338,15 +346,15 @@ all_coord_members() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
[{?MODULE, Node} || Node <- [node() | Nodes]].

version() -> 2.
version() -> 3.

which_module(_) ->
?MODULE.

init(_Conf) ->
#?MODULE{}.
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}.

-spec apply(map(), command(), state()) ->
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), term(), ra_machine:effects()}.
apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
{_CmdTag, StreamId, #{}} = Cmd,
Expand Down Expand Up @@ -380,11 +388,18 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
Reply ->
return(Meta, State0, Reply, [])
end;
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
monitors = Monitors0} = State0) ->
{SacState1, Reply, Effects0} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0),
{SacState2, Monitors1, Effects1} =
rabbit_stream_sac_coordinator:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
monitors = Monitors1}, Reply, Effects1);
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0,
listeners = StateListeners0} = State) ->

listeners = StateListeners0,
single_active_consumer = SacState0 } = State) ->
Effects0 = case Reason of
noconnection ->
[{monitor, node, node(Pid)}];
Expand Down Expand Up @@ -438,6 +453,10 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
return(Meta, State#?MODULE{streams = Streams0,
monitors = Monitors1}, ok, Effects0)
end;
{sac, Monitors1} ->
{SacState1, Effects} = rabbit_stream_sac_coordinator:handle_connection_down(Pid, SacState0),
return(Meta, State#?MODULE{single_active_consumer = SacState1,
monitors = Monitors1}, ok, Effects);
error ->
return(Meta, State, ok, Effects0)
end;
Expand Down Expand Up @@ -1784,6 +1803,9 @@ machine_version(1, 2, State = #?MODULE{streams = Streams0,
{State#?MODULE{streams = Streams1,
monitors = Monitors2,
listeners = undefined}, Effects};
machine_version(2, 3, State) ->
rabbit_log:info("Stream coordinator machine version changes from 2 to 3, updating state."),
{State#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}, []};
machine_version(From, To, State) ->
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.",
[From, To]),
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_stream_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@
-record(rabbit_stream_coordinator, {streams = #{} :: #{stream_id() => #stream{}},
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
#{stream_id() => ok}, %% v2
monitor_role()}},
monitor_role()} |
sac},
%% not used as of v2
listeners = #{} :: undefined | #{stream_id() =>
#{pid() := queue_ref()}},
single_active_consumer = undefined :: undefined | rabbit_stream_sac_coordinator:state(),
%% future extensibility
reserved_1,
reserved_2}).
Loading