diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel index e48a15fb0ad9..9d7d3e88e43b 100644 --- a/deps/rabbitmq_mqtt/BUILD.bazel +++ b/deps/rabbitmq_mqtt/BUILD.bazel @@ -84,7 +84,6 @@ rabbitmq_app( "//deps/amqp10_common:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@ra//:erlang_app", "@ranch//:erlang_app", ], ) @@ -178,16 +177,6 @@ rabbitmq_integration_suite( name = "config_schema_SUITE", ) -rabbitmq_integration_suite( - name = "ff_SUITE", - additional_beam = [ - ":test_util_beam", - ], - runtime_deps = [ - "@emqtt//:erlang_app", - ], -) - rabbitmq_integration_suite( name = "java_SUITE", additional_beam = [ @@ -197,11 +186,6 @@ rabbitmq_integration_suite( sharding_method = "group", ) -rabbitmq_suite( - name = "mqtt_machine_SUITE", - size = "small", -) - rabbitmq_suite( name = "processor_SUITE", size = "small", diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 4db52834f61d..eb1d6b657356 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -44,7 +44,7 @@ BUILD_WITHOUT_QUIC=1 export BUILD_WITHOUT_QUIC LOCAL_DEPS = ssl -DEPS = ranch rabbit_common rabbit ra amqp10_common +DEPS = ranch rabbit_common rabbit amqp10_common TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream PLT_APPS += rabbitmqctl elixir diff --git a/deps/rabbitmq_mqtt/app.bzl b/deps/rabbitmq_mqtt/app.bzl index 44db6fc9d2e1..87d17a12e46d 100644 --- a/deps/rabbitmq_mqtt/app.bzl +++ b/deps/rabbitmq_mqtt/app.bzl @@ -17,14 +17,9 @@ def all_beam_files(name = "all_beam_files"): erlang_bytecode( name = "other_beam", srcs = [ - "src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl", "src/mc_mqtt.erl", - "src/mqtt_machine.erl", - "src/mqtt_machine_v0.erl", - "src/mqtt_node.erl", "src/rabbit_mqtt.erl", - "src/rabbit_mqtt_collector.erl", "src/rabbit_mqtt_confirms.erl", "src/rabbit_mqtt_ff.erl", "src/rabbit_mqtt_internal_event_handler.erl", @@ -46,7 +41,7 @@ def all_beam_files(name = "all_beam_files"): beam = [":behaviours"], dest = "ebin", erlc_opts = "//:erlc_opts", - deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", "@ra//:erlang_app", "@ranch//:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", "@ranch//:erlang_app"], ) def all_test_beam_files(name = "all_test_beam_files"): @@ -68,14 +63,9 @@ def all_test_beam_files(name = "all_test_beam_files"): name = "test_other_beam", testonly = True, srcs = [ - "src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl", "src/mc_mqtt.erl", - "src/mqtt_machine.erl", - "src/mqtt_machine_v0.erl", - "src/mqtt_node.erl", "src/rabbit_mqtt.erl", - "src/rabbit_mqtt_collector.erl", "src/rabbit_mqtt_confirms.erl", "src/rabbit_mqtt_ff.erl", "src/rabbit_mqtt_internal_event_handler.erl", @@ -102,7 +92,6 @@ def all_test_beam_files(name = "all_test_beam_files"): "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", - "@ra//:erlang_app", "@ranch//:erlang_app", ], ) @@ -127,14 +116,9 @@ def all_srcs(name = "all_srcs"): filegroup( name = "srcs", srcs = [ - "src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl", "src/mc_mqtt.erl", - "src/mqtt_machine.erl", - "src/mqtt_machine_v0.erl", - "src/mqtt_node.erl", "src/rabbit_mqtt.erl", - "src/rabbit_mqtt_collector.erl", "src/rabbit_mqtt_confirms.erl", "src/rabbit_mqtt_ff.erl", "src/rabbit_mqtt_internal_event_handler.erl", @@ -156,8 +140,6 @@ def all_srcs(name = "all_srcs"): filegroup( name = "public_hdrs", srcs = [ - "include/mqtt_machine.hrl", - "include/mqtt_machine_v0.hrl", "include/rabbit_mqtt.hrl", "include/rabbit_mqtt_packet.hrl", ], @@ -213,15 +195,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbitmq_mqtt", erlc_opts = "//:test_erlc_opts", ) - erlang_bytecode( - name = "mqtt_machine_SUITE_beam_files", - testonly = True, - srcs = ["test/mqtt_machine_SUITE.erl"], - outs = ["test/mqtt_machine_SUITE.beam"], - hdrs = ["include/mqtt_machine.hrl"], - app_name = "rabbitmq_mqtt", - erlc_opts = "//:test_erlc_opts", - ) + erlang_bytecode( name = "processor_SUITE_beam_files", testonly = True, @@ -280,14 +254,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbitmq_mqtt", erlc_opts = "//:test_erlc_opts", ) - erlang_bytecode( - name = "ff_SUITE_beam_files", - testonly = True, - srcs = ["test/ff_SUITE.erl"], - outs = ["test/ff_SUITE.beam"], - app_name = "rabbitmq_mqtt", - erlc_opts = "//:test_erlc_opts", - ) + erlang_bytecode( name = "shared_SUITE_beam_files", testonly = True, diff --git a/deps/rabbitmq_mqtt/include/mqtt_machine.hrl b/deps/rabbitmq_mqtt/include/mqtt_machine.hrl deleted file mode 100644 index 2d69c35c71bf..000000000000 --- a/deps/rabbitmq_mqtt/include/mqtt_machine.hrl +++ /dev/null @@ -1,25 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - -%% A client ID that is tracked in Ra is a list of bytes -%% as returned by binary_to_list/1 in -%% https://github.com/rabbitmq/rabbitmq-server/blob/48467d6e1283b8d81e52cfd49c06ea4eaa31617d/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl#L137 -%% prior to 3.12.0. -%% This has two downsides: -%% 1. Lists consume more memory than binaries (when tracking many clients). -%% 2. This violates the MQTT spec which states -%% "The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1] -%% However, for backwards compatibility, we leave the client ID as a list of bytes in the Ra machine state because -%% feature flag delete_ra_cluster_mqtt_node introduced in 3.12.0 will delete the Ra cluster anyway. --type client_id_ra() :: [byte()]. - --record(machine_state, { - client_ids = #{} :: #{client_id_ra() => Connection :: pid()}, - pids = #{} :: #{Connection :: pid() => [client_id_ra(), ...]}, - %% add acouple of fields for future extensibility - reserved_1, - reserved_2}). diff --git a/deps/rabbitmq_mqtt/include/mqtt_machine_v0.hrl b/deps/rabbitmq_mqtt/include/mqtt_machine_v0.hrl deleted file mode 100644 index 2c332e8a4344..000000000000 --- a/deps/rabbitmq_mqtt/include/mqtt_machine_v0.hrl +++ /dev/null @@ -1,8 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --record(machine_state, {client_ids = #{}}). diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl deleted file mode 100644 index 5bfb30523176..000000000000 --- a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl +++ /dev/null @@ -1,67 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - --module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand'). - --include("rabbit_mqtt.hrl"). - --behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). - --export([scopes/0, - switches/0, - aliases/0, - usage/0, - usage_doc_guides/0, - banner/2, - validate/2, - merge_defaults/2, - run/2, - output/2, - description/0, - help_section/0]). - -scopes() -> [ctl]. -switches() -> []. -aliases() -> []. - -description() -> <<"Removes cluster member and permanently deletes its cluster-wide MQTT state">>. - -help_section() -> - {plugin, mqtt}. - -validate([], _Opts) -> - {validation_failure, not_enough_args}; -validate([_, _ | _], _Opts) -> - {validation_failure, too_many_args}; -validate([_], _) -> - ok. - -merge_defaults(Args, Opts) -> - {Args, Opts}. - -usage() -> - <<"decommission_mqtt_node ">>. - -usage_doc_guides() -> - [?MQTT_GUIDE_URL]. - -run([Node], #{node := NodeName, - timeout := Timeout}) -> - case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of - {badrpc, _} = Error -> - Error; - nodedown -> - {ok, list_to_binary(io_lib:format("Node ~ts is down but has been successfully removed" - " from the cluster", [Node]))}; - Result -> - %% 'ok' or 'timeout' - Result - end. - -banner([Node], _) -> list_to_binary(io_lib:format("Removing node ~ts from the list of MQTT nodes...", [Node])). - -output(Result, _Opts) -> - 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl deleted file mode 100644 index 50b6f994a38e..000000000000 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ /dev/null @@ -1,201 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% --module(mqtt_machine). --behaviour(ra_machine). - --include("mqtt_machine.hrl"). - --export([version/0, - which_module/1, - init/1, - apply/3, - state_enter/2, - notify_connection/2, - overview/1]). - --type state() :: #machine_state{}. - --type config() :: map(). - --type reply() :: {ok, term()} | {error, term()}. - --type command() :: {register, client_id_ra(), pid()} | - {unregister, client_id_ra(), pid()} | - list. -version() -> 1. - -which_module(1) -> ?MODULE; -which_module(0) -> mqtt_machine_v0. - --spec init(config()) -> state(). -init(_Conf) -> - #machine_state{}. - --spec apply(map(), command(), state()) -> - {state(), reply(), ra_machine:effects()}. -apply(_Meta, {register, ClientId, Pid}, - #machine_state{client_ids = Ids, - pids = Pids0} = State0) -> - {Effects, Ids1, Pids} = - case maps:find(ClientId, Ids) of - {ok, OldPid} when Pid =/= OldPid -> - Effects0 = [{demonitor, process, OldPid}, - {monitor, process, Pid}, - {mod_call, ?MODULE, notify_connection, - [OldPid, duplicate_id]}], - Pids2 = case maps:take(OldPid, Pids0) of - error -> - Pids0; - {[ClientId], Pids1} -> - Pids1; - {ClientIds, Pids1} -> - Pids1#{ClientId => lists:delete(ClientId, ClientIds)} - end, - Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, - [ClientId], Pids2), - {Effects0, maps:remove(ClientId, Ids), Pids3}; - - {ok, Pid} -> - {[], Ids, Pids0}; - error -> - Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, - [ClientId], Pids0), - Effects0 = [{monitor, process, Pid}], - {Effects0, Ids, Pids1} - end, - State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1), - pids = Pids}, - {State, ok, Effects}; - -apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids, - pids = Pids0} = State0) -> - State = case maps:find(ClientId, Ids) of - {ok, Pid} -> - Pids = case maps:get(Pid, Pids0, undefined) of - undefined -> - Pids0; - [ClientId] -> - maps:remove(Pid, Pids0); - Cids -> - Pids0#{Pid => lists:delete(ClientId, Cids)} - end, - - State0#machine_state{client_ids = maps:remove(ClientId, Ids), - pids = Pids}; - %% don't delete client id that might belong to a newer connection - %% that kicked the one with Pid out - {ok, _AnotherPid} -> - State0; - error -> - State0 - end, - Effects0 = [{demonitor, process, Pid}], - %% snapshot only when the map has changed - Effects = case State of - State0 -> Effects0; - _ -> Effects0 ++ snapshot_effects(Meta, State) - end, - {State, ok, Effects}; - -apply(_Meta, {down, DownPid, noconnection}, State) -> - %% Monitor the node the pid is on (see {nodeup, Node} below) - %% so that we can detect when the node is re-connected and discover the - %% actual fate of the connection processes on it - Effect = {monitor, node, node(DownPid)}, - {State, ok, Effect}; - -apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids, - pids = Pids0} = State0) -> - case maps:get(DownPid, Pids0, undefined) of - undefined -> - {State0, ok, []}; - ClientIds -> - Ids1 = maps:without(ClientIds, Ids), - State = State0#machine_state{client_ids = Ids1, - pids = maps:remove(DownPid, Pids0)}, - Effects = lists:map(fun(Id) -> - [{mod_call, rabbit_log, debug, - ["MQTT connection with client id '~ts' failed", [Id]]}] - end, ClientIds), - {State, ok, Effects ++ snapshot_effects(Meta, State)} - end; - -apply(_Meta, {nodeup, Node}, State) -> - %% Work out if any pids that were disconnected are still - %% alive. - %% Re-request the monitor for the pids on the now-back node. - Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node], - {State, ok, Effects}; -apply(_Meta, {nodedown, _Node}, State) -> - {State, ok}; - -apply(Meta, {leave, Node}, #machine_state{client_ids = Ids, - pids = Pids0} = State0) -> - {Keep, Remove} = maps:fold( - fun (ClientId, Pid, {In, Out}) -> - case node(Pid) =/= Node of - true -> - {In#{ClientId => Pid}, Out}; - false -> - {In, Out#{ClientId => Pid}} - end - end, {#{}, #{}}, Ids), - Effects = maps:fold(fun (ClientId, _Pid, Acc) -> - Pid = maps:get(ClientId, Ids), - [ - {demonitor, process, Pid}, - {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]}, - {mod_call, rabbit_log, debug, - ["MQTT will remove client ID '~ts' from known " - "as its node has been decommissioned", [ClientId]]} - ] ++ Acc - end, [], Remove), - - State = State0#machine_state{client_ids = Keep, - pids = maps:without(maps:values(Remove), Pids0)}, - {State, ok, Effects ++ snapshot_effects(Meta, State)}; -apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) -> - Pids = maps:fold( - fun(Id, Pid, Acc) -> - maps:update_with(Pid, - fun(CIds) -> [Id | CIds] end, - [Id], Acc) - end, #{}, Ids), - {#machine_state{client_ids = Ids, - pids = Pids}, ok, []}; -apply(_Meta, Unknown, State) -> - logger:error("MQTT Raft state machine v1 received unknown command ~tp", [Unknown]), - {State, {error, {unknown_command, Unknown}}, []}. - --spec state_enter(ra_server:ra_state() | eol, state()) -> - ra_machine:effects(). -state_enter(leader, State) -> - %% re-request monitors for all known pids, this would clean up - %% records for all connections are no longer around, e.g. right after node restart - [{monitor, process, Pid} || Pid <- all_pids(State)]; -state_enter(_, _) -> - []. - --spec overview(state()) -> map(). -overview(#machine_state{client_ids = ClientIds, - pids = Pids}) -> - #{num_client_ids => maps:size(ClientIds), - num_pids => maps:size(Pids)}. - -%% ========================== - -%% Avoids blocking the Raft leader. --spec notify_connection(pid(), duplicate_id | decommission_node) -> pid(). -notify_connection(Pid, Reason) -> - spawn(fun() -> gen_server2:cast(Pid, Reason) end). - --spec snapshot_effects(map(), state()) -> ra_machine:effects(). -snapshot_effects(#{index := RaftIdx}, State) -> - [{release_cursor, RaftIdx, State}]. - -all_pids(#machine_state{client_ids = Ids}) -> - maps:values(Ids). diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl b/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl deleted file mode 100644 index d522b7f30323..000000000000 --- a/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl +++ /dev/null @@ -1,137 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% --module(mqtt_machine_v0). --behaviour(ra_machine). - --include("mqtt_machine_v0.hrl"). - --export([init/1, - apply/3, - state_enter/2, - notify_connection/2]). - --type state() :: #machine_state{}. - --type config() :: map(). - --type reply() :: {ok, term()} | {error, term()}. --type client_id_ra() :: term(). - --type command() :: {register, client_id_ra(), pid()} | - {unregister, client_id_ra(), pid()} | - list. - --spec init(config()) -> state(). -init(_Conf) -> - #machine_state{}. - --spec apply(map(), command(), state()) -> - {state(), reply(), ra_machine:effects()}. -apply(_Meta, {register, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) -> - {Effects, Ids1} = - case maps:find(ClientId, Ids) of - {ok, OldPid} when Pid =/= OldPid -> - Effects0 = [{demonitor, process, OldPid}, - {monitor, process, Pid}, - {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}], - {Effects0, maps:remove(ClientId, Ids)}; - _ -> - Effects0 = [{monitor, process, Pid}], - {Effects0, Ids} - end, - State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1)}, - {State, ok, Effects}; - -apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) -> - State = case maps:find(ClientId, Ids) of - {ok, Pid} -> State0#machine_state{client_ids = maps:remove(ClientId, Ids)}; - %% don't delete client id that might belong to a newer connection - %% that kicked the one with Pid out - {ok, _AnotherPid} -> State0; - error -> State0 - end, - Effects0 = [{demonitor, process, Pid}], - %% snapshot only when the map has changed - Effects = case State of - State0 -> Effects0; - _ -> Effects0 ++ snapshot_effects(Meta, State) - end, - {State, ok, Effects}; - -apply(_Meta, {down, DownPid, noconnection}, State) -> - %% Monitor the node the pid is on (see {nodeup, Node} below) - %% so that we can detect when the node is re-connected and discover the - %% actual fate of the connection processes on it - Effect = {monitor, node, node(DownPid)}, - {State, ok, Effect}; - -apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) -> - Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid -> - false; - (_, _) -> - true - end, Ids), - State = State0#machine_state{client_ids = Ids1}, - Delta = maps:keys(Ids) -- maps:keys(Ids1), - Effects = lists:map(fun(Id) -> - [{mod_call, rabbit_log, debug, - ["MQTT connection with client id '~ts' failed", [Id]]}] end, Delta), - {State, ok, Effects ++ snapshot_effects(Meta, State)}; - -apply(_Meta, {nodeup, Node}, State) -> - %% Work out if any pids that were disconnected are still - %% alive. - %% Re-request the monitor for the pids on the now-back node. - Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node], - {State, ok, Effects}; -apply(_Meta, {nodedown, _Node}, State) -> - {State, ok}; - -apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) -> - Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids), - Delta = maps:keys(Ids) -- maps:keys(Ids1), - - Effects = lists:foldl(fun (ClientId, Acc) -> - Pid = maps:get(ClientId, Ids), - [ - {demonitor, process, Pid}, - {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]}, - {mod_call, rabbit_log, debug, - ["MQTT will remove client ID '~ts' from known " - "as its node has been decommissioned", [ClientId]]} - ] ++ Acc - end, [], Delta), - - State = State0#machine_state{client_ids = Ids1}, - {State, ok, Effects ++ snapshot_effects(Meta, State)}; - -apply(_Meta, Unknown, State) -> - logger:error("MQTT Raft state machine received an unknown command ~tp", [Unknown]), - {State, {error, {unknown_command, Unknown}}, []}. - --spec state_enter(ra_server:ra_state(), state()) -> - ra_machine:effects(). -state_enter(leader, State) -> - %% re-request monitors for all known pids, this would clean up - %% records for all connections are no longer around, e.g. right after node restart - [{monitor, process, Pid} || Pid <- all_pids(State)]; -state_enter(_, _) -> - []. - -%% ========================== - -%% Avoids blocking the Raft leader. --spec notify_connection(pid(), duplicate_id | decommission_node) -> pid(). -notify_connection(Pid, Reason) -> - spawn(fun() -> gen_server2:cast(Pid, Reason) end). - --spec snapshot_effects(map(), state()) -> ra_machine:effects(). -snapshot_effects(#{index := RaftIdx}, State) -> - [{release_cursor, RaftIdx, State}]. - -all_pids(#machine_state{client_ids = Ids}) -> - maps:values(Ids). diff --git a/deps/rabbitmq_mqtt/src/mqtt_node.erl b/deps/rabbitmq_mqtt/src/mqtt_node.erl deleted file mode 100644 index 75f049261bcc..000000000000 --- a/deps/rabbitmq_mqtt/src/mqtt_node.erl +++ /dev/null @@ -1,174 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% --module(mqtt_node). - --export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0, - delete/1]). - --define(ID_NAME, mqtt_node). --define(START_TIMEOUT, 100_000). --define(RETRY_INTERVAL, 5000). --define(RA_OPERATION_TIMEOUT, 60_000). --define(RA_SYSTEM, coordination). - -node_id() -> - server_id(node()). - -server_id() -> - server_id(node()). - -server_id(Node) -> - {?ID_NAME, Node}. - -all_node_ids() -> - [server_id(N) || N <- rabbit_nodes:list_members(), - can_participate_in_clientid_tracking(N)]. - -start() -> - %% 3s to 6s randomized - Repetitions = rand:uniform(10) + 10, - start(300, Repetitions). - -start(_Delay, AttemptsLeft) when AttemptsLeft =< 0 -> - ok = start_server(), - trigger_election(); -start(Delay, AttemptsLeft) -> - NodeId = server_id(), - Nodes = compatible_peer_servers(), - case ra_directory:uid_of(?RA_SYSTEM, ?ID_NAME) of - undefined -> - case Nodes of - [] -> - %% Since cluster members are not known ahead of time and initial boot can be happening in parallel, - %% we wait and check a few times (up to a few seconds) to see if we can discover any peers to - %% join before forming a cluster. This reduces the probability of N independent clusters being - %% formed in the common scenario of N nodes booting in parallel e.g. because they were started - %% at the same time by a deployment tool. - %% - %% This scenario does not guarantee single cluster formation but without knowing the list of members - %% ahead of time, this is a best effort workaround. Multi-node consensus is apparently hard - %% to achieve without having consensus around expected cluster members. - rabbit_log:info("MQTT: will wait for ~tp more ms for cluster members to join before triggering a Raft leader election", [Delay]), - timer:sleep(Delay), - start(Delay, AttemptsLeft - 1); - Peers -> - %% Trigger an election. - %% This is required when we start a node for the first time. - %% Using default timeout because it supposed to reply fast. - rabbit_log:info("MQTT: discovered cluster peers that support client ID tracking: ~p", [Peers]), - ok = start_server(), - _ = join_peers(NodeId, Peers), - ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT) - end; - _ -> - _ = join_peers(NodeId, Nodes), - ok = ra:restart_server(?RA_SYSTEM, NodeId), - ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT) - end. - -compatible_peer_servers() -> - all_node_ids() -- [(node_id())]. - -start_server() -> - NodeId = node_id(), - Nodes = compatible_peer_servers(), - UId = ra:new_uid(ra_lib:to_binary(?ID_NAME)), - Timeout = application:get_env(kernel, net_ticktime, 60) + 5, - Conf = #{cluster_name => ?ID_NAME, - id => NodeId, - uid => UId, - friendly_name => atom_to_list(?ID_NAME), - initial_members => Nodes, - log_init_args => #{uid => UId}, - tick_timeout => Timeout, - machine => {module, mqtt_machine, #{}} - }, - rabbit_log:info("MQTT: starting Ra server with initial members: ~p", [Nodes]), - ra:start_server(?RA_SYSTEM, Conf). - -trigger_election() -> - ra:trigger_election(server_id(), ?RA_OPERATION_TIMEOUT). - -join_peers(_NodeId, []) -> - ok; -join_peers(NodeId, Nodes) -> - join_peers(NodeId, Nodes, 100). - -join_peers(_NodeId, _Nodes, RetriesLeft) when RetriesLeft =:= 0 -> - rabbit_log:error("MQTT: exhausted all attempts while trying to rejoin cluster peers"); -join_peers(NodeId, Nodes, RetriesLeft) -> - case ra:members(Nodes, ?START_TIMEOUT) of - {ok, Members, _} -> - case lists:member(NodeId, Members) of - true -> ok; - false -> ra:add_member(Members, NodeId) - end; - {timeout, _} -> - rabbit_log:debug("MQTT: timed out contacting cluster peers, %s retries left", [RetriesLeft]), - timer:sleep(?RETRY_INTERVAL), - join_peers(NodeId, Nodes, RetriesLeft - 1); - Err -> - Err - end. - --spec leave(node()) -> 'ok' | 'timeout' | 'nodedown'. -leave(Node) -> - NodeId = server_id(), - ToLeave = server_id(Node), - try - ra:leave_and_delete_server(?RA_SYSTEM, NodeId, ToLeave) - catch - exit:{{nodedown, Node}, _} -> - nodedown - end. - -can_participate_in_clientid_tracking(Node) -> - case rpc:call(Node, mqtt_machine, module_info, []) of - {badrpc, _} -> false; - _ -> true - end. - --spec delete(Args) -> Ret when - Args :: rabbit_feature_flags:enable_callback_args(), - Ret :: rabbit_feature_flags:enable_callback_ret(). -delete(_) -> - RaNodes = all_node_ids(), - Nodes = lists:map(fun({_, N}) -> N end, RaNodes), - LockId = {?ID_NAME, node_id()}, - rabbit_log:info("Trying to acquire lock ~p on nodes ~p ...", [LockId, Nodes]), - true = global:set_lock(LockId, Nodes), - rabbit_log:info("Acquired lock ~p", [LockId]), - try whereis(?ID_NAME) of - undefined -> - rabbit_log:info("Local Ra process ~s does not exist", [?ID_NAME]), - ok; - _ -> - rabbit_log:info("Deleting Ra cluster ~s ...", [?ID_NAME]), - try ra:delete_cluster(RaNodes, 15_000) of - {ok, _Leader} -> - rabbit_log:info("Successfully deleted Ra cluster ~s", [?ID_NAME]), - ok; - {error, Reason} -> - rabbit_log:info("Failed to delete Ra cluster ~s: ~p", [?ID_NAME, Reason]), - ServerId = server_id(), - case ra:force_delete_server(?RA_SYSTEM, ServerId) of - ok -> - rabbit_log:info("Successfully force deleted Ra server ~p", [ServerId]), - ok; - Error -> - rabbit_log:error("Failed to force delete Ra server ~p: ~p", - [ServerId, Error]), - {error, Error} - end - catch exit:{{shutdown, delete}, _StackTrace} -> - rabbit_log:info("Ra cluster ~s already being deleted", [?ID_NAME]), - ok - end - after - true = global:del_lock(LockId, Nodes), - rabbit_log:info("Released lock ~p", [LockId]) - end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 86cf09d149b4..c5ea59abedea 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -26,12 +26,6 @@ start(normal, []) -> persist_static_configuration(), {ok, Listeners} = application:get_env(tcp_listeners), {ok, SslListeners} = application:get_env(ssl_listeners), - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - ok = mqtt_node:start(); - false -> - ok - end, Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []), EMPid = case rabbit_event:start_link() of {ok, Pid} -> Pid; @@ -45,32 +39,19 @@ stop(_) -> -spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term(). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - %% Ra tracks connections cluster-wide. - AllPids = rabbit_mqtt_collector:list_pids(), - emit_connection_info(Items, Ref, AggregatorPid, AllPids), - %% Our node already emitted infos for all connections. Therefore, for the - %% remaining nodes, we send back 'finished' so that the CLI does not time out. - [AggregatorPid ! {Ref, finished} || _ <- lists:seq(1, length(Nodes) - 1)]; - false -> - Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local, - [Items, Ref, AggregatorPid]) - || Node <- Nodes], - rabbit_control_misc:await_emitters_termination(Pids) - end. + Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local, + [Items, Ref, AggregatorPid]) + || Node <- Nodes], + rabbit_control_misc:await_emitters_termination(Pids). -spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok. emit_connection_info_local(Items, Ref, AggregatorPid) -> LocalPids = list_local_mqtt_connections(), - emit_connection_info(Items, Ref, AggregatorPid, LocalPids). - -emit_connection_info(Items, Ref, AggregatorPid, Pids) -> rabbit_control_misc:emitting_map_with_exit_handler( AggregatorPid, Ref, fun(Pid) -> rabbit_mqtt_reader:info(Pid, Items) - end, Pids). + end, LocalPids). -spec close_local_client_connections(atom()) -> {'ok', non_neg_integer()}. close_local_client_connections(Reason) -> @@ -82,16 +63,10 @@ close_local_client_connections(Reason) -> -spec local_connection_pids() -> [pid()]. local_connection_pids() -> - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - AllPids = rabbit_mqtt_collector:list_pids(), - lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids); - false -> - PgScope = persistent_term:get(?PG_SCOPE), - lists:flatmap(fun(Group) -> - pg:get_local_members(PgScope, Group) - end, pg:which_groups(PgScope)) - end. + PgScope = persistent_term:get(?PG_SCOPE), + lists:flatmap(fun(Group) -> + pg:get_local_members(PgScope, Group) + end, pg:which_groups(PgScope)). %% This function excludes Web MQTT connections list_local_mqtt_connections() -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl deleted file mode 100644 index d21f10111a82..000000000000 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl +++ /dev/null @@ -1,98 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_mqtt_collector). - --include("mqtt_machine.hrl"). - --export([register/2, register/3, unregister/2, - list/0, list_pids/0, leave/1]). - -%%---------------------------------------------------------------------------- --spec register(client_id_ra(), pid()) -> {ok, reference()} | {error, term()}. -register(ClientId, Pid) -> - {ClusterName, _} = NodeId = mqtt_node:server_id(), - case ra_leaderboard:lookup_leader(ClusterName) of - undefined -> - case ra:members(NodeId) of - {ok, _, Leader} -> - register(Leader, ClientId, Pid); - _ = Error -> - Error - end; - Leader -> - register(Leader, ClientId, Pid) - end. - --spec register(ra:server_id(), client_id_ra(), pid()) -> - {ok, reference()} | {error, term()}. -register(ServerId, ClientId, Pid) -> - Corr = make_ref(), - send_ra_command(ServerId, {register, ClientId, Pid}, Corr), - erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}), - {ok, Corr}. - --spec unregister(client_id_ra(), pid()) -> ok. -unregister(ClientId, Pid) -> - {ClusterName, _} = mqtt_node:server_id(), - case ra_leaderboard:lookup_leader(ClusterName) of - undefined -> - ok; - Leader -> - send_ra_command(Leader, {unregister, ClientId, Pid}, no_correlation) - end. - --spec list_pids() -> [pid()]. -list_pids() -> - list(fun(#machine_state{pids = Pids}) -> maps:keys(Pids) end). - --spec list() -> term(). -list() -> - list(fun(#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end). - -list(QF) -> - {ClusterName, _} = mqtt_node:server_id(), - case ra_leaderboard:lookup_leader(ClusterName) of - undefined -> - NodeIds = mqtt_node:all_node_ids(), - case ra:leader_query(NodeIds, QF) of - {ok, {_, Result}, _} -> Result; - {timeout, _} -> - rabbit_log:debug("~ts:list/1 leader query timed out", - [?MODULE]), - [] - end; - Leader -> - case ra:leader_query(Leader, QF) of - {ok, {_, Result}, _} -> Result; - {error, _} -> - []; - {timeout, _} -> - rabbit_log:debug("~ts:list/1 leader query timed out", - [?MODULE]), - [] - end - end. - --spec leave(binary()) -> ok | timeout | nodedown. -leave(NodeBin) -> - Node = binary_to_atom(NodeBin, utf8), - ServerId = mqtt_node:server_id(), - run_ra_command(ServerId, {leave, Node}), - mqtt_node:leave(Node). - -%%---------------------------------------------------------------------------- --spec run_ra_command(term(), term()) -> term() | {error, term()}. -run_ra_command(ServerId, RaCommand) -> - case ra:process_command(ServerId, RaCommand) of - {ok, Result, _} -> Result; - _ = Error -> Error - end. - --spec send_ra_command(term(), term(), term()) -> ok. -send_ra_command(ServerId, RaCommand, Correlation) -> - ok = ra:pipeline_command(ServerId, RaCommand, Correlation, normal). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl index 7e8c0ce0294e..3b35c794af39 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl @@ -9,8 +9,6 @@ -include("rabbit_mqtt.hrl"). --export([track_client_id_in_ra/0]). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Feature flags introduced in 3.12.0 %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -18,14 +16,13 @@ -rabbit_feature_flag( {?QUEUE_TYPE_QOS_0, #{desc => "Support pseudo queue type for MQTT QoS 0 subscribers omitting a queue process", - stability => stable + stability => required }}). -rabbit_feature_flag( {delete_ra_cluster_mqtt_node, #{desc => "Delete Ra cluster 'mqtt_node' since MQTT client IDs are tracked locally", - stability => stable, - callbacks => #{enable => {mqtt_node, delete}} + stability => required }}). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -40,14 +37,10 @@ -rabbit_feature_flag( {mqtt_v5, #{desc => "Support MQTT 5.0", - stability => stable, + stability => required, depends_on => [ %% MQTT 5.0 feature Will Delay Interval depends on client ID tracking in pg local. delete_ra_cluster_mqtt_node, message_containers ] }}). - --spec track_client_id_in_ra() -> boolean(). -track_client_id_in_ra() -> - rabbit_feature_flags:is_disabled(delete_ra_cluster_mqtt_node). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index ca74b4fa84b6..eeea5b8a8295 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -10,7 +10,7 @@ -export([info/2, init/4, process_packet/2, terminate/3, handle_pre_hibernate/0, - handle_ra_event/2, handle_down/2, handle_queue_event/2, + handle_down/2, handle_queue_event/2, proto_version_tuple/1, throttle/2, format_status/1, remove_duplicate_client_id_connections/2, remove_duplicate_client_id_connections/3, @@ -100,7 +100,6 @@ %% [v5 4.8.1] subscriptions = #{} :: subscriptions(), auth_state = #auth_state{}, - ra_register_state :: option(registered | {pending, reference()}), %% quorum queues and streams whose soft limit has been exceeded queues_soft_limit_exceeded = sets:new([{version, 2}]) :: sets:set(), qos0_messages_dropped = 0 :: non_neg_integer(), @@ -176,7 +175,6 @@ process_connect( end, Result0 = maybe - ok ?= check_protocol_version(ProtoVer), ok ?= check_extended_auth(ConnectProps), {ok, ClientId} ?= ensure_client_id(ClientId0, CleanStart, ProtoVer), {ok, {Username1, Password}} ?= check_credentials(Username0, Password0, SslLoginName, PeerIp), @@ -192,7 +190,7 @@ process_connect( {ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp), ok ?= check_user_loopback(Username, PeerIp), rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt), - {ok, RaRegisterState} ?= register_client_id(VHost, ClientId, CleanStart, WillProps), + ok = register_client_id(VHost, ClientId, CleanStart, WillProps), {ok, WillMsg} ?= make_will_msg(Packet), {TraceState, ConnName} = init_trace(VHost, ConnName0), ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket), @@ -221,8 +219,7 @@ process_connect( topic_alias_maximum_outbound = TopicAliasMaxOutbound}, auth_state = #auth_state{ user = User, - authz_ctx = AuthzCtx}, - ra_register_state = RaRegisterState}, + authz_ctx = AuthzCtx}}, ok ?= clear_will_msg(S), {ok, S} end, @@ -317,7 +314,6 @@ process_connect(State0) -> {ok, SessPresent, State} else {error, _} = Error -> - unregister_client(State0), Error end. @@ -613,18 +609,6 @@ update_session_expiry_interval(QName, Expiry) -> ok = rabbit_queue_type:policy_changed(Q) % respects queue args end. -check_protocol_version(V) - when V =:= 3 orelse V =:= 4 -> - ok; -check_protocol_version(5) -> - case rabbit_feature_flags:is_enabled(mqtt_v5) of - true -> - ok; - false -> - ?LOG_ERROR("Rejecting MQTT 5.0 connection because feature flag mqtt_v5 is disabled"), - {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} - end. - check_extended_auth(#{'Authentication-Method' := Method}) -> %% In future, we could support SASL via rabbit_auth_mechanism %% as done by rabbit_reader and rabbit_stream_reader. @@ -665,47 +649,29 @@ ensure_client_id(ClientId, _, _) when is_binary(ClientId) -> {ok, ClientId}. --spec register_client_id(rabbit_types:vhost(), client_id(), boolean(), properties()) -> - {ok, RaRegisterState :: undefined | {pending, reference()}} | - {error, ConnectErrorCode :: pos_integer()}. +-spec register_client_id(rabbit_types:vhost(), client_id(), boolean(), properties()) -> ok. register_client_id(VHost, ClientId, CleanStart, WillProps) when is_binary(VHost), is_binary(ClientId) -> - %% Always register client ID in pg. PgGroup = {VHost, ClientId}, ok = pg:join(persistent_term:get(?PG_SCOPE), PgGroup, self()), - - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - case collector_register(ClientId) of - {ok, Corr} -> - %% Ra node takes care of removing duplicate client ID connections. - {ok, {pending, Corr}}; - {error, _} = Err -> - %% e.g. this node was removed from the MQTT cluster members - ?LOG_ERROR("MQTT connection failed to register client ID ~s in vhost ~s in Ra: ~p", - [ClientId, VHost, Err]), - {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR} - end; - false -> - %% "If a Network Connection uses a Client Identifier of an existing Network Connection to - %% the Server, the Will Message for the exiting connection is sent unless the new - %% connection specifies Clean Start of 0 and the Will Delay is greater than zero." - %% [v5 3.1.3.2.2] - Args = case {CleanStart, WillProps} of - {false, #{'Will-Delay-Interval' := I}} when I > 0 -> - [PgGroup, self(), false]; - _ -> - [PgGroup, self()] - end, - ok = erpc:multicast([node() | nodes()], - ?MODULE, - remove_duplicate_client_id_connections, - Args), - {ok, undefined} - end. - -%% Once feature flag mqtt_v5 becomes required, the caller should always pass SendWill to this -%% function (remove_duplicate_client_id_connections/2) so that we can delete this function. + %% "If a Network Connection uses a Client Identifier of an existing Network Connection to + %% the Server, the Will Message for the exiting connection is sent unless the new + %% connection specifies Clean Start of 0 and the Will Delay is greater than zero." + %% [v5 3.1.3.2.2] + SendWill = case {CleanStart, WillProps} of + {false, #{'Will-Delay-Interval' := I}} when I > 0 -> + false; + _ -> + true + end, + ok = erpc:multicast([node() | nodes()], + ?MODULE, + remove_duplicate_client_id_connections, + [PgGroup, self(), SendWill]). + +%% remove_duplicate_client_id_connections/2 is only called from 3.13 nodes. +%% Hence, this function can be deleted when mixed version clusters between +%% this version and 3.13 are disallowed. -spec remove_duplicate_client_id_connections( {rabbit_types:vhost(), client_id()}, pid()) -> ok. remove_duplicate_client_id_connections(PgGroup, PidToKeep) -> @@ -1403,13 +1369,8 @@ queue_ttl_args(SessionExpirySecs) when is_integer(SessionExpirySecs) andalso SessionExpirySecs > 0 -> [{?QUEUE_TTL_KEY, long, timer:seconds(SessionExpirySecs)}]. -queue_type(?QOS_0, 0, QArgs) -> - case rabbit_queue_type:is_enabled(?QUEUE_TYPE_QOS_0) of - true -> - ?QUEUE_TYPE_QOS_0; - false -> - rabbit_amqqueue:get_queue_type(QArgs) - end; +queue_type(?QOS_0, 0, _QArgs) -> + ?QUEUE_TYPE_QOS_0; queue_type(_, _, QArgs) -> rabbit_amqqueue:get_queue_type(QArgs). @@ -1703,7 +1664,6 @@ terminate(SendWill, Infos, State = #state{queue_states = QStates}) -> rabbit_core_metrics:connection_closed(self()), rabbit_event:notify(connection_closed, Infos), rabbit_networking:unregister_non_amqp_connection(self()), - unregister_client(State), maybe_decrement_consumer(State), maybe_decrement_publisher(State), _ = maybe_delete_mqtt_qos0_queue(State), @@ -1799,15 +1759,6 @@ log_delayed_will_failure(Topic, ClientId, Reason) -> ?LOG_DEBUG("failed to schedule delayed Will Message to topic ~s for MQTT client ID ~s: ~p", [Topic, ClientId, Reason]). -unregister_client(#state{cfg = #cfg{client_id = ClientIdBin}}) -> - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - ClientId = rabbit_data_coercion:to_list(ClientIdBin), - rabbit_mqtt_collector:unregister(ClientId, self()); - false -> - ok - end. - maybe_delete_mqtt_qos0_queue( State = #state{cfg = #cfg{clean_start = true}, auth_state = #auth_state{user = #user{username = Username}}}) -> @@ -1867,41 +1818,6 @@ handle_pre_hibernate() -> erase(topic_permission_cache), ok. --spec handle_ra_event(register_timeout -| {applied, [{reference(), ok}]} -| {not_leader, term(), reference()}, state()) -> state(). -handle_ra_event({applied, [{Corr, ok}]}, - State = #state{ra_register_state = {pending, Corr}}) -> - %% success case - command was applied transition into registered state - State#state{ra_register_state = registered}; -handle_ra_event({not_leader, Leader, Corr}, - State = #state{ra_register_state = {pending, Corr}, - cfg = #cfg{client_id = ClientIdBin}}) -> - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - ClientId = rabbit_data_coercion:to_list(ClientIdBin), - %% retry command against actual leader - {ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()), - State#state{ra_register_state = {pending, NewCorr}}; - false -> - State - end; -handle_ra_event(register_timeout, - State = #state{ra_register_state = {pending, _Corr}, - cfg = #cfg{client_id = ClientId}}) -> - case rabbit_mqtt_ff:track_client_id_in_ra() of - true -> - {ok, NewCorr} = collector_register(ClientId), - State#state{ra_register_state = {pending, NewCorr}}; - false -> - State - end; -handle_ra_event(register_timeout, State) -> - State; -handle_ra_event(Evt, State) -> - ?LOG_DEBUG("unhandled ra_event: ~w ", [Evt]), - State. - -spec handle_down(term(), state()) -> {ok, state()} | {error, Reason :: any()}. handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, @@ -2441,10 +2357,6 @@ message_redelivered(true, ProtoVer, QType) -> message_redelivered(_, _, _) -> ok. -collector_register(ClientIdBin) -> - ClientId = rabbit_data_coercion:to_list(ClientIdBin), - rabbit_mqtt_collector:register(ClientId, self()). - %% "Reason Codes less than 0x80 indicate successful completion of an operation. %% Reason Code values of 0x80 or greater indicate failure." -spec is_success(reason_code()) -> boolean(). @@ -2459,7 +2371,6 @@ format_status( packet_id = PackID, subscriptions = Subscriptions, auth_state = AuthState, - ra_register_state = RaRegisterState, queues_soft_limit_exceeded = QSLE, qos0_messages_dropped = Qos0MsgsDropped, cfg = #cfg{ @@ -2510,7 +2421,6 @@ format_status( packet_id => PackID, subscriptions => Subscriptions, auth_state => AuthState, - ra_register_state => RaRegisterState, queues_soft_limit_exceeded => QSLE, qos0_messages_dropped => Qos0MsgsDropped}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index aad723bd1216..920276966c6c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -145,7 +145,7 @@ deliver(Qs, Msg, Options) -> {[], Actions}. -spec is_enabled() -> boolean(). -is_enabled() -> rabbit_feature_flags:is_enabled(?MODULE). +is_enabled() -> true. -spec is_compatible(boolean(), boolean(), boolean()) -> boolean(). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 1050b38f7258..47b12effe37d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -112,11 +112,6 @@ handle_call({info, InfoItems}, _From, State) -> handle_call(Msg, From, State) -> {stop, {mqtt_unexpected_call, Msg, From}, State}. -%% Delete this backward compatibility clause when feature flag -%% delete_ra_cluster_mqtt_node becomes required. -handle_cast(duplicate_id, State) -> - handle_cast({duplicate_id, true}, State); - handle_cast({duplicate_id, SendWill}, State = #state{proc_state = PState, conn_name = ConnName}) -> @@ -235,13 +230,6 @@ handle_info(login_timeout, State) -> handle_info(emit_stats, State) -> {noreply, emit_stats(State), ?HIBERNATE_AFTER}; -handle_info({ra_event, _From, Evt}, - #state{proc_state = PState0} = State) -> - %% handle applied event to ensure registration command actually got applied - %% handle not_leader notification in case we send the command to a non-leader - PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0), - {noreply, pstate(State, PState), ?HIBERNATE_AFTER}; - handle_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt, #state{proc_state = PState0} = State) -> case rabbit_mqtt_processor:handle_down(Evt, PState0) of diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 1f2cd90411a7..8a15566290b0 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -129,18 +129,16 @@ init_per_group(authz, Config0) -> ,{vhost, VHost} ,{exchange, <<"amq.topic">>} ]}, - Config1 = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig), - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - rabbit_ct_broker_helpers:add_user(Config1, User, Password), - rabbit_ct_broker_helpers:add_vhost(Config1, VHost), - [Log|_] = rpc(Config1, 0, rabbit, log_locations, []), - Config2 = [{mqtt_user, User}, - {mqtt_vhost, VHost}, - {mqtt_password, Password}, - {log_location, Log} | Config1], - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5), - Config2; + Config = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig), + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + rabbit_ct_broker_helpers:add_user(Config, User, Password), + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + [Log|_] = rpc(Config, 0, rabbit, log_locations, []), + [{mqtt_user, User}, + {mqtt_vhost, VHost}, + {mqtt_password, Password}, + {log_location, Log} | Config]; init_per_group(Group, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ @@ -149,22 +147,20 @@ init_per_group(Group, Config) -> ]), MqttConfig = mqtt_config(Group), AuthConfig = auth_config(Group), - Config2 = rabbit_ct_helpers:run_setup_steps( - Config1, - [fun(Conf) -> case MqttConfig of - undefined -> Conf; - _ -> merge_app_env(MqttConfig, Conf) - end - end] ++ - [fun(Conf) -> case AuthConfig of - undefined -> Conf; - _ -> merge_app_env(AuthConfig, Conf) - end - end] ++ - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5), - Config2. + rabbit_ct_helpers:run_setup_steps( + Config1, + [fun(Conf) -> case MqttConfig of + undefined -> Conf; + _ -> merge_app_env(MqttConfig, Conf) + end + end] ++ + [fun(Conf) -> case AuthConfig of + undefined -> Conf; + _ -> merge_app_env(AuthConfig, Conf) + end + end] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(G, Config) when G =:= v4; diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl index 36333799ef32..bb404499d6f5 100644 --- a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -79,11 +79,11 @@ init_per_testcase(Testcase, Config) -> {rmq_nodename_suffix, Testcase}, {rmq_nodes_clustered, true} ]), - Config2 = rabbit_ct_helpers:run_setup_steps(Config1, - [ fun merge_app_env/1 ] ++ + rabbit_ct_helpers:run_setup_steps( + Config1, + [fun merge_app_env/1] ++ setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - util:maybe_skip_v5(Config2). + rabbit_ct_client_helpers:setup_steps()). end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:run_steps(Config, @@ -139,7 +139,7 @@ connection_id_tracking_on_nodedown(Config) -> process_flag(trap_exit, true), ok = stop_node(Config, 0), await_exit(C), - ok = eventually(?_assertEqual([], util:all_connection_pids(1, Config)), 500, 4). + ok = eventually(?_assertEqual([], util:all_connection_pids(Config)), 500, 4). %% %% Helpers diff --git a/deps/rabbitmq_mqtt/test/command_SUITE.erl b/deps/rabbitmq_mqtt/test/command_SUITE.erl index 04badd9ff526..528c4b0b1b97 100644 --- a/deps/rabbitmq_mqtt/test/command_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/command_SUITE.erl @@ -56,9 +56,7 @@ end_per_suite(Config) -> init_per_group(unit, Config) -> Config; init_per_group(Group, Config) -> - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node), - Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}), - util:maybe_skip_v5(Config1). + rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}). end_per_group(_, Config) -> Config. diff --git a/deps/rabbitmq_mqtt/test/ff_SUITE.erl b/deps/rabbitmq_mqtt/test/ff_SUITE.erl deleted file mode 100644 index ce36864dda1e..000000000000 --- a/deps/rabbitmq_mqtt/test/ff_SUITE.erl +++ /dev/null @@ -1,150 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - --module(ff_SUITE). - --compile([export_all, nowarn_export_all]). - --include_lib("eunit/include/eunit.hrl"). - --import(rabbit_ct_broker_helpers, [rpc/5]). --import(rabbit_ct_helpers, [eventually/1]). --import(util, [expect_publishes/3, - get_global_counters/4, - connect/2, - connect/4]). - --define(PROTO_VER, v4). - -all() -> - [ - {group, cluster_size_3} - ]. - -groups() -> - [ - {cluster_size_3, [], - [rabbit_mqtt_qos0_queue, - %% delete_ra_cluster_mqtt_node must run before mqtt_v5 - %% because the latter depends on (i.e. auto-enables) the former. - delete_ra_cluster_mqtt_node, - mqtt_v5]} - ]. - -suite() -> - [ - {timetrap, {minutes, 10}} - ]. - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config, []). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(Group = cluster_size_3, Config0) -> - Config1 = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 3}, - {rmq_nodename_suffix, Group}]), - Config = rabbit_ct_helpers:merge_app_env( - Config1, {rabbit, [{forced_feature_flags_on_init, []}]}), - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_group(_Group, Config) -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_testcase(TestCase, Config) -> - case rabbit_ct_broker_helpers:is_feature_flag_supported(Config, TestCase) of - true -> - ?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, TestCase)), - Config; - false -> - {skip, io_lib:format("feature flag ~s is unsupported", [TestCase])} - end. - -end_per_testcase(_TestCase, Config) -> - Config. - -delete_ra_cluster_mqtt_node(Config) -> - FeatureFlag = ?FUNCTION_NAME, - C = connect(<<"my-client">>, Config, 1, []), - timer:sleep(500), - %% old client ID tracking works - ?assertEqual(1, length(util:all_connection_pids(Config))), - %% Ra processes are alive - ?assert(lists:all(fun erlang:is_pid/1, - rabbit_ct_broker_helpers:rpc_all(Config, erlang, whereis, [mqtt_node]))), - - ?assertEqual(ok, - rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)), - - %% Ra processes should be gone - eventually( - ?_assert(lists:all(fun(Pid) -> Pid =:= undefined end, - rabbit_ct_broker_helpers:rpc_all(Config, erlang, whereis, [mqtt_node])))), - %% new client ID tracking works - ?assertEqual(1, length(util:all_connection_pids(Config))), - ok = emqtt:disconnect(C), - eventually(?_assertEqual(0, length(util:all_connection_pids(Config)))). - -rabbit_mqtt_qos0_queue(Config) -> - FeatureFlag = ?FUNCTION_NAME, - Msg = Topic = ClientId = atom_to_binary(?FUNCTION_NAME), - - C1 = connect(ClientId, Config), - {ok, _, [0]} = emqtt:subscribe(C1, Topic, qos0), - ok = emqtt:publish(C1, Topic, Msg, qos0), - ok = expect_publishes(C1, Topic, [Msg]), - ?assertEqual(1, - length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))), - - ?assertEqual(ok, - rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)), - - %% Queue type does not chanage for existing connection. - ?assertEqual(1, - length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))), - ok = emqtt:publish(C1, Topic, Msg, qos0), - ok = expect_publishes(C1, Topic, [Msg]), - ?assertMatch(#{messages_delivered_total := 2, - messages_delivered_consume_auto_ack_total := 2}, - get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, rabbit_classic_queue}])), - - %% Reconnecting with the same client ID will terminate the old connection. - true = unlink(C1), - C2 = connect(ClientId, Config), - {ok, _, [0]} = emqtt:subscribe(C2, Topic, qos0), - %% This time, we get the new queue type. - eventually( - ?_assertEqual(0, - length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue])))), - ?assertEqual(1, - length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [FeatureFlag]))), - ok = emqtt:publish(C2, Topic, Msg, qos0), - ok = expect_publishes(C2, Topic, [Msg]), - ?assertMatch(#{messages_delivered_total := 1, - messages_delivered_consume_auto_ack_total := 1}, - get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, FeatureFlag}])), - ok = emqtt:disconnect(C2). - -mqtt_v5(Config) -> - FeatureFlag = ?FUNCTION_NAME, - - %% MQTT 5.0 is not yet supported. - {C1, Connect} = util:start_client(?FUNCTION_NAME, Config, 0, [{proto_ver, v5}]), - unlink(C1), - ?assertEqual({error, {unsupported_protocol_version, #{}}}, Connect(C1)), - - ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)), - - %% MQTT 5.0 is now supported. - {C5, Connect} = util:start_client(?FUNCTION_NAME, Config, 0, [{proto_ver, v5}]), - ?assertMatch({ok, _}, Connect(C5)), - ok = emqtt:disconnect(C5). diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl index 121644083d49..eb4f6ac48622 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -54,18 +54,17 @@ end_per_suite(Config) -> init_per_group(Group, Config0) -> Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"), - Config1 = rabbit_ct_helpers:set_config(Config0, [ - {rmq_nodename_suffix, Suffix}, - {rmq_certspwd, "bunnychow"}, - {rmq_nodes_clustered, true}, - {rmq_nodes_count, 3}, - {mqtt_version, Group} - ]), - Config = rabbit_ct_helpers:run_setup_steps(Config1, - [ fun merge_app_env/1 ] ++ + Config = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodename_suffix, Suffix}, + {rmq_certspwd, "bunnychow"}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 3}, + {mqtt_version, Group}]), + rabbit_ct_helpers:run_setup_steps( + Config, + [fun merge_app_env/1] ++ rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - util:maybe_skip_v5(Config). + rabbit_ct_client_helpers:setup_steps()). end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps(Config, diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl deleted file mode 100644 index 6568681d07b1..000000000000 --- a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl +++ /dev/null @@ -1,95 +0,0 @@ --module(mqtt_machine_SUITE). - --compile([export_all, nowarn_export_all]). - --include_lib("eunit/include/eunit.hrl"). --include("mqtt_machine.hrl"). - -%%%=================================================================== -%%% Common Test callbacks -%%%=================================================================== - -all() -> - [ - {group, tests} - ]. - - -all_tests() -> - [ - basics, - machine_upgrade, - many_downs - ]. - -groups() -> - [ - {tests, [], all_tests()} - ]. - -%%%=================================================================== -%%% Test cases -%%%=================================================================== - -basics(_Config) -> - S0 = mqtt_machine:init(#{}), - ClientId = <<"id1">>, - OthPid = spawn(fun () -> ok end), - {S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0), - ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1), - ?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1), - {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1), - ?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids} - when map_size(Ids) == 1, S2), - {S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2), - ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3), - {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2), - ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4), - - ok. - -machine_upgrade(_Config) -> - S0 = mqtt_machine_v0:init(#{}), - ClientId = <<"id1">>, - Self = self(), - {S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0), - ?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1), - {S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1), - ?assertMatch(#machine_state{client_ids = #{ClientId := Self}, - pids = #{Self := [ClientId]} = Pids} - when map_size(Pids) == 1, S2), - {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), - ?assertMatch(#machine_state{client_ids = Ids, - pids = Pids} - when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3), - - ok. - -many_downs(_Config) -> - S0 = mqtt_machine:init(#{}), - Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)} - || I <- lists:seq(1, 10000)], - S1 = lists:foldl( - fun ({ClientId, Pid}, Acc0) -> - {Acc, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, Pid}, Acc0), - Acc - end, S0, Clients), - _ = lists:foldl( - fun ({_ClientId, Pid}, Acc0) -> - {Acc, ok, _} = mqtt_machine:apply(meta(1), {down, Pid, noproc}, Acc0), - Acc - end, S1, Clients), - _ = lists:foldl( - fun ({ClientId, Pid}, Acc0) -> - {Acc, ok, _} = mqtt_machine:apply(meta(1), {unregister, ClientId, - Pid}, Acc0), - Acc - end, S0, Clients), - - ok. -%% Utility - -meta(Idx) -> - #{index => Idx, - term => 1, - ts => erlang:system_time(millisecond)}. diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 4c8ba680b23e..d85fc4fb1b14 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -71,7 +71,6 @@ init_per_group(Group, Config0) -> Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5), Plugins = [rabbitmq_stomp, rabbitmq_stream], @@ -367,7 +366,6 @@ amqp_mqtt_amqp(Config) -> %% consume via MQTT 5.0 with a QoS 0 subscription. amqp_mqtt_qos0(Config) -> %% We want to test that the old node can receive from an MQTT QoS 0 queue. - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_mqtt_qos0_queue), amqp_mqtt(0, Config). %% Send messages with different AMQP body sections and diff --git a/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl index 0224a18ac5f3..3a84150b0b27 100644 --- a/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl @@ -57,9 +57,10 @@ end_per_suite(Config) -> rabbit_ct_broker_helpers:teardown_steps()). init_per_group(Group, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}), - util:maybe_skip_v5(Config1). -end_per_group(_, Config) -> Config. + rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}). + +end_per_group(_Group, Config) -> + Config. init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index 59abae0fc149..1c4fa1331980 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -63,13 +63,11 @@ merge_app_env(Config) -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, ?MODULE}), - Config2 = rabbit_ct_helpers:run_setup_steps( - Config1, - [fun merge_app_env/1] ++ - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5), - Config2. + rabbit_ct_helpers:run_setup_steps( + Config1, + [fun merge_app_env/1] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config, @@ -265,8 +263,6 @@ rabbit_mqtt_qos0_queue_overflow(Config) -> #{messages_dead_lettered_maxlen_total := NumDeadLettered} } = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, QType), - Topic = atom_to_binary(?FUNCTION_NAME), Msg = binary:copy(<<"x">>, 4000), NumMsgs = 10_000, diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl index 0930b7768a06..a69df2e6e2aa 100644 --- a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -63,7 +63,7 @@ init_per_group(G, Config) rabbit_ct_helpers:set_config(Config, {mqtt_version, G}); init_per_group(Group, Config0) -> Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"), - Config1 = rabbit_ct_helpers:set_config( + Config = rabbit_ct_helpers:set_config( Config0, {rmq_nodename_suffix, Suffix}), Mod = list_to_atom("rabbit_mqtt_retained_msg_store_" ++ atom_to_list(Group)), Env = [{rabbitmq_mqtt, [{retained_message_store, Mod}]}, @@ -73,13 +73,11 @@ init_per_group(Group, Config0) -> {default_vhost, "/"}, {default_permissions, [".*", ".*", ".*"]} ]}], - Config = rabbit_ct_helpers:run_setup_steps( - Config1, - [fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++ - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5), - Config. + rabbit_ct_helpers:run_setup_steps( + Config, + [fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(G, Config) when G =:= v4; diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index ecd9681adb3b..a63e1a83ffe9 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -27,8 +27,7 @@ rpc_all/4, get_node_config/3, drain_node/2, - revive_node/2, - is_feature_flag_enabled/2 + revive_node/2 ]). -import(rabbit_ct_helpers, [eventually/3, @@ -174,12 +173,11 @@ init_per_group(mqtt, Config) -> init_per_group(web_mqtt, Config) -> rabbit_ct_helpers:set_config(Config, {websocket, true}); -init_per_group(Group, Config0) +init_per_group(Group, Config) when Group =:= v3; Group =:= v4; Group =:= v5 -> - Config = rabbit_ct_helpers:set_config(Config0, {mqtt_version, Group}), - util:maybe_skip_v5(Config); + rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}); init_per_group(Group, Config0) -> Nodes = case Group of @@ -209,7 +207,7 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= management_plugin_connection; T =:= management_plugin_enable -> - ok = inets:start(), + inets:start(), init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -578,27 +576,13 @@ events(Config) -> QueueNameBin = <<"mqtt-subscription-", ClientId/binary, "qos0">>, QueueName = {resource, <<"/">>, queue, QueueNameBin}, - [E2, E3 | E4] = get_events(Server), - QueueType = case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - ?assertEqual([], E4), - rabbit_mqtt_qos0_queue; - false -> - [ConsumerCreated] = E4, - assert_event_type(consumer_created, ConsumerCreated), - assert_event_prop([{queue, QueueName}, - {ack_required, false}, - {exclusive, false}, - {arguments, []}], - ConsumerCreated), - rabbit_classic_queue - end, + [E2, E3] = get_events(Server), assert_event_type(queue_created, E2), assert_event_prop([{name, QueueName}, {durable, true}, {auto_delete, false}, {exclusive, true}, - {type, QueueType}, + {type, rabbit_mqtt_qos0_queue}, {arguments, []}], E2), assert_event_type(binding_created, E3), @@ -617,28 +601,18 @@ events(Config) -> {ok, _, _} = emqtt:unsubscribe(C, MqttTopic), - [E5] = get_events(Server), - assert_event_type(binding_deleted, E5), + [E4] = get_events(Server), + assert_event_type(binding_deleted, E4), ok = emqtt:disconnect(C), - [E6, E7 | E8] = get_events(Server), - assert_event_type(connection_closed, E6), - ?assertEqual(E1#event.props, E6#event.props, + [E5, E6] = get_events(Server), + assert_event_type(connection_closed, E5), + ?assertEqual(E1#event.props, E5#event.props, "connection_closed event props should match connection_created event props. " "See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"), - - case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - assert_event_type(queue_deleted, E7), - assert_event_prop({name, QueueName}, E7); - false -> - assert_event_type(consumer_deleted, E7), - assert_event_prop({queue, QueueName}, E7), - [QueueDeleted] = E8, - assert_event_type(queue_deleted, QueueDeleted), - assert_event_prop({name, QueueName}, QueueDeleted) - end, + assert_event_type(queue_deleted, E6), + assert_event_prop({name, QueueName}, E6), ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []). @@ -681,38 +655,24 @@ global_counters(Config) -> messages_unroutable_dropped_total => 1, messages_unroutable_returned_total => 1}, get_global_counters(Config, ProtoVer)), - - case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - ?assertEqual(#{messages_delivered_total => 2, - messages_acknowledged_total => 1, - messages_delivered_consume_auto_ack_total => 1, - messages_delivered_consume_manual_ack_total => 1, - messages_delivered_get_auto_ack_total => 0, - messages_delivered_get_manual_ack_total => 0, - messages_get_empty_total => 0, - messages_redelivered_total => 0}, - get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])), - ?assertEqual(#{messages_delivered_total => 1, - messages_acknowledged_total => 0, - messages_delivered_consume_auto_ack_total => 1, - messages_delivered_consume_manual_ack_total => 0, - messages_delivered_get_auto_ack_total => 0, - messages_delivered_get_manual_ack_total => 0, - messages_get_empty_total => 0, - messages_redelivered_total => 0}, - get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])); - false -> - ?assertEqual(#{messages_delivered_total => 3, - messages_acknowledged_total => 1, - messages_delivered_consume_auto_ack_total => 2, - messages_delivered_consume_manual_ack_total => 1, - messages_delivered_get_auto_ack_total => 0, - messages_delivered_get_manual_ack_total => 0, - messages_get_empty_total => 0, - messages_redelivered_total => 0}, - get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])) - end, + ?assertEqual(#{messages_delivered_total => 2, + messages_acknowledged_total => 1, + messages_delivered_consume_auto_ack_total => 1, + messages_delivered_consume_manual_ack_total => 1, + messages_delivered_get_auto_ack_total => 0, + messages_delivered_get_manual_ack_total => 0, + messages_get_empty_total => 0, + messages_redelivered_total => 0}, + get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])), + ?assertEqual(#{messages_delivered_total => 1, + messages_acknowledged_total => 0, + messages_delivered_consume_auto_ack_total => 1, + messages_delivered_consume_manual_ack_total => 0, + messages_delivered_get_auto_ack_total => 0, + messages_delivered_get_manual_ack_total => 0, + messages_get_empty_total => 0, + messages_redelivered_total => 0}, + get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])), {ok, _, _} = emqtt:unsubscribe(C, Topic1), ?assertEqual(1, maps:get(consumers, get_global_counters(Config, ProtoVer))), @@ -1255,13 +1215,7 @@ cli_list_queues(Config) -> "type", "name", "state", "durable", "auto_delete", "arguments", "pid", "owner_pid", "messages", "exclusive_consumer_tag" ]), - ExpectedQueueType = case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - <<"MQTT QoS 0">>; - false -> - <<"classic">> - end, - ?assertMatch([[ExpectedQueueType, <<"mqtt-subscription-cli_list_queuesqos0">>, + ?assertMatch([[<<"MQTT QoS 0">>, <<"mqtt-subscription-cli_list_queuesqos0">>, <<"running">>, <<"true">>, <<"false">>, <<"[]">>, _, _, <<"0">>, <<"">>]], Qs), @@ -1273,11 +1227,6 @@ cli_list_queues(Config) -> ok = emqtt:disconnect(C). maintenance(Config) -> - %% When either file rabbit_mqtt_collector changes or different OTP versions - %% are used for compilation, the rabbit_mqtt_collector module version will - %% change and cause a bad fun error when executing ra:leader_query/2 remotely. - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node), - C0 = connect(<<"client-0">>, Config, 0, []), C1a = connect(<<"client-1a">>, Config, 1, []), C1b = connect(<<"client-1b">>, Config, 1, []), @@ -1367,11 +1316,6 @@ keepalive_turned_off(Config) -> ok = emqtt:disconnect(C). duplicate_client_id(Config) -> - %% When either file rabbit_mqtt_collector changes or different OTP versions - %% are used for compilation, the rabbit_mqtt_collector module version will - %% change and cause a bad fun error when executing ra:leader_query/2 remotely. - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node), - [Server1, Server2, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), %% Test session takeover by both new and old node in mixed version clusters. ClientId1 = <<"c1">>, @@ -1509,14 +1453,8 @@ clean_session_disconnect_client(Config) -> {ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1), QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]), - case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - ?assertEqual(1, length(QsQos0)), - ?assertEqual(1, length(QsClassic)); - false -> - ?assertEqual(0, length(QsQos0)), - ?assertEqual(2, length(QsClassic)) - end, + ?assertEqual(1, length(QsQos0)), + ?assertEqual(1, length(QsClassic)), ok = emqtt:disconnect(C), %% After terminating a clean session, we expect any session state to be cleaned up on the server. @@ -1536,14 +1474,8 @@ clean_session_node_down(NodeDown, Config) -> {ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1), QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]), - case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of - true -> - ?assertEqual(1, length(QsQos0)), - ?assertEqual(1, length(QsClassic)); - false -> - ?assertEqual(0, length(QsQos0)), - ?assertEqual(2, length(QsClassic)) - end, + ?assertEqual(1, length(QsQos0)), + ?assertEqual(1, length(QsClassic)), ?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])), unlink(C), diff --git a/deps/rabbitmq_mqtt/test/util.erl b/deps/rabbitmq_mqtt/test/util.erl index df600459c173..160235e158e3 100644 --- a/deps/rabbitmq_mqtt/test/util.erl +++ b/deps/rabbitmq_mqtt/test/util.erl @@ -6,7 +6,6 @@ -include_lib("eunit/include/eunit.hrl"). -export([all_connection_pids/1, - all_connection_pids/2, publish_qos1_timeout/4, sync_publish_result/3, get_global_counters/1, @@ -25,23 +24,13 @@ assert_message_expiry_interval/2, await_exit/1, await_exit/2, - maybe_skip_v5/1, non_clean_sess_opts/0 ]). all_connection_pids(Config) -> - all_connection_pids(0, Config). - -all_connection_pids(Node, Config) -> - case rabbit_ct_broker_helpers:rpc( - Config, Node, rabbit_feature_flags, is_enabled, [delete_ra_cluster_mqtt_node]) of - true -> - Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000), - lists:append([Pids || {ok, Pids} <- Result]); - false -> - rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_mqtt_collector, list_pids, []) - end. + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000), + lists:append([Pids || {ok, Pids} <- Result]). publish_qos1_timeout(Client, Topic, Payload, Timeout) -> Mref = erlang:monitor(process, Client), @@ -141,21 +130,6 @@ await_exit(Pid, Reason) -> 20_000 -> ct:fail({missing_exit, Pid}) end. -maybe_skip_v5({skip, _Reason} = Skip) -> - %% Mixed-version can be skipped as `khepri_db` - %% is not supported - Skip; -maybe_skip_v5(Config) -> - case ?config(mqtt_version, Config) of - v5 -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5) of - ok -> Config; - {skip, _} = Skip -> Skip - end; - _ -> - Config - end. - %% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for %% MQTT 5.0 would provide the same as CleanSession=0 for 3.1.1." %% https://issues.oasis-open.org/projects/MQTT/issues/MQTT-538 diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 309e221aabe2..475b9450af9a 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -167,27 +167,13 @@ init_per_group(Group, Config0) -> [{mqtt_version, v5}, {rmq_nodes_count, Nodes}, {rmq_nodename_suffix, Suffix}]), - Config2 = rabbit_ct_helpers:merge_app_env( - Config1, - {rabbit, [{quorum_tick_interval, 200}]}), - Config = rabbit_ct_helpers:run_steps( - Config2, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - %% Mixed-version is skipped as `khepri_db` - %% is not supported - case Config of - {skip, _Reason} = Skip -> - Skip; - _ -> - case Group of - cluster_size_1 -> - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5), - Config; - cluster_size_3 -> - util:maybe_skip_v5(Config) - end - end. + Config = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, [{quorum_tick_interval, 200}]}), + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(G, Config) when G =:= cluster_size_1; diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 48d1fae89feb..176a29e86842 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -168,10 +168,6 @@ websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}}, [State#state.conn_name, Reason]), stop(State#state{proc_state = PState}) end; -websocket_info({'$gen_cast', duplicate_id}, State) -> - %% Delete this backward compatibility clause when feature flag - %% delete_ra_cluster_mqtt_node becomes required. - websocket_info({'$gen_cast', {duplicate_id, true}}, State); websocket_info({'$gen_cast', {duplicate_id, SendWill}}, State = #state{proc_state = ProcState, conn_name = ConnName}) -> @@ -221,10 +217,6 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState, end; websocket_info(emit_stats, State) -> {[], emit_stats(State), hibernate}; -websocket_info({ra_event, _From, Evt}, - #state{proc_state = PState0} = State) -> - PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0), - {[], State#state{proc_state = PState}, hibernate}; websocket_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt, State = #state{proc_state = PState0}) -> case rabbit_mqtt_processor:handle_down(Evt, PState0) of diff --git a/deps/rabbitmq_web_mqtt/test/command_SUITE.erl b/deps/rabbitmq_web_mqtt/test/command_SUITE.erl index 16a2c8117fc5..c526d8c4f217 100644 --- a/deps/rabbitmq_web_mqtt/test/command_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/command_SUITE.erl @@ -55,12 +55,8 @@ end_per_suite(Config) -> init_per_group(unit, Config) -> Config; -init_per_group(v5 = V5, Config0) -> - Config = rabbit_ct_helpers:set_config(Config0, {mqtt_version, V5}), - case rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5) of - ok -> Config; - {skip, _} = Skip -> Skip - end. +init_per_group(v5 = V5, Config) -> + rabbit_ct_helpers:set_config(Config, {mqtt_version, V5}). end_per_group(_, Config) -> Config. diff --git a/moduleindex.yaml b/moduleindex.yaml index e847db8bdbb6..8209c0784c4d 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -1018,14 +1018,9 @@ rabbitmq_management_agent: - rabbit_mgmt_metrics_gc - rabbit_mgmt_storage rabbitmq_mqtt: -- Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand - Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand - mc_mqtt -- mqtt_machine -- mqtt_machine_v0 -- mqtt_node - rabbit_mqtt -- rabbit_mqtt_collector - rabbit_mqtt_confirms - rabbit_mqtt_ff - rabbit_mqtt_internal_event_handler