Skip to content

Conversation

acogoluegnes
Copy link
Contributor

@acogoluegnes acogoluegnes commented Nov 17, 2021

  • new ConsumerUpdate command in the stream protocol to notify consumers of their state (active or not) and to let the active consumer decide where to consume from
  • consumer groups state part of the stream coordinator state
  • consumer group logic in a dedicated coordinator (sub-module of the stream coordinator state machine)
  • monitoring of the consumer connection processes in the coordinator to update the state accordingly when processes crashes
  • new list_stream_consumer_groups and list_stream_group_consumers CLI commands to list the groups of a given virtual host (virtual host, stream, and name of the group) and to list the consumers of a given group (owning connection, state, etc), respectively
  • new active and activity_status fields in the consumer metrics (for management plugin) and list_stream_consumers CLI command
  • new stream_single_active_consumer feature flag
    • must be enabled to accept any single active consumer subscription in the stream plugin network adapter
    • must be enabled to call the single active consumer-related functions in the stream coordinator
  • machine version bump from 2 to 3 in the stream coordinator

Stream Java Client documentation:

References #3753

References rabbitmq/rabbitmq-stream-java-client#46

@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch 3 times, most recently from dec5949 to fc735bd Compare December 9, 2021 09:15
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from 425f184 to 3e47849 Compare December 16, 2021 16:11
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from 30221d4 to 273ee4d Compare January 3, 2022 10:27
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch 2 times, most recently from fee80a1 to b7b2311 Compare January 27, 2022 08:41
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch 3 times, most recently from fd04c1d to 414dead Compare February 3, 2022 07:52
@acogoluegnes
Copy link
Contributor Author

acogoluegnes commented Feb 4, 2022

A/C:

Start the broker

Remove the Docker image if it's already there locally, to make sure to pull the latest image later:

docker rmi pivotalrabbitmq/rabbitmq-stream

Start the Broker:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' pivotalrabbitmq/rabbitmq-stream

EDIT after merging: the Docker image to use was pivotalrabbitmq/rabbitmq-stream:sac before merging.

Get the Code

NB: requires JDK11+

Get the stream Java client, compile it, get the examples project:

cd /tmp
git clone [email protected]:rabbitmq/rabbitmq-stream-java-client.git
cd rabbitmq-stream-java-client
git checkout single-active-consumer
./mvnw clean install -DskipTests
./mvnw clean package -Dmaven.test.skip -P performance-tool
cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer

Single Active Consumer with 1 Stream

Start the consumer program:

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer

The consumer program creates a single-active-consumer stream and starts 3 consumers that subscribe to it. These consumers form a group (they set up the single-active-consumer flag and use the same reference/name). Each consumer has its own connection.

NB: more information on the single active consumer support in the stream Java client documentation

Start publishing with stream-perf-test:

java -jar /tmp/rabbitmq-stream-java-client/target/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer

(The performance tool output a warning about the creation of the stream, it's normal)

The consumer program should output something for each message:

Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer 0 receive a message.
Consumer 0 receive a message.
Consumer 0 receive a message.
...

List the stream consumers with rabbitmqctl list_stream_consumers:

docker exec rabbitmq rabbitmqctl list_stream_consumers \
  connection_pid,subscription_id,stream,messages_consumed,offset,offset_lag,active,activity_status
Listing stream consumers ...
┌────────────────┬─────────────────┬────────────────────────┬───────────────────┬────────┬────────────┬────────┬─────────────────┐
│ connection_pid │ subscription_id │ stream                 │ messages_consumed │ offset │ offset_lag │ active │ activity_status │
├────────────────┼─────────────────┼────────────────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1396.0> │ 0               │ single-active-consumer │ 0                 │ 0      │ 0          │ false  │ waiting         │
├────────────────┼─────────────────┼────────────────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1384.0> │ 0               │ single-active-consumer │ 10                │ 9      │ 1          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1408.0> │ 0               │ single-active-consumer │ 0                 │ 0      │ 0          │ false  │ waiting         │
└────────────────┴─────────────────┴────────────────────────┴───────────────────┴────────┴────────────┴────────┴─────────────────┘

You should see the 3 consumers. Only one should be active. If there were other consumers in this virtual host, we should see them as well, but we have registered only 3 of them.

List the consumer groups with rabbitmqctl list_stream_consumer_groups:

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups
Listing stream consumer groups ...
┌────────────────────────┬───────────┬─────────────────┬───────────┐
│ stream                 │ reference │ partition_index │ consumers │
├────────────────────────┼───────────┼─────────────────┼───────────┤
│ single-active-consumer │ my-app    │ -1              │ 3         │
└────────────────────────┴───────────┴─────────────────┴───────────┘

You should see our group, with its stream and reference (my-app), and the number of consumers. The stream is not part of a super stream, so its partition index is -1.

List the consumers of the group with rabbitmqctl list_stream_group_consumers.

docker exec rabbitmq rabbitmqctl list_stream_group_consumers \
  --stream single-active-consumer --reference my-app
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                     │ state    │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36112 -> 172.17.0.2:5552 │ active   │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36116 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36120 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

You should see the 3 consumers of the group, with their subscription ID in the connection, their owning connection, and their state. This command lists only the consumers of a given group, not other consumers.

Go to the page of the stream in the management UI:

image

You should see the consumers with the same details as in the output of the rabbitmqctl list_stream_group_consumers command. Remember this is the page for AMQP queues, so there are some discrepancies between the AMQP model and the stream model (e.g. the channel links go to the stream connections, not channels).

Click on the link to the connection (channel column) of the active consumer:

image

This is a stream-specific page, so you should see extra information about the consumer (consumed messages, offset, etc). Check the state column says the consumer is active.

Go back to the single-active-consumer stream/queue page and go to the connection page of one of the inactive consumers:

image

Check the state column says the consumer is not active.

Now go back to the page of the connection of the active consumer and close the connection (link at the bottom of the page).

The output of the consumer program should say the next consumer took over:

...
Consumer 0 receive a message.
Consumer 0 receive a message.
Consumer 0 receive a message.
Consumer 1 receive a message.
Consumer 1 receive a message.
Consumer 1 receive a message.
Consumer 1 receive a message.
...

Check the consumers of the group with rabbitmqctl list_stream_group_consumers:

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                     │ state    │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36116 -> 172.17.0.2:5552 │ active   │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36120 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:36126 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

There should still be 3 consumers, because the stream client recovered the closed connection. The consumer next in line should be the active one now (in our example the consumer on port 36116 was the second in line, so it got promoted when the active consumer was closed).

Stop stream-perf-test and the consumer program (Ctrl-C them).

Delete the stream:

docker exec rabbitmq rabbitmqctl delete_queue single-active-consumer

Single Active Consumer with a Super Stream

Create an invoices super stream with 3 partitions:

docker exec rabbitmq rabbitmq-streams add_super_stream invoices --partitions 3

This should create an invoices direct exchange with 3 streams bound to it. Check this on the exchange detail page: http://localhost:15672/#/exchanges/%2F/invoices

image

Start a first instance of the consuming application:

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SuperStreamConsumer -Dexec.arguments="instance-1"

The application starts a "composite consumer" for the super stream. This is a client-side feature. The client library registers a consumer to each partition (stream) of the super stream. Consider this program as an instance/VM/container/pod of a user application.

NB: more information on the single active consumer and super stream support in the stream Java client documentation

Start the publishing application:

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SuperStreamProducer

The publishing application uses a "composite producer", which is also a client-side feature. The client library creates a producer for each partition and each message is routed based on a client-side routing strategy (in this case, hashing the ID of the message, which is an incrementing sequence). The application publishes a message every second, which means a message ends in one of the partition every 3 seconds (if the hashing is well-balanced, which it should be).

The consumer application should report messages:

Starting consumer instance-1
Consumer instance-1 received message 0 from stream invoices-0.
Consumer instance-1 received message 1 from stream invoices-0.
Consumer instance-1 received message 2 from stream invoices-1.
Consumer instance-1 received message 3 from stream invoices-1.
Consumer instance-1 received message 4 from stream invoices-2.
Consumer instance-1 received message 5 from stream invoices-0.
Consumer instance-1 received message 6 from stream invoices-0.
Consumer instance-1 received message 7 from stream invoices-2.
Consumer instance-1 received message 8 from stream invoices-2.
Consumer instance-1 received message 9 from stream invoices-2.
Consumer instance-1 received message 10 from stream invoices-0.
...

The application reports for each message its ID and the partition it comes from. The messages should be well-balanced between partitions (but it's not round-robin!).

List all the stream consumers with rabbitmqctl list_stream_consumers:

docker exec rabbitmq rabbitmqctl list_stream_consumers \
  connection_pid,subscription_id,stream,messages_consumed,offset,offset_lag,active,activity_status
Listing stream consumers ...
┌────────────────┬─────────────────┬────────────┬───────────────────┬────────┬────────────┬────────┬─────────────────┐
│ connection_pid │ subscription_id │ stream     │ messages_consumed │ offset │ offset_lag │ active │ activity_status │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1070.0> │ 0               │ invoices-0 │ 87                │ 140    │ 1          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1076.0> │ 0               │ invoices-2 │ 78                │ 131    │ 1          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1082.0> │ 0               │ invoices-1 │ 82                │ 135    │ 1          │ true   │ single_active   │
└────────────────┴─────────────────┴────────────┴───────────────────┴────────┴────────────┴────────┴─────────────────┘

You should see the 3 consumers of the "composite consumer", one for each partition, all active (each is the only one on a given partition).

List the consumer groups with rabbitmqctl list_stream_consumer_groups:

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups
Listing stream consumer groups ...
┌────────────┬───────────┬─────────────────┬───────────┐
│ stream     │ reference │ partition_index │ consumers │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-0 │ my-app    │ 0               │ 1         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-1 │ my-app    │ 1               │ 1         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-2 │ my-app    │ 2               │ 1         │
└────────────┴───────────┴─────────────────┴───────────┘

A consumer group for each partition should show up. They should each report only one consumer (there's only 1 instance of the application so far).

Let's use rabbitmqctl list_stream_group_consumers for the group of the first partition:

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream invoices-0 --reference my-app
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬────────┐
│ subscription_id │ connection_name                     │ state  │
├─────────────────┼─────────────────────────────────────┼────────┤
│ 0               │ 172.17.0.1:36214 -> 172.17.0.2:5552 │ active │
└─────────────────┴─────────────────────────────────────┴────────┘

The group has only one consumer.

Start a second instance of the consuming application:

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SuperStreamConsumer -Dexec.arguments="instance-2"

This second instance will also create a composite consumer and the broker will start dispatching messages from invoices-1 to it:

Starting consumer instance-2
Consumer instance-2 received message 642 from stream invoices-1.
Consumer instance-2 received message 645 from stream invoices-1.
Consumer instance-2 received message 649 from stream invoices-1.
Consumer instance-2 received message 650 from stream invoices-1.
...

So instance-2 gets messages from invoices-1. The first instance should then stop receiving messages from invoices-1:

...
Consumer instance-1 received message 635 from stream invoices-0.
Consumer instance-1 received message 636 from stream invoices-0.
Consumer instance-1 received message 637 from stream invoices-2.
Consumer instance-1 received message 638 from stream invoices-1.     <--- "last" messages from invoices-1
Consumer instance-1 received message 639 from stream invoices-0.
Consumer instance-1 received message 640 from stream invoices-2.
Consumer instance-1 received message 641 from stream invoices-0.
Consumer instance-1 received message 643 from stream invoices-0.
Consumer instance-1 received message 644 from stream invoices-0.
Consumer instance-1 received message 646 from stream invoices-0.
Consumer instance-1 received message 647 from stream invoices-2.
Consumer instance-1 received message 648 from stream invoices-0.
Consumer instance-1 received message 656 from stream invoices-0.

The broker rebalanced the dispatching of messages: the invoices-1 partition went from the first instance to the second instance.

List all the stream consumers:

docker exec rabbitmq rabbitmqctl list_stream_consumers \
  connection_pid,subscription_id,stream,messages_consumed,offset,offset_lag,active,activity_status
Listing stream consumers ...
┌────────────────┬─────────────────┬────────────┬───────────────────┬────────┬────────────┬────────┬─────────────────┐
│ connection_pid │ subscription_id │ stream     │ messages_consumed │ offset │ offset_lag │ active │ activity_status │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.2204.0> │ 0               │ invoices-1 │ 487               │ 1210   │ 1          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1076.0> │ 0               │ invoices-2 │ 763               │ 1283   │ 0          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.2192.0> │ 0               │ invoices-0 │ 0                 │ 0      │ 0          │ false  │ waiting         │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1070.0> │ 0               │ invoices-0 │ 741               │ 1244   │ 1          │ true   │ single_active   │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.1082.0> │ 0               │ invoices-1 │ 229               │ 379    │ 0          │ false  │ waiting         │
├────────────────┼─────────────────┼────────────┼───────────────────┼────────┼────────────┼────────┼─────────────────┤
│ <11402.2198.0> │ 0               │ invoices-2 │ 0                 │ 0      │ 0          │ false  │ waiting         │
└────────────────┴─────────────────┴────────────┴───────────────────┴────────┴────────────┴────────┴─────────────────┘

The list should confirm that one former active consumer (in our case the one from connection <11402.1082.0>) became inactive in favor of a new one (the one from connection <11402.2204.0> here).

Listing the consumer groups should confirm there are 2 consumers on each partition now:

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups
Listing stream consumer groups ...
┌────────────┬───────────┬─────────────────┬───────────┐
│ stream     │ reference │ partition_index │ consumers │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-0 │ my-app    │ 0               │ 2         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-1 │ my-app    │ 1               │ 2         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-2 │ my-app    │ 2               │ 2         │
└────────────┴───────────┴─────────────────┴───────────┘

Start now a third instance of the consuming application:

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SuperStreamConsumer -Dexec.arguments="instance-3"

This third instance should get messages from invoices-2:

Starting consumer instance-3
Consumer instance-3 received message 388 from stream invoices-2.
Consumer instance-3 received message 390 from stream invoices-2.
Consumer instance-3 received message 391 from stream invoices-2.
Consumer instance-3 received message 392 from stream invoices-2.
Consumer instance-3 received message 393 from stream invoices-2.
Consumer instance-3 received message 397 from stream invoices-2.
...

The first instance should get only messages from invoices-0 now:

...
Consumer instance-1 received message 376 from stream invoices-2.
Consumer instance-1 received message 377 from stream invoices-0.
Consumer instance-1 received message 380 from stream invoices-2.     <--- "last" messages from invoices-2
Consumer instance-1 received message 384 from stream invoices-0.
Consumer instance-1 received message 385 from stream invoices-0.
Consumer instance-1 received message 394 from stream invoices-0.
Consumer instance-1 received message 395 from stream invoices-0.
Consumer instance-1 received message 398 from stream invoices-0.
Consumer instance-1 received message 399 from stream invoices-0.
Consumer instance-1 received message 402 from stream invoices-0.
...

list_stream_consumer_groups should show that each partition has 3 consumers:

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups
┌────────────┬───────────┬─────────────────┬───────────┐
│ stream     │ reference │ partition_index │ consumers │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-0 │ my-app    │ 0               │ 3         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-1 │ my-app    │ 1               │ 3         │
├────────────┼───────────┼─────────────────┼───────────┤
│ invoices-2 │ my-app    │ 2               │ 3         │
└────────────┴───────────┴─────────────────┴───────────┘

List the consumers of the consumers on the first partition:

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream invoices-0 --reference my-app
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                     │ state    │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:46228 -> 172.17.0.2:5552 │ active   │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:46248 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:46266 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

There should be 1 active consumer (the one from the first instance, which is active since the beginning) and 2 inactive consumers (the one from instance 2 and 3, that we started after instance 1).

Stop instance-1 (the first you started) with Ctrl-C. This should trigger a significant rebalancing:

  • instance-2: formerly invoices-1, now invoices-0 and invoices-2
  • instance-3: formerly invoices-2, now invoices-1

Listing the consumers of the group for invoices-0 should confirm instance-2 took over (it was the second in line):

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream invoices-0 --reference my-app
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                     │ state    │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:46248 -> 172.17.0.2:5552 │ active   │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0               │ 172.17.0.1:46266 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

Stop instance-1 now. instance-2 should take now the 3 partitions.

Stop instance-3.

rabbitmqctl list_stream_consumer_groups should return nothing:

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups
Listing stream consumer groups ...

Trying to list the consumers of the group on invoices-0 should return an error because the group does no longer exist:

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream invoices-0 --reference my-app
Listing group consumers ...
Error:
The group does not exist

Stop the publisher.

Stop the broker Docker container.

@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from dcc47fd to 6c1129e Compare February 7, 2022 09:56
@mergify
Copy link

mergify bot commented Mar 11, 2022

This pull request modifies the bazel build only. Should the makefiles be updated as well @acogoluegnes?

@HoloRin
Copy link
Contributor

HoloRin commented Mar 11, 2022

This pull request modifies the bazel build only. Should the makefiles be updated as well @acogoluegnes?

This is fine. New test suites require a build file entry, due to fine grained dependencies, which is not the case in erlang.mk

@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from 6c1129e to f1beb12 Compare March 23, 2022 10:19
@acogoluegnes acogoluegnes marked this pull request as ready for review March 28, 2022 14:35
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from 905bf45 to 020d7be Compare March 30, 2022 09:29
@ansd ansd self-requested a review March 30, 2022 09:55
Copy link
Member

@ansd ansd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, looks very good overall. I did the acceptance. Worked for me.

I left some small inline comments.

More general questions are:

(1)
I'm missing declaring a SAC stream via AMQP / Management API. Will that be added in a future PR or should it be part of this PR or is that not planned at all?

Also when listing the queues in the Management UI, it would be nice to see the SAC feature icon (as displayed for other queue types).

(2)
In the example above:

Stop instance-1 (the first you started) with Ctrl-C. This should trigger a significant rebalancing:

instance-2: formerly invoices-1, now invoices-0 and invoices-2
instance-3: formerly invoices-2, now invoices-1

It would be nice if there would not be a "significant" rebalancing. So, in this case I would have expected instance-2 to continue with invoices-1 and instance-3 to continue with invoices-2 while instance 2 or 3 gets additionally invoices-0. I'm thinking about stream processing applications with more than only 3 instances and only 3 partitions. In this case if a single instance crashes, all other instances will need to stop their stream processing (for example aggregate calculations) and start with consuming from new stream partitions even though in above example they could have just continued if there wasn't a "significant" rebalancing happening.

@acogoluegnes
Copy link
Contributor Author

@ansd Thanks for the review. I'll have a look at the code comments. Some answers below.

(1) I'm missing declaring a SAC stream via AMQP / Management API. Will that be added in a future PR or should it be part of this PR or is that not planned at all?

There's no such thing as a SAC stream. An application sets a property when registering a consumer to declare it's a single active consumer. It must also sets the "name" of the consumer, that is some ID that the several consumer instances should share.

Also when listing the queues in the Management UI, it would be nice to see the SAC feature icon (as displayed for other queue types).

The SAC semantics between queues and streams are different. For a queue, all consumers are the same (they don't have a "name"), so the queue is declared as SAC-compliant and the SAC behavior is enforced for consumers registering to it.

It's different with streams. You declare a regular stream and consumers registers to it, and they can SAC or regular consumers.

(2) In the example above:

Stop instance-1 (the first you started) with Ctrl-C. This should trigger a significant rebalancing:
instance-2: formerly invoices-1, now invoices-0 and invoices-2
instance-3: formerly invoices-2, now invoices-1

It would be nice if there would not be a "significant" rebalancing. So, in this case I would have expected instance-2 to continue with invoices-1 and instance-3 to continue with invoices-2 while instance 2 or 3 gets additionally invoices-0. I'm thinking about stream processing applications with more than only 3 instances and only 3 partitions. In this case if a single instance crashes, all other instances will need to stop their stream processing (for example aggregate calculations) and start with consuming from new stream partitions even though in above example they could have just continued if there wasn't a "significant" rebalancing happening.

We use this simple and predictable rebalancing algorithm with a modulo on the list of consumers registered to a stream. It's stream-specific, which means there's no coordination between the streams composing the super stream. It gives good results without too much hassle, but it can get a big dumb for rebalancing. We're still to open to find another mechanism that good give better results, especially for the rebalancing.

@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch 2 times, most recently from fd83b9f to 66b5b37 Compare April 11, 2022 12:32
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, a few smaller fixes

@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from ead0c9b to 23b8a5d Compare April 13, 2022 12:51
@MarcialRosales
Copy link
Contributor

MarcialRosales commented Apr 18, 2022

I did the acceptance and it worked for me too. I only have the following comments below.

Testing SAC =>

  1. when I launched the single active consumer app, I noticed these 7 connections: 1 locator, 3 producer(s), and 3 consumer(s). I did not expect the 3 producer(s) connections though.

  2. When I tested SAC with two separate applications instances on the same consumer group, I got the following warning message on the first instance. However, it did not affect the consumer and it continued consuming fine afterwards.

12:33:23.622 [pool-10-thread-1] WARN  c.r.stream.impl.StreamConsumer - Previous and new status are the same (true), there should be no consumer update in this case.

Testing Super Streams =>

  1. The "consumers" column on the queues tab did not show the number of consumers in the partitioned streams. It did show ?. I am not sure what that value should be, whether the total count of consumers (active and waiting) or 1 if it has an active consumer and 0 if it has no consumers at all.

  2. Have you considered having rabbitmqctl commands for super-streams such as this one that given the name of the super-stream and the consumer group, it gives me who is consuming from the super-stream without having to run this command once per partition?

rabbitmqctl list_stream_group_consumers --super-stream invoices --reference my-app
  1. The documentation looks very good. The RoutingStrategy in the super stream is very powerful. Maybe it is worth mentioning that it allows us to route the message to multiple partitions or none rather than the standard routing function that routes the message always to one partition. Although from the code sample you have provided it is clear that we are returning a list rather than a single value.

@acogoluegnes
Copy link
Contributor Author

My answers below.

Testing SAC =>

  1. when I launched the single active consumer app, I noticed these 7 connections: 1 locator, 3 producer(s), and 3 consumer(s). I did not expect the 3 producer(s) connections though.

This is expected, the producer connections handle the offset storage. There's 1 connection per consumer and tracking consumer to simulate independent application (in different OS processes). This is set up in the program:

try (Environment environment =
        Environment.builder()
            .maxConsumersByConnection(1)
            .maxTrackingConsumersByConnection(1)
            .build()) {
...
}
  1. When I tested SAC with two separate applications instances on the same consumer group, I got the following warning message on the first instance. However, it did not affect the consumer and it continued consuming fine afterwards.
12:33:23.622 [pool-10-thread-1] WARN  c.r.stream.impl.StreamConsumer - Previous and new status are the same (true), there should be no consumer update in this case.

This should not happen, can you tell me how to reproduce?

Testing Super Streams =>

  1. The "consumers" column on the queues tab did not show the number of consumers in the partitioned streams. It did show ?. I am not sure what that value should be, whether the total count of consumers (active and waiting) or 1 if it has an active consumer and 0 if it has no consumers at all.

This is a bug that impacts streams in general, so nothing specific to SAC or super streams.

Follow-up issue.

  1. Have you considered having rabbitmqctl commands for super-streams such as this one that given the name of the super-stream and the consumer group, it gives me who is consuming from the super-stream without having to run this command once per partition?
rabbitmqctl list_stream_group_consumers --super-stream invoices --reference my-app

This is a good point, there's no "global" way to monitor a super stream, some CLI commands would be a good start. This may be out of the scope of this PR for now.

  1. The documentation looks very good. The RoutingStrategy in the super stream is very powerful. Maybe it is worth mentioning that it allows us to route the message to multiple partitions or none rather than the standard routing function that routes the message always to one partition. Although from the code sample you have provided it is clear that we are returning a list rather than a single value.

Sounds good, will do.

acogoluegnes and others added 26 commits May 9, 2022 10:52
In the regular metrics ETS table, not the one from streams.

References #3753
It was broken after introducing the "connection label" field
for the list_stream_group_consumers CLI command.

References #3753
Block stream SAC functions if the feature flag is not enabled.

References #3753
Stream consumers can be active or not with SAC, so these 2 fields
are added to the stream metrics. This is the same as with
regular consumers.

References #3753
For stream consumer groups CLI commands. This is useful in case
new fields are needed in further versions. A new version
node can ask for new fields to an old version node. If
the old version node returns a known value for unknown fields
instead of failing, the new node can set up appropriate
default value for these fields in the result of the CLI commands.

References #3753
The UI handles the case where the 2 fields are not present.
This can happen in a mixed-version cluster, where a node
of a previous version returns records without the fields.
The UI uses default values (active = true, activity status = up),
which is valid as the consumers of the node are "standalone"
consumers (not part of a group).

References #3753
A formerly active consumer can have in-flight credit
requests when it becomes inactive. This commit checks
the state of consumer on credit requests and make sure
not to dispatch messages if it's inactive.
And do not do anything if it's not.

References #3753
Single active consumer must have a name, which is used as the reference
for storing offsets and as the name of the group the consumer belongs
to in case the stream is a partition of a super stream.

References #3753
As per @ansd's suggestion. This way the stream coordinator
does not have to know about the SAC commands.

References #3753
@acogoluegnes acogoluegnes force-pushed the stream-single-active-consumer branch from 6328b84 to 85b0625 Compare May 9, 2022 08:53
@acogoluegnes acogoluegnes merged commit f6f3f09 into master May 9, 2022
@michaelklishin michaelklishin added this to the 3.11.0 milestone May 9, 2022
@acogoluegnes acogoluegnes deleted the stream-single-active-consumer branch May 9, 2022 13:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants