Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
{classic_queue_consumer_unsent_message_limit, 200},
%% interval at which the channel can perform periodic actions
{channel_tick_interval, 60000},
%% Default max message size is 16 MB
Expand Down
10 changes: 10 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,16 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.

%% Unsent Message Limit for classic queue consumer channels upon which
%% the channel blocked.
%%
%% {classic_queue_consumer_unsent_message_limit, 200},

{mapping, "classic_queue.consumer_unsent_message_limit", "rabbit.classic_queue_consumer_unsent_message_limit", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

%% Product name & version overrides.

{mapping, "product.name", "rabbit.product_name", [
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_queue_consumers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

-define(QUEUE, lqueue).

-define(UNSENT_MESSAGE_LIMIT, 200).
-define(UNSENT_MESSAGE_LIMIT, persistent_term:get(unsent_message_limit)).

%% Utilisation average calculations are all in μs.
-define(USE_AVG_HALF_LIFE, 1000000.0).
Expand Down Expand Up @@ -168,6 +168,8 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive,
delivery_count = InitialDeliveryCount}}}
end,
update_ch_record(C),
persistent_term:put(unsent_message_limit,
application:get_env(rabbit, classic_queue_consumer_unsent_message_limit, 200)),
Copy link
Member

@ansd ansd Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very important, but would it be better to place this into rabbit_queue_consumers:new/0 instead because that's easier to reason about and called less often?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define the default of 200 at only one place?
Either here or in the Makefile.
I know that other RabbitMQ settings also define a default value 2 times (1 in the Makefile and 1 in the code as done here), but I never understood why that's done.
FWIW, I prefer this setting to not be exposed in rabbitmq.conf and only to bet set in advanced.config. This way we can get rid of the Makefile changes entirely including the Makefile default setting.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that commonly set configuration goes into rabbitmq.conf which requires writing Cuttlefish translations.
Rarely or advanced config goes - as the name suggests - into advanced.config file.
This setting, given it has never been configurable, is for me a rare / advanced configuration.
Other internal credit flow settings such as msg_store_credit_disc_bound or credit_flow_default_credit are not exposed either in rabbitmq.conf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ansd using rabbit_queue_consumers:new/0 wont work here. We want this to be dynamic and configurable in real time, when new consumers attach to a queue. Which is why we're adding to rabbit_queue_consumers:add/9. This suggestion breaks the purpose of the change - we want capability to "accelerate" consumers on existing queues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ansd credit-flow configs use advanced.config solely because they're configured as native Erlang terms/tuples. This config is just a simple integer, hence must be set in rabbitmq.conf. Which is much easier for operators to update/manage this config. Not sure I get the value/advantage of making configuring this unnecessarily more complex for users?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want this to be dynamic and configurable in real time, when new consumers attach to a queue.

When exactly would you increase or decrease this value dynamically at runtime globally for all consumers running on this node?

Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
prefetch = parse_prefetch_count(ModeOrPrefetch),
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,13 @@ credential_validator.regexp = ^abc\\d+",
]}],
[]},

{rabbit_classic_queue_consumer_unsent_message_limit,
"classic_queue.consumer_unsent_message_limit = 200",
[{rabbit, [
{classic_queue_consumer_unsent_message_limit, 200}
]}],
[]},

{rabbit_msg_store_shutdown_timeout,
"message_store_shutdown_timeout = 600000",
[{rabbit, [
Expand Down