diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 1729169ac5bf..27f689923e5f 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -33,7 +33,8 @@ ack/3, nack/3, status/1, - forward/3 + forward/3, + pending_count/1 ]). %% Function references should not be stored on the metadata store. @@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] -> status(_) -> running. +pending_count(#{dest := Dest}) -> + Pending = maps:get(pending, Dest, queue:new()), + queue:len(Pending). + add_pending(Elem, State = #{dest := Dest}) -> Pending = maps:get(pending, Dest, queue:new()), State#{dest => Dest#{pending => queue:in(Elem, Pending)}}. diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 229c475d7f2d..619d4daee82f 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -32,7 +32,8 @@ ack/3, nack/3, status/1, - forward/3 + forward/3, + pending_count/1 ]). -import(rabbit_misc, [pget/2, pget/3]). @@ -317,6 +318,10 @@ status(_) -> %% Destination not yet connected ignore. +pending_count(#{dest := Dest}) -> + Pending = maps:get(pending, Dest, []), + length(Pending). + -spec forward(Tag :: tag(), Mc :: mc:state(), state()) -> state() | {stop, any()}. forward(_Tag, _Mc, diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index ac491cb5fbd1..cd23e304db38 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -42,7 +42,8 @@ ack/3, nack/3, forward/3, - status/1 + status/1, + pending_count/1 ]). -export([ @@ -437,6 +438,9 @@ add_routing(Msg0, Dest) -> status(_) -> running. +pending_count(_State) -> + 0. + %% Internal parse_parameter(_, _, none) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index 929a34c5ccf1..e66d3d2e4d70 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -85,6 +85,7 @@ -callback forward(Tag :: tag(), Msg :: mc:state(), state()) -> state() | {stop, any()}. -callback status(state()) -> rabbit_shovel_status:shovel_status(). +-callback pending_count(state()) -> non_neg_integer(). -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> source_config() | dest_config(). @@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) -> State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. -spec metrics(state()) -> rabbit_shovel_status:metrics(). -metrics(_State = #{source := Source, - dest := Dest}) -> +metrics(#{source := Source, + dest := #{module := Mod}} = State) -> #{remaining => maps:get(remaining, Source, unlimited), - remaining_unacked => maps:get(remaining_unacked, Source, 0), - pending => maps:get(pending, Dest, 0), - forwarded => maps:get(forwarded, Dest, 0)}. + remaining_unacked => maps:get(remaining_unacked, Source, 0), + pending => Mod:pending_count(State), + forwarded => maps:get(forwarded, maps:get(dest, State), 0)}. %% Common functions diff --git a/deps/rabbitmq_shovel/test/pending_count_SUITE.erl b/deps/rabbitmq_shovel/test/pending_count_SUITE.erl new file mode 100644 index 000000000000..d84222d25453 --- /dev/null +++ b/deps/rabbitmq_shovel/test/pending_count_SUITE.erl @@ -0,0 +1,144 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(pending_count_SUITE). + +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit/include/mc.hrl"). +-include("../include/rabbit_shovel.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, pending_count_tests} + ]. + +groups() -> + [ + {pending_count_tests, [], [ + amqp091_pending_count_empty_queue, + amqp091_pending_count_with_messages, + amqp091_pending_count_after_drain, + amqp10_pending_count_empty_list, + amqp10_pending_count_with_messages, + amqp10_pending_count_after_clear, + local_pending_count_empty_queue, + local_pending_count_after_settle, + behaviour_metrics_includes_pending, + behaviour_pending_count_delegation + ]} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + meck:unload(), + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +%% Test AMQP 0.9.1 pending_count functionality +amqp091_pending_count_empty_queue(_Config) -> + %% Test that pending_count returns 0 when no messages are pending + State = #{dest => #{}}, + ?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)). + +amqp091_pending_count_with_messages(_Config) -> + %% Test that pending_count returns correct count when messages are pending + PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]), + State = #{dest => #{pending => PendingQueue}}, + ?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)). + +amqp091_pending_count_after_drain(_Config) -> + %% Test that pending_count returns 0 after messages are drained + EmptyQueue = queue:new(), + State = #{dest => #{pending => EmptyQueue}}, + ?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)). + +%% Test AMQP 1.0 pending_count functionality +amqp10_pending_count_empty_list(_Config) -> + %% Test that pending_count returns 0 when no messages are pending + State = #{dest => #{}}, + ?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)). + +amqp10_pending_count_with_messages(_Config) -> + %% Test that pending_count returns correct count when messages are pending + PendingList = [{1, msg1}, {2, msg2}], + State = #{dest => #{pending => PendingList}}, + ?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)). + +amqp10_pending_count_after_clear(_Config) -> + %% Test that pending_count returns 0 after pending list is cleared + State = #{dest => #{pending => []}}, + ?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)). + +%% Test Local shovel pending_count functionality +local_pending_count_empty_queue(_Config) -> + %% Test that pending_count returns 0 when unacked message queue is empty + EmptyQueue = lqueue:new(), + State = #{source => #{current => #{unacked_message_q => EmptyQueue}}}, + ?assertEqual(0, rabbit_local_shovel:pending_count(State)). + + +local_pending_count_after_settle(_Config) -> + %% Test that pending_count returns 0 when state doesn't contain unacked queue + State = #{source => #{current => #{}}}, + ?assertEqual(0, rabbit_local_shovel:pending_count(State)). + +%% Test behaviour module integration +behaviour_metrics_includes_pending(_Config) -> + %% Mock the destination module's pending_count and status functions + meck:new(rabbit_amqp091_shovel, [passthrough]), + meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end), + meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end), + + State = #{source => #{remaining => 10, remaining_unacked => 3}, + dest => #{module => rabbit_amqp091_shovel, forwarded => 7}}, + + {_Status, Metrics} = rabbit_shovel_behaviour:status(State), + + ?assertMatch(#{remaining := 10, + remaining_unacked := 3, + pending := 5, + forwarded := 7}, Metrics), + + ?assert(meck:validate(rabbit_amqp091_shovel)). + +behaviour_pending_count_delegation(_Config) -> + %% Test that the behaviour module correctly delegates to the specific implementation + meck:new(rabbit_amqp10_shovel, [passthrough]), + meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end), + meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end), + + State = #{dest => #{module => rabbit_amqp10_shovel}}, + + %% This would be called indirectly through status/1 + {_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{}, + dest => maps:get(dest, State)}), + + ?assertEqual(3, maps:get(pending, Metrics)), + ?assert(meck:validate(rabbit_amqp10_shovel)).