Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

%% Function references should not be stored on the metadata store.
Expand Down Expand Up @@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
status(_) ->
running.

pending_count(#{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
queue:len(Pending).

add_pending(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

-import(rabbit_misc, [pget/2, pget/3]).
Expand Down Expand Up @@ -317,6 +318,10 @@ status(_) ->
%% Destination not yet connected
ignore.

pending_count(#{dest := Dest}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending messages in AMQP0.9.1 are counted in the destination side, thus in AMQP1.0 shovels they should be dest -> unacked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that in case of AMQP 1.0 the dest -> unacked field is only used in on-confirm ack-mode. But there can be pending messages in case of other ack-modes too when there is no more link credit to send the message to the dest. In this case the whole message is buffered in the shovel process. My understanding is that the pending metric only counts the number of buffered ie unsent messages. Unacked additionally counts messages that were sent but not yet acked by the dest.

There is also the metric remaining_unacked, but I'm not sure it is counted correctly in case of on-confirm. It is decremented when the message is sent (the same way as for other ack-modes) and not when the message is accepted/rejected by the dest.

Pending = maps:get(pending, Dest, []),
length(Pending).

-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
state() | {stop, any()}.
forward(_Tag, _Mc,
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
ack/3,
nack/3,
forward/3,
status/1
status/1,
pending_count/1
]).

-export([
Expand Down Expand Up @@ -437,6 +438,9 @@ add_routing(Msg0, Dest) ->
status(_) ->
running.

pending_count(_State) ->
0.

%% Internal

parse_parameter(_, _, none) ->
Expand Down
11 changes: 6 additions & 5 deletions deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
-callback forward(Tag :: tag(), Msg :: mc:state(), state()) ->
state() | {stop, any()}.
-callback status(state()) -> rabbit_shovel_status:shovel_status().
-callback pending_count(state()) -> non_neg_integer().

-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config().
Expand Down Expand Up @@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) ->
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.

-spec metrics(state()) -> rabbit_shovel_status:metrics().
metrics(_State = #{source := Source,
dest := Dest}) ->
metrics(#{source := Source,
dest := #{module := Mod}} = State) ->
#{remaining => maps:get(remaining, Source, unlimited),
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => maps:get(pending, Dest, 0),
forwarded => maps:get(forwarded, Dest, 0)}.
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => Mod:pending_count(State),
forwarded => maps:get(forwarded, maps:get(dest, State), 0)}.


%% Common functions
Expand Down
144 changes: 144 additions & 0 deletions deps/rabbitmq_shovel/test/pending_count_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(pending_count_SUITE).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("../include/rabbit_shovel.hrl").

%%%===================================================================
%%% Common Test callbacks
%%%===================================================================

all() ->
[
{group, pending_count_tests}
].

groups() ->
[
{pending_count_tests, [], [
amqp091_pending_count_empty_queue,
amqp091_pending_count_with_messages,
amqp091_pending_count_after_drain,
amqp10_pending_count_empty_list,
amqp10_pending_count_with_messages,
amqp10_pending_count_after_clear,
local_pending_count_empty_queue,
local_pending_count_after_settle,
behaviour_metrics_includes_pending,
behaviour_pending_count_delegation
]}
].

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_group(_Group, Config) ->
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(_TestCase, _Config) ->
meck:unload(),
ok.

%%%===================================================================
%%% Test cases
%%%===================================================================

%% Test AMQP 0.9.1 pending_count functionality
amqp091_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
State = #{dest => #{pending => PendingQueue}},
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_after_drain(_Config) ->
%% Test that pending_count returns 0 after messages are drained
EmptyQueue = queue:new(),
State = #{dest => #{pending => EmptyQueue}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

%% Test AMQP 1.0 pending_count functionality
amqp10_pending_count_empty_list(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingList = [{1, msg1}, {2, msg2}],
State = #{dest => #{pending => PendingList}},
?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_after_clear(_Config) ->
%% Test that pending_count returns 0 after pending list is cleared
State = #{dest => #{pending => []}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

%% Test Local shovel pending_count functionality
local_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when unacked message queue is empty
EmptyQueue = lqueue:new(),
State = #{source => #{current => #{unacked_message_q => EmptyQueue}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).


local_pending_count_after_settle(_Config) ->
%% Test that pending_count returns 0 when state doesn't contain unacked queue
State = #{source => #{current => #{}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).

%% Test behaviour module integration
behaviour_metrics_includes_pending(_Config) ->
%% Mock the destination module's pending_count and status functions
meck:new(rabbit_amqp091_shovel, [passthrough]),
meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end),
meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end),

State = #{source => #{remaining => 10, remaining_unacked => 3},
dest => #{module => rabbit_amqp091_shovel, forwarded => 7}},

{_Status, Metrics} = rabbit_shovel_behaviour:status(State),

?assertMatch(#{remaining := 10,
remaining_unacked := 3,
pending := 5,
forwarded := 7}, Metrics),

?assert(meck:validate(rabbit_amqp091_shovel)).

behaviour_pending_count_delegation(_Config) ->
%% Test that the behaviour module correctly delegates to the specific implementation
meck:new(rabbit_amqp10_shovel, [passthrough]),
meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end),
meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end),

State = #{dest => #{module => rabbit_amqp10_shovel}},

%% This would be called indirectly through status/1
{_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{},
dest => maps:get(dest, State)}),

?assertEqual(3, maps:get(pending, Metrics)),
?assert(meck:validate(rabbit_amqp10_shovel)).