2222
2323-define (QUEUE , lqueue ).
2424
25- -define (UNSENT_MESSAGE_LIMIT , 200 ).
25+ -define (KEY_UNSENT_MESSAGE_LIMIT , classic_queue_consumer_unsent_message_limit ).
26+ -define (DEFAULT_UNSENT_MESSAGE_LIMIT , 200 ).
2627
2728% % Utilisation average calculations are all in μs.
2829-define (USE_AVG_HALF_LIFE , 1000000.0 ).
7273
7374-spec new () -> state ().
7475
75- new () -> # state {consumers = priority_queue :new (),
76- use = {active ,
77- erlang :monotonic_time (micro_seconds ),
78- 1.0 }}.
76+ new () ->
77+ Val = application :get_env (rabbit ,
78+ ? KEY_UNSENT_MESSAGE_LIMIT ,
79+ ? DEFAULT_UNSENT_MESSAGE_LIMIT ),
80+ persistent_term :put (? KEY_UNSENT_MESSAGE_LIMIT , Val ),
81+ # state {consumers = priority_queue :new (),
82+ use = {active ,
83+ erlang :monotonic_time (microsecond ),
84+ 1.0 }}.
7985
8086-spec max_active_priority (state ()) -> integer () | 'infinity' | 'empty' .
8187
@@ -286,7 +292,6 @@ deliver_to_consumer(FetchFun,
286292 E = {ChPid , Consumer = # consumer {tag = CTag }},
287293 QName ) ->
288294 C = # cr {link_states = LinkStates } = lookup_ch (ChPid ),
289- ChBlocked = is_ch_blocked (C ),
290295 case LinkStates of
291296 #{CTag := # link_state {delivery_count = DeliveryCount0 ,
292297 credit = Credit } = LinkState0 } ->
@@ -308,22 +313,24 @@ deliver_to_consumer(FetchFun,
308313 block_consumer (C , E ),
309314 undelivered
310315 end ;
311- _ when ChBlocked ->
312- % % not a link credit consumer, use credit flow
313- block_consumer (C , E ),
314- undelivered ;
315316 _ ->
316317 % % not a link credit consumer, use credit flow
317- case rabbit_limiter :can_send (C # cr .limiter ,
318- Consumer # consumer .ack_required ,
319- CTag ) of
320- {suspend , Limiter } ->
321- block_consumer (C # cr {limiter = Limiter }, E ),
318+ case is_ch_blocked (C ) of
319+ true ->
320+ block_consumer (C , E ),
322321 undelivered ;
323- {continue , Limiter } ->
324- {delivered , deliver_to_consumer (
325- FetchFun , Consumer ,
326- C # cr {limiter = Limiter }, QName )}
322+ false ->
323+ case rabbit_limiter :can_send (C # cr .limiter ,
324+ Consumer # consumer .ack_required ,
325+ CTag ) of
326+ {suspend , Limiter } ->
327+ block_consumer (C # cr {limiter = Limiter }, E ),
328+ undelivered ;
329+ {continue , Limiter } ->
330+ {delivered , deliver_to_consumer (
331+ FetchFun , Consumer ,
332+ C # cr {limiter = Limiter }, QName )}
333+ end
327334 end
328335 end .
329336
@@ -653,7 +660,8 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
653660 update_ch_record (C # cr {blocked_consumers = add_consumer (QEntry , Blocked )}).
654661
655662is_ch_blocked (# cr {unsent_message_count = Count , limiter = Limiter }) ->
656- Count >= ? UNSENT_MESSAGE_LIMIT orelse rabbit_limiter :is_suspended (Limiter ).
663+ UnsentMessageLimit = persistent_term :get (? KEY_UNSENT_MESSAGE_LIMIT ),
664+ Count >= UnsentMessageLimit orelse rabbit_limiter :is_suspended (Limiter ).
657665
658666tags (CList ) -> [CTag || {_P , {_ChPid , # consumer {tag = CTag }}} <- CList ].
659667
0 commit comments