Skip to content

Commit ec4c4a4

Browse files
committed
STOMP: add support for consumer priorities
x-priority header allows to specify the consumer priority
1 parent 194d4ba commit ec4c4a4

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
3131
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
3232
-define(HEADER_PRIORITY, "priority").
33+
-define(HEADER_X_PRIORITY, "x-priority").
3334
-define(HEADER_RECEIPT, "receipt").
3435
-define(HEADER_REDELIVERED, "redelivered").
3536
-define(HEADER_REPLY_TO, "reply-to").

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,8 @@ do_subscribe(Destination, DestHdr, Frame,
718718
subscribe_arguments(Frame) ->
719719
subscribe_arguments([?HEADER_X_STREAM_OFFSET,
720720
?HEADER_X_STREAM_FILTER,
721-
?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []).
721+
?HEADER_X_STREAM_MATCH_UNFILTERED,
722+
?HEADER_X_PRIORITY], Frame, []).
722723

723724
subscribe_arguments([], _Frame , Acc) ->
724725
Acc;
@@ -749,6 +750,14 @@ subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) ->
749750
[{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc;
750751
not_found ->
751752
Acc
753+
end;
754+
subscribe_argument(?HEADER_X_PRIORITY, Frame, Acc) ->
755+
Priority = rabbit_stomp_frame:integer_header(Frame, ?HEADER_X_PRIORITY),
756+
case Priority of
757+
{ok, P} ->
758+
[{list_to_binary(?HEADER_X_PRIORITY), byte, P}] ++ Acc;
759+
not_found ->
760+
Acc
752761
end.
753762

754763
check_subscription_access(Destination = {topic, _Topic},

deps/rabbitmq_stomp/test/system_SUITE.erl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ groups() ->
2828
publish_unauthorized_error,
2929
subscribe_error,
3030
subscribe,
31+
subscribe_with_x_priority,
3132
unsubscribe_ack,
3233
subscribe_ack,
3334
send,
@@ -161,6 +162,42 @@ subscribe(Config) ->
161162
{ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
162163
ok.
163164

165+
subscribe_with_x_priority(Config) ->
166+
Version = ?config(version, Config),
167+
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
168+
Channel = ?config(amqp_channel, Config),
169+
ClientA = ?config(stomp_client, Config),
170+
#'queue.declare_ok'{} =
171+
amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
172+
durable = true,
173+
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
174+
{<<"x-single-active-consumer">>, bool, true}
175+
]}),
176+
177+
%% subscribe and wait for receipt
178+
rabbit_stomp_client:send(
179+
ClientA, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "foo"}]),
180+
{ok, _ClientA1, _, _} = stomp_receive(ClientA, "RECEIPT"),
181+
182+
%% subscribe with a higher priority and wait for receipt
183+
{ok, ClientB} = rabbit_stomp_client:connect(Version, StompPort),
184+
rabbit_stomp_client:send(
185+
ClientB, "SUBSCRIBE", [{"destination", ?DESTINATION},
186+
{"receipt", "foo"},
187+
{"x-priority", 10}
188+
]),
189+
{ok, ClientB1, _, _} = stomp_receive(ClientB, "RECEIPT"),
190+
191+
%% send from amqp
192+
Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE},
193+
194+
amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
195+
payload = <<"hello">>}),
196+
197+
%% ClientB should receive the message since it has a higher priority
198+
{ok, _ClientB2, _, [<<"hello">>]} = stomp_receive(ClientB1, "MESSAGE"),
199+
ok.
200+
164201
unsubscribe_ack(Config) ->
165202
Channel = ?config(amqp_channel, Config),
166203
Client = ?config(stomp_client, Config),

0 commit comments

Comments
 (0)