Skip to content

Conversation

markusgust
Copy link
Contributor

@markusgust markusgust commented Sep 26, 2025

Proposed Changes

We observed that when a shovel connection for AMQP091 run into blocked state, pending messages stack up in a list of messages, which leads to the response of rabbitmq_shovel_status:status() can get very large and can cause memory leaks with many shovels (or one shovel using no-ack). It also breaks the management UI for shovel status.

steps to reproduce:

  • create a shovel from cluster-a to cluster-b.
  • simulate memory watermark on cluster-b.
  • shovel a couple messages, navigate to shovel status. should now break with 500 internal error. rabbitmq_shovel_status:status() on the source will show pending as a list of messages.
  • remove memory watermark, see messages flow again.
  • shovel status will still be 500, and rabbitmq_shovel_status:status() will show #{pending => {[],[]},

This PR ensures that the number of pending messages is an integer for all shovel protocols. I have tried confirming if the bug exist in AMQP1.0, however I get other issues then which I am currently investigating.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue )
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)
  • Build system and/or CI

Checklist

Put an x in the boxes that apply.
You can also fill these out after creating the PR.
This is simply a reminder of what we are going to look for before merging your code.

Further Comments

If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution
you did and what alternatives you considered, etc.

@michaelklishin
Copy link
Collaborator

The core team has no objection to this shovel interface (Erlang behavior) change.

If we consider this a bug fix, it can go into 4.2.0 after the code freeze/RC1 release next week, too.

status(_) ->
running.

pending_count(#{source := #{current := #{unacked_message_q := UAMQ}}}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending messages in AMQP0.9.1 are counted in the destination side, thus in local shovels they should be dest -> unconfirmed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I see it correctly there is no flow control between a local shovel and a queue so there can be no pending aka buffered messages for local shovels (according to my above definition of pending).

dest -> unconfirmed is only non-zero in case of on-confirm ack-mode. source -> unacked_message_q can be non-empty for on-confirm and on-publish.

%% Destination not yet connected
ignore.

pending_count(#{dest := Dest}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending messages in AMQP0.9.1 are counted in the destination side, thus in AMQP1.0 shovels they should be dest -> unacked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that in case of AMQP 1.0 the dest -> unacked field is only used in on-confirm ack-mode. But there can be pending messages in case of other ack-modes too when there is no more link credit to send the message to the dest. In this case the whole message is buffered in the shovel process. My understanding is that the pending metric only counts the number of buffered ie unsent messages. Unacked additionally counts messages that were sent but not yet acked by the dest.

There is also the metric remaining_unacked, but I'm not sure it is counted correctly in case of on-confirm. It is decremented when the message is sent (the same way as for other ack-modes) and not when the message is accepted/rejected by the dest.

Copy link
Contributor

@gomoripeti gomoripeti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to sum up my comments I think

  • if the pending metric should be the count of messages buffered in the shovel this PR is correct for AMQP 0.9.1 and 1.0 but should always be zero for local shovels
  • if the pending metric should be the count of messages not yet acked to the source queue this PR is correct for local shovels but needs to be updated for AMQP 0.9.1 and 1.0

I think exposing the count of buffered messages is more relevant as those can take up considerable memory and less visible in source/dest connection and queue metrics.

%% Destination not yet connected
ignore.

pending_count(#{dest := Dest}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that in case of AMQP 1.0 the dest -> unacked field is only used in on-confirm ack-mode. But there can be pending messages in case of other ack-modes too when there is no more link credit to send the message to the dest. In this case the whole message is buffered in the shovel process. My understanding is that the pending metric only counts the number of buffered ie unsent messages. Unacked additionally counts messages that were sent but not yet acked by the dest.

There is also the metric remaining_unacked, but I'm not sure it is counted correctly in case of on-confirm. It is decremented when the message is sent (the same way as for other ack-modes) and not when the message is accepted/rejected by the dest.

status(_) ->
running.

pending_count(#{source := #{current := #{unacked_message_q := UAMQ}}}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I see it correctly there is no flow control between a local shovel and a queue so there can be no pending aka buffered messages for local shovels (according to my above definition of pending).

dest -> unconfirmed is only non-zero in case of on-confirm ack-mode. source -> unacked_message_q can be non-empty for on-confirm and on-publish.

@dcorbacho
Copy link
Contributor

Thanks for the clarification @gomoripeti
If pending messages means the number of messages stored (whole) in the shovel, then the changes for AMQP1.0 and AMQP0.9.1 are correct. Local shovels then must always return 0, as the uamq just stores messages ids and it is not the equivalent to pending messages in other protocols.

Regarding to the remaining_unacked counter, it is correct for all three protocols. It's always decremented when a message is forwarded (regardless of it being later acked/nacked) and it is used to decide when to shutdown an auto-delete shovel (i.e. uses the 'delete-after' feature).

@markusgust markusgust marked this pull request as ready for review September 29, 2025 20:09
@markusgust
Copy link
Contributor Author

thank you @dcorbacho for your help and feedback, based on this I added some changes to what the local shovel returns.

@michaelklishin michaelklishin added this to the 4.3.0 milestone Oct 8, 2025
Copy link
Collaborator

@michaelklishin michaelklishin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving per discussion with Diana.

@michaelklishin michaelklishin merged commit 3c4707d into rabbitmq:main Oct 8, 2025
285 checks passed
michaelklishin added a commit that referenced this pull request Oct 8, 2025
Ensures pending counter in rabbit_shovel_status is always an integer (backport #14614)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants