3030 }}
3131 }).
3232
33- -define (PROTOCOL , amqp10 ).
34- -define (HIBERNATE_AFTER , 6_000 ).
35- -define (CREDIT_REPLY_TIMEOUT , 30_000 ).
33+ % % This is the link credit that we grant to sending clients.
34+ % % We are free to choose whatever we want, sending clients must obey.
35+ % % Default soft limits / credits in deps/rabbit/Makefile are:
36+ % % 32 for quorum queues
37+ % % 256 for streams
38+ % % 400 for classic queues
39+ % % If link target is a queue (rather than an exchange), we could use one of these depending
40+ % % on target queue type. For the time being just use a static value that's something in between.
41+ % % In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
42+ % % target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
43+ % % from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
44+ -define (DEFAULT_MAX_LINK_CREDIT , 128 ).
45+ % % Initial and maximum link credit that we grant to a sending queue.
46+ % % Only when we sent sufficient messages to the writer proc, we will again grant
47+ % % credits to the sending queue. We have this limit in place to ensure that our
48+ % % session proc won't be flooded with messages by the sending queue, especially
49+ % % if we are throttled sending messages to the client either by the writer proc
50+ % % or by remote-incoming window (i.e. session flow control).
51+ -define (DEFAULT_MAX_QUEUE_CREDIT , 256 ).
52+ -define (DEFAULT_MAX_INCOMING_WINDOW , 400 ).
53+ -define (MAX_LINK_CREDIT , persistent_term :get (max_link_credit )).
54+ -define (MAX_MANAGEMENT_LINK_CREDIT , 8 ).
55+ -define (MANAGEMENT_NODE_ADDRESS , <<" /management" >>).
3656-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
37- -define (MAX_INCOMING_WINDOW , 400 ).
3857% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
3958-define (INITIAL_OUTGOING_TRANSFER_ID , ? UINT_MAX - 3 ).
4059% % "Note that, despite its name, the delivery-count is not a count but a
4160% % sequence number initialized at an arbitrary point by the sender." [2.6.7]
4261-define (INITIAL_DELIVERY_COUNT , ? UINT_MAX - 4 ).
4362-define (INITIAL_OUTGOING_DELIVERY_ID , 0 ).
4463-define (DEFAULT_MAX_HANDLE , ? UINT_MAX ).
64+ -define (UINT (N ), {uint , N }).
4565% % [3.4]
4666-define (OUTCOMES , [? V_1_0_SYMBOL_ACCEPTED ,
4767 ? V_1_0_SYMBOL_REJECTED ,
4868 ? V_1_0_SYMBOL_RELEASED ,
4969 ? V_1_0_SYMBOL_MODIFIED ]).
50- -define (MAX_PERMISSION_CACHE_SIZE , 12 ).
51- -define (PROCESS_GROUP_NAME , amqp_sessions ).
52- -define (UINT (N ), {uint , N }).
53- % % This is the link credit that we grant to sending clients.
54- % % We are free to choose whatever we want, sending clients must obey.
55- % % Default soft limits / credits in deps/rabbit/Makefile are:
56- % % 32 for quorum queues
57- % % 256 for streams
58- % % 400 for classic queues
59- % % If link target is a queue (rather than an exchange), we could use one of these depending
60- % % on target queue type. For the time being just use a static value that's something in between.
61- % % In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
62- % % target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
63- % % from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
64- -define (LINK_CREDIT_RCV , 128 ).
65- -define (MANAGEMENT_LINK_CREDIT_RCV , 8 ).
66- -define (MANAGEMENT_NODE_ADDRESS , <<" /management" >>).
6770-define (DEFAULT_EXCHANGE_NAME , <<>>).
68- % % This is the maximum credit we grant to a sending queue.
69- % % Only when we sent sufficient messages to the writer proc, we will again grant credits
70- % % to the sending queue. We have this limit in place to ensure that our session proc won't be flooded
71- % % with messages by the sending queue, especially if we are throttled sending messages to the client
72- % % either by the writer proc or by remote-incoming window (i.e. session flow control).
73- -define (LINK_CREDIT_RCV_FROM_QUEUE_MAX , 256 ).
71+ -define (PROTOCOL , amqp10 ).
72+ -define (PROCESS_GROUP_NAME , amqp_sessions ).
73+ -define (MAX_PERMISSION_CACHE_SIZE , 12 ).
74+ -define (HIBERNATE_AFTER , 6_000 ).
75+ -define (CREDIT_REPLY_TIMEOUT , 30_000 ).
7476
7577-export ([start_link /8 ,
7678 process_frame /2 ,
172174-record (queue_flow_ctl , {
173175 delivery_count :: sequence_no (),
174176 % % We cap the actual credit we grant to the sending queue.
175- % % If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
177+ % % If client_flow_ctl.credit is larger than max_queue_credit ,
176178 % % we will top up in batches to the sending queue.
177- credit :: 0 .. ? LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
179+ credit :: rabbit_queue_type : credit () ,
178180 drain :: boolean ()
179181 }).
180182
251253 incoming_window_margin = 0 :: non_neg_integer (),
252254 resource_alarms :: sets :set (rabbit_alarm :resource_alarm_source ()),
253255 trace_state :: rabbit_trace :state (),
254- conn_name :: binary ()
256+ conn_name :: binary (),
257+ max_incoming_window :: pos_integer ()
255258 }).
256259
257260-record (state , {
@@ -375,11 +378,22 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
375378 Alarms0 = rabbit_alarm :register (self (), {? MODULE , conserve_resources , []}),
376379 Alarms = sets :from_list (Alarms0 , [{version , 2 }]),
377380
378- NextOutgoingId = ? INITIAL_OUTGOING_TRANSFER_ID ,
381+ MaxLinkCredit = application :get_env (
382+ rabbit , max_link_credit , ? DEFAULT_MAX_LINK_CREDIT ),
383+ MaxQueueCredit = application :get_env (
384+ rabbit , max_queue_credit , ? DEFAULT_MAX_QUEUE_CREDIT ),
385+ MaxIncomingWindow = application :get_env (
386+ rabbit , max_incoming_window , ? DEFAULT_MAX_INCOMING_WINDOW ),
387+ true = is_valid_max (MaxLinkCredit ),
388+ true = is_valid_max (MaxQueueCredit ),
389+ true = is_valid_max (MaxIncomingWindow ),
390+ ok = persistent_term :put (max_link_credit , MaxLinkCredit ),
391+ ok = persistent_term :put (max_queue_credit , MaxQueueCredit ),
379392 IncomingWindow = case sets :is_empty (Alarms ) of
380- true -> ? MAX_INCOMING_WINDOW ;
393+ true -> MaxIncomingWindow ;
381394 false -> 0
382395 end ,
396+ NextOutgoingId = ? INITIAL_OUTGOING_TRANSFER_ID ,
383397
384398 HandleMax = case HandleMax0 of
385399 ? UINT (Max ) -> Max ;
@@ -406,7 +420,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
406420 channel_num = ChannelNum ,
407421 resource_alarms = Alarms ,
408422 trace_state = rabbit_trace :init (Vhost ),
409- conn_name = ConnName
423+ conn_name = ConnName ,
424+ max_incoming_window = MaxIncomingWindow
410425 }}}.
411426
412427terminate (_Reason , # state {incoming_links = IncomingLinks ,
@@ -491,7 +506,9 @@ handle_cast({conserve_resources, Alarm, Conserve},
491506 cfg = # cfg {resource_alarms = Alarms0 ,
492507 incoming_window_margin = Margin0 ,
493508 writer_pid = WriterPid ,
494- channel_num = Ch } = Cfg
509+ channel_num = Ch ,
510+ max_incoming_window = MaxIncomingWindow
511+ } = Cfg
495512 } = State0 ) ->
496513 Alarms = case Conserve of
497514 true -> sets :add_element (Alarm , Alarms0 );
@@ -504,11 +521,11 @@ handle_cast({conserve_resources, Alarm, Conserve},
504521 % % Notify the client to not send us any more TRANSFERs. Since we decrase
505522 % % our incoming window dynamically, there might be incoming in-flight
506523 % % TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
507- {true , 0 , ? MAX_INCOMING_WINDOW };
524+ {true , 0 , MaxIncomingWindow };
508525 {false , true } ->
509526 % % All alarms cleared.
510527 % % Notify the client that it can resume sending us TRANSFERs.
511- {true , ? MAX_INCOMING_WINDOW , 0 };
528+ {true , MaxIncomingWindow , 0 };
512529 _ ->
513530 {false , IncomingWindow0 , Margin0 }
514531 end ,
@@ -882,7 +899,7 @@ handle_control(#'v1_0.attach'{
882899 MaxMessageSize = persistent_term :get (max_message_size ),
883900 Link = # management_link {name = LinkName ,
884901 delivery_count = DeliveryCountInt ,
885- credit = ? MANAGEMENT_LINK_CREDIT_RCV ,
902+ credit = ? MAX_MANAGEMENT_LINK_CREDIT ,
886903 max_message_size = MaxMessageSize },
887904 State = State0 # state {management_link_pairs = Pairs ,
888905 incoming_management_links = maps :put (HandleInt , Link , Links )},
@@ -899,7 +916,7 @@ handle_control(#'v1_0.attach'{
899916 properties = Properties },
900917 Flow = # 'v1_0.flow' {handle = Handle ,
901918 delivery_count = DeliveryCount ,
902- link_credit = ? UINT (? MANAGEMENT_LINK_CREDIT_RCV )},
919+ link_credit = ? UINT (? MAX_MANAGEMENT_LINK_CREDIT )},
903920 reply0 ([Reply , Flow ], State );
904921
905922handle_control (# 'v1_0.attach' {
@@ -978,7 +995,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
978995 routing_key = RoutingKey ,
979996 queue_name_bin = QNameBin ,
980997 delivery_count = DeliveryCountInt ,
981- credit = ? LINK_CREDIT_RCV },
998+ credit = ? MAX_LINK_CREDIT },
982999 _Outcomes = outcomes (Source ),
9831000 Reply = # 'v1_0.attach' {
9841001 name = LinkName ,
@@ -992,7 +1009,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
9921009 max_message_size = {ulong , persistent_term :get (max_message_size )}},
9931010 Flow = # 'v1_0.flow' {handle = Handle ,
9941011 delivery_count = DeliveryCount ,
995- link_credit = ? UINT (? LINK_CREDIT_RCV )},
1012+ link_credit = ? UINT (? MAX_LINK_CREDIT )},
9961013 % %TODO check that handle is not in use for any other open links.
9971014 % %"The handle MUST NOT be used for other open links. An attempt to attach
9981015 % % using a handle which is already associated with a link MUST be responded to
@@ -1790,7 +1807,8 @@ session_flow_control_received_transfer(
17901807 incoming_window = InWindow0 ,
17911808 remote_outgoing_window = RemoteOutgoingWindow ,
17921809 cfg = # cfg {incoming_window_margin = Margin ,
1793- resource_alarms = Alarms }
1810+ resource_alarms = Alarms ,
1811+ max_incoming_window = MaxIncomingWindow }
17941812 } = State ) ->
17951813 InWindow1 = InWindow0 - 1 ,
17961814 case InWindow1 < - Margin of
@@ -1802,12 +1820,12 @@ session_flow_control_received_transfer(
18021820 false ->
18031821 ok
18041822 end ,
1805- {Flows , InWindow } = case InWindow1 =< (? MAX_INCOMING_WINDOW div 2 ) andalso
1823+ {Flows , InWindow } = case InWindow1 =< (MaxIncomingWindow div 2 ) andalso
18061824 sets :is_empty (Alarms ) of
18071825 true ->
18081826 % % We've reached halfway and there are no
18091827 % % disk or memory alarm, open the window.
1810- {[# 'v1_0.flow' {}], ? MAX_INCOMING_WINDOW };
1828+ {[# 'v1_0.flow' {}], MaxIncomingWindow };
18111829 false ->
18121830 {[], InWindow1 }
18131831 end ,
@@ -1864,11 +1882,13 @@ settle_op_from_outcome(Outcome) ->
18641882 " Unrecognised state: ~tp in DISPOSITION" ,
18651883 [Outcome ]).
18661884
1867- -spec flow ({uint , link_handle ()}, sequence_no ()) -> # 'v1_0.flow' {}.
1885+ -spec flow ({uint , link_handle ()}, sequence_no ()) ->
1886+ # 'v1_0.flow' {}.
18681887flow (Handle , DeliveryCount ) ->
1869- flow (Handle , DeliveryCount , ? LINK_CREDIT_RCV ).
1888+ flow (Handle , DeliveryCount , ? MAX_LINK_CREDIT ).
18701889
1871- -spec flow ({uint , link_handle ()}, sequence_no (), non_neg_integer ()) -> # 'v1_0.flow' {}.
1890+ -spec flow ({uint , link_handle ()}, sequence_no (), rabbit_queue_type :credit ()) ->
1891+ # 'v1_0.flow' {}.
18721892flow (Handle , DeliveryCount , LinkCredit ) ->
18731893 # 'v1_0.flow' {handle = Handle ,
18741894 delivery_count = ? UINT (DeliveryCount ),
@@ -2394,7 +2414,7 @@ released(DeliveryId) ->
23942414maybe_grant_link_credit (Credit , DeliveryCount , NumUnconfirmed , Handle ) ->
23952415 case grant_link_credit (Credit , NumUnconfirmed ) of
23962416 true ->
2397- {? LINK_CREDIT_RCV , [flow (Handle , DeliveryCount )]};
2417+ {? MAX_LINK_CREDIT , [flow (Handle , DeliveryCount )]};
23982418 false ->
23992419 {Credit , []}
24002420 end .
@@ -2407,20 +2427,21 @@ maybe_grant_link_credit(
24072427 AccMap ) ->
24082428 case grant_link_credit (Credit , map_size (U )) of
24092429 true ->
2410- {Link # incoming_link {credit = ? LINK_CREDIT_RCV },
2430+ {Link # incoming_link {credit = ? MAX_LINK_CREDIT },
24112431 AccMap #{HandleInt => DeliveryCount }};
24122432 false ->
24132433 {Link , AccMap }
24142434 end .
24152435
24162436grant_link_credit (Credit , NumUnconfirmed ) ->
2417- Credit =< ? LINK_CREDIT_RCV / 2 andalso
2418- NumUnconfirmed < ? LINK_CREDIT_RCV .
2437+ MaxLinkCredit = ? MAX_LINK_CREDIT ,
2438+ Credit =< MaxLinkCredit div 2 andalso
2439+ NumUnconfirmed < MaxLinkCredit .
24192440
24202441maybe_grant_mgmt_link_credit (Credit , DeliveryCount , Handle )
2421- when Credit =< ? MANAGEMENT_LINK_CREDIT_RCV / 2 ->
2422- {? MANAGEMENT_LINK_CREDIT_RCV ,
2423- [flow (Handle , DeliveryCount , ? MANAGEMENT_LINK_CREDIT_RCV )]};
2442+ when Credit =< ? MAX_MANAGEMENT_LINK_CREDIT div 2 ->
2443+ {? MAX_MANAGEMENT_LINK_CREDIT ,
2444+ [flow (Handle , DeliveryCount , ? MAX_MANAGEMENT_LINK_CREDIT )]};
24242445maybe_grant_mgmt_link_credit (Credit , _ , _ ) ->
24252446 {Credit , []}.
24262447
@@ -3406,10 +3427,16 @@ error_not_found(Resource) ->
34063427 condition = ? V_1_0_AMQP_ERROR_NOT_FOUND ,
34073428 description = {utf8 , Description }}.
34083429
3430+ is_valid_max (Val ) ->
3431+ is_integer (Val ) andalso
3432+ Val > 0 andalso
3433+ Val =< ? UINT_MAX .
3434+
34093435-spec cap_credit (rabbit_queue_type :credit ()) ->
3410- 0 .. ? LINK_CREDIT_RCV_FROM_QUEUE_MAX .
3436+ rabbit_queue_type : credit () .
34113437cap_credit (DesiredCredit ) ->
3412- min (DesiredCredit , ? LINK_CREDIT_RCV_FROM_QUEUE_MAX ).
3438+ MaxCredit = persistent_term :get (max_queue_credit ),
3439+ min (DesiredCredit , MaxCredit ).
34133440
34143441ensure_mc_cluster_compat (Mc ) ->
34153442 IsEnabled = rabbit_feature_flags :is_enabled (message_containers_store_amqp_v1 ),
0 commit comments