diff --git a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
index d3eb7169..db6c42fd 100644
--- a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
@@ -2,6 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+using System;
using System.Threading;
using System.Threading.Tasks;
@@ -24,25 +25,39 @@ public interface IConsumerBuilder
IConsumerBuilder InitialCredits(int initialCredits);
+ ///
+ /// SubscriptionListener interface callback to add behavior before a subscription is created.
+ /// This callback is meant for stream consumers:
+ /// it can be used to dynamically set the offset the consumer attaches to in the stream.
+ /// It is called when the consumer is first created and when the client has to re-subscribe
+ /// (e.g. after a disconnection).
+ ///
+ /// Contains the listenerContext, see
+ ///
+ IConsumerBuilder SubscriptionListener(Action listenerContext);
+
IStreamOptions Stream();
Task BuildAndStartAsync(CancellationToken cancellationToken = default);
+
public interface IStreamOptions
{
IStreamOptions Offset(long offset);
-
- // IStreamOptions offset(Instant timestamp);
-
IStreamOptions Offset(StreamOffsetSpecification specification);
-
- IStreamOptions Offset(string interval);
-
IStreamOptions FilterValues(string[] values);
-
IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);
-
IConsumerBuilder Builder();
}
+
+
+ ///
+ /// ListenerContext is a helper class that holds the contexts for the listener
+ ///
+ /// Stream Options that the user can change during the SubscriptionListener
+ public record ListenerContext(IStreamOptions StreamOptions)
+ {
+ public IStreamOptions StreamOptions { get; } = StreamOptions;
+ }
}
}
diff --git a/RabbitMQ.AMQP.Client/IEntities.cs b/RabbitMQ.AMQP.Client/IEntities.cs
index c9d99205..6366fe9c 100644
--- a/RabbitMQ.AMQP.Client/IEntities.cs
+++ b/RabbitMQ.AMQP.Client/IEntities.cs
@@ -67,6 +67,7 @@ public interface IQueueSpecification : IEntityInfoSpecification
IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);
+ // TODO: Add more tests for SingleActiveConsumer
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
IQueueSpecification Expires(TimeSpan expiration);
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
index f878f724..abc768ba 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
@@ -21,28 +21,18 @@ private enum PauseStatus
PAUSED,
}
- private readonly AmqpConnection _connection;
- private readonly string _address;
- private readonly MessageHandler _messageHandler;
- private readonly int _initialCredits;
- private readonly Map _filters;
private readonly Guid _id = Guid.NewGuid();
private ReceiverLink? _receiverLink;
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
+ private readonly ConsumerConfiguration _configuration;
- public AmqpConsumer(AmqpConnection connection, string address,
- MessageHandler messageHandler, int initialCredits, Map filters)
+ public AmqpConsumer(ConsumerConfiguration configuration)
{
- _connection = connection;
- _address = address;
- _messageHandler = messageHandler;
- _initialCredits = initialCredits;
- _filters = filters;
-
- if (false == _connection.Consumers.TryAdd(_id, this))
+ _configuration = configuration;
+ if (false == _configuration.Connection.Consumers.TryAdd(_id, this))
{
// TODO error?
}
@@ -52,9 +42,20 @@ public override async Task OpenAsync()
{
try
{
- TaskCompletionSource attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ TaskCompletionSource attachCompletedTcs =
+ new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ // this is an event to get the filters to the listener context
+ // it _must_ be here because in case of reconnect the original filters could be not valid anymore
+ // so the function must be called every time the consumer is opened normally or by reconnection
+ // if ListenerContext is null the function will do nothing
+ // ListenerContext will override only the filters the selected filters.
+ _configuration.ListenerContext?.Invoke(
+ new IConsumerBuilder.ListenerContext(new ListenerStreamOptions(_configuration.Filters)));
- Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id, _filters);
+
+ Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
+ _configuration.Filters);
void onAttached(ILink argLink, Attach argAttach)
{
@@ -74,7 +75,7 @@ void onAttached(ILink argLink, Attach argAttach)
ReceiverLink? tmpReceiverLink = null;
Task receiverLinkTask = Task.Run(async () =>
{
- Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
+ Session session = await _configuration.Connection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
});
@@ -89,7 +90,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);
System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
- System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
+ System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink));
if (_receiverLink is null)
{
@@ -103,7 +104,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
}
else
{
- _receiverLink.SetCredit(_initialCredits);
+ _receiverLink.SetCredit(_configuration.InitialCredits);
// TODO save / cancel task
_ = Task.Run(ProcessMessages);
@@ -150,7 +151,10 @@ private async Task ProcessMessages()
// TODO catch exceptions thrown by handlers,
// then call exception handler?
- await _messageHandler(context, amqpMessage).ConfigureAwait(false);
+ if (_configuration.Handler != null)
+ {
+ await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
+ }
}
}
catch (Exception e)
@@ -173,20 +177,24 @@ public void Pause()
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
- if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As(ref _pauseStatus),
- (int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
+ if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(
+ ref Unsafe.As(ref _pauseStatus),
+ (int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
{
_receiverLink.SetCredit(credit: 0);
- if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As(ref _pauseStatus),
- (int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
+ if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(
+ ref Unsafe.As(ref _pauseStatus),
+ (int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
{
_pauseStatus = PauseStatus.UNPAUSED;
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
else
@@ -197,10 +205,7 @@ public void Pause()
public long UnsettledMessageCount
{
- get
- {
- return _unsettledMessageCounter.Get();
- }
+ get { return _unsettledMessageCounter.Get(); }
}
public void Unpause()
@@ -208,13 +213,15 @@ public void Unpause()
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
- if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(ref Unsafe.As(ref _pauseStatus),
- (int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
+ if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(
+ ref Unsafe.As(ref _pauseStatus),
+ (int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
{
- _receiverLink.SetCredit(credit: _initialCredits);
+ _receiverLink.SetCredit(credit: _configuration.InitialCredits);
}
else
{
@@ -240,19 +247,20 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
}
catch (Exception ex)
{
- Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway", ex);
+ Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway",
+ ex);
}
_receiverLink = null;
OnNewStatus(State.Closed, null);
- _connection.Consumers.TryRemove(_id, out _);
+ _configuration.Connection.Consumers.TryRemove(_id, out _);
}
public override string ToString()
{
- return $"Consumer{{Address='{_address}', " +
+ return $"Consumer{{Address='{_configuration.Address}', " +
$"id={_id}, " +
- $"Connection='{_connection}', " +
+ $"Connection='{_configuration.Connection}', " +
$"State='{State}'}}";
}
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
index 84704265..e315bf65 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
@@ -2,6 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -9,17 +10,30 @@
namespace RabbitMQ.AMQP.Client.Impl
{
+ ///
+ /// ConsumerConfiguration is a helper class that holds the configuration for the consumer
+ ///
+ public class ConsumerConfiguration
+ {
+ public AmqpConnection Connection { get; set; } = null!;
+ public string Address { get; set; } = "";
+ public int InitialCredits { get; set; } = 10;
+ public Map Filters { get; set; } = new();
+ public MessageHandler? Handler { get; set; }
+ public Action? ListenerContext = null;
+ }
+
+ ///
+ /// The builder class for create the consumer.
+ /// The builder is called by the connection
+ ///
public class AmqpConsumerBuilder : IConsumerBuilder
{
- private readonly AmqpConnection _connection;
- private string _queue = "";
- private int _initialCredits = 10;
- private readonly Map _filters = new Map();
- private MessageHandler? _handler;
+ private readonly ConsumerConfiguration _configuration = new();
public AmqpConsumerBuilder(AmqpConnection connection)
{
- _connection = connection;
+ _configuration.Connection = connection;
}
public IConsumerBuilder Queue(IQueueSpecification queueSpec)
@@ -29,37 +43,45 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec)
public IConsumerBuilder Queue(string queueName)
{
- _queue = queueName;
+ string address = new AddressBuilder().Queue(queueName).Address();
+ _configuration.Address = address;
return this;
}
public IConsumerBuilder MessageHandler(MessageHandler handler)
{
- _handler = handler;
+ _configuration.Handler = handler;
return this;
}
public IConsumerBuilder InitialCredits(int initialCredits)
{
- _initialCredits = initialCredits;
+ _configuration.InitialCredits = initialCredits;
+ return this;
+ }
+
+ public IConsumerBuilder SubscriptionListener(Action context)
+ {
+ _configuration.ListenerContext = context;
return this;
}
+
public IConsumerBuilder.IStreamOptions Stream()
{
- return new DefaultStreamOptions(this, _filters);
+ return new ConsumerBuilderStreamOptions(this, _configuration.Filters);
}
+
public async Task BuildAndStartAsync(CancellationToken cancellationToken = default)
{
- if (_handler is null)
+ if (_configuration.Handler is null)
{
throw new ConsumerException("Message handler is not set");
}
- string address = new AddressBuilder().Queue(_queue).Address();
- AmqpConsumer consumer = new(_connection, address, _handler, _initialCredits, _filters);
+ AmqpConsumer consumer = new(_configuration);
// TODO pass cancellationToken
await consumer.OpenAsync()
@@ -69,14 +91,18 @@ await consumer.OpenAsync()
}
}
- public class DefaultStreamOptions : IConsumerBuilder.IStreamOptions
+
+ ///
+ /// The base class for the stream options.
+ /// The class set the right filters used to create the consumer
+ /// See also and
+ ///
+ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
{
- private readonly IConsumerBuilder _consumerBuilder;
private readonly Map _filters;
- public DefaultStreamOptions(IConsumerBuilder consumerBuilder, Map filters)
+ protected StreamOptions(Map filters)
{
- _consumerBuilder = consumerBuilder;
_filters = filters;
}
@@ -86,57 +112,75 @@ public IConsumerBuilder.IStreamOptions Offset(long offset)
return this;
}
- // public IConsumerBuilder.IStreamOptions Offset(Instant timestamp)
- // {
- // notNull(timestamp, "Timestamp offset cannot be null");
- // this.offsetSpecification(JSType.Date.from(timestamp));
- // return this;
- // }
-
public IConsumerBuilder.IStreamOptions Offset(StreamOffsetSpecification specification)
{
- // notNull(specification, "Offset specification cannot be null");
OffsetSpecification(specification.ToString().ToLower());
return this;
}
public IConsumerBuilder.IStreamOptions Offset(string interval)
{
- // notNull(interval, "Interval offset cannot be null");
- // if (!Utils.validateMaxAge(interval))
- // {
- // throw new IllegalArgumentException(
- // "Invalid value for interval: "
- // + interval
- // + ". "
- // + "Valid examples are: 1Y, 7D, 10m. See https://www.rabbitmq.com/docs/streams#retention.");
- // }
-
OffsetSpecification(interval);
return this;
}
+ private void OffsetSpecification(object value)
+ {
+ _filters[new Symbol("rabbitmq:stream-offset-spec")] = value;
+ }
+
public IConsumerBuilder.IStreamOptions FilterValues(string[] values)
{
_filters[new Symbol("rabbitmq:stream-filter")] = values.ToList();
return this;
}
-
public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
{
_filters[new Symbol("rabbitmq:stream-match-unfiltered")] = matchUnfiltered;
return this;
}
- public IConsumerBuilder Builder()
+ public abstract IConsumerBuilder Builder();
+ }
+
+
+ ///
+ /// The stream options for the Subscribe Listener event.
+ /// For the user perspective, it is used to set the stream options for the listener
+ ///
+ public class ListenerStreamOptions : StreamOptions
+ {
+ public ListenerStreamOptions(Map filters) : base(filters)
{
- return _consumerBuilder;
}
- private void OffsetSpecification(object value)
+ ///
+ /// This method is not implemented for the listener stream options
+ /// Since it is not needed for the listener
+ ///
+ ///
+ ///
+ public override IConsumerBuilder Builder() => throw new NotImplementedException();
+ }
+
+ ///
+ /// The class that implements the stream options for the consumer builder
+ /// It is used to set the stream options for the consumer builder
+ ///
+ public class ConsumerBuilderStreamOptions : StreamOptions
+ {
+ private readonly IConsumerBuilder _consumerBuilder;
+
+ public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder, Map filters) : base(filters)
{
- _filters[new Symbol("rabbitmq:stream-offset-spec")] = value;
+ _consumerBuilder = consumerBuilder;
+ }
+
+
+ public override IConsumerBuilder Builder()
+ {
+ return _consumerBuilder;
}
}
}
diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
index 519edd8c..46998d31 100644
--- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
@@ -1,5 +1,6 @@
#nullable enable
abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Threading.Tasks.Task!
+abstract RabbitMQ.AMQP.Client.Impl.StreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
const RabbitMQ.AMQP.Client.Impl.Consts.Bindings = "bindings" -> string!
const RabbitMQ.AMQP.Client.Impl.Consts.DefaultMaxFrameSize = 0 -> uint
const RabbitMQ.AMQP.Client.Impl.Consts.Exchanges = "exchanges" -> string!
@@ -22,6 +23,8 @@ override RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.ToString() -> string!
override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(object? obj) -> bool
override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.GetHashCode() -> int
override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ToString() -> string!
+override RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
+override RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
override RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.ToString() -> string!
override RabbitMQ.AMQP.Client.SaslMechanism.Equals(object? obj) -> bool
override RabbitMQ.AMQP.Client.SaslMechanism.GetHashCode() -> int
@@ -117,11 +120,14 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.FilterMatchUnfiltered(bool
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.FilterValues(string![]! values) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext
+RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! StreamOptions) -> void
+RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpecification) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IContext
RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task!
@@ -226,7 +232,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.set -> void
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IManagement!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
-RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, RabbitMQ.AMQP.Client.MessageHandler! messageHandler, int initialCredits, Amqp.Types.Map! filters) -> void
+RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration! configuration) -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
@@ -238,6 +244,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Clien
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.SubscriptionListener(System.Action! context) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CloseAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CreateConnectionAsync() -> System.Threading.Tasks.Task!
@@ -376,6 +383,21 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettings.UseSsl.get -> bool
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.VirtualHost.get -> string!
RabbitMQ.AMQP.Client.Impl.Consts
RabbitMQ.AMQP.Client.Impl.Consts.Consts() -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
+RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Address.get -> string!
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Address.set -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Connection.set -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ConsumerConfiguration() -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Filters.get -> Amqp.Types.Map!
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Filters.set -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.get -> RabbitMQ.AMQP.Client.MessageHandler?
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.set -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.get -> int
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.set -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ListenerContext -> System.Action?
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Arguments() -> System.Collections.Generic.Dictionary!
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.AutoDelete() -> bool
@@ -387,18 +409,12 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.MessageCount() -> ulong
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Name() -> string!
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Replicas() -> System.Collections.Generic.List!
RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueType
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.DefaultStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.FilterMatchUnfiltered(bool matchUnfiltered) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.FilterValues(string![]! values) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.FieldNotSetException
RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void
RabbitMQ.AMQP.Client.Impl.InvalidCodeException
RabbitMQ.AMQP.Client.Impl.InvalidCodeException.InvalidCodeException(string! message) -> void
+RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions
+RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
@@ -406,6 +422,13 @@ RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.GetBackOffDelayPolicy() -> Rabbi
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsActivate() -> bool
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsTopologyActive() -> bool
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
+RabbitMQ.AMQP.Client.Impl.StreamOptions
+RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterMatchUnfiltered(bool matchUnfiltered) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterValues(string![]! values) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
+RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.TlsSettings
RabbitMQ.AMQP.Client.Impl.TlsSettings.AcceptablePolicyErrors.get -> System.Net.Security.SslPolicyErrors
RabbitMQ.AMQP.Client.Impl.TlsSettings.AcceptablePolicyErrors.set -> void
diff --git a/Tests/Consumer/StreamConsumerTests.cs b/Tests/Consumer/StreamConsumerTests.cs
new file mode 100644
index 00000000..d61d2da8
--- /dev/null
+++ b/Tests/Consumer/StreamConsumerTests.cs
@@ -0,0 +1,146 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using RabbitMQ.AMQP.Client;
+using RabbitMQ.AMQP.Client.Impl;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tests.Consumer;
+
+///
+/// These tests are only for the streaming part of the consumer.
+/// we'd need to test the consumer itself in a different use cases
+/// like restarting from an offset with and without the subscription listener
+///
+public class StreamConsumerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
+{
+ ///
+ /// Given a stream, if the connection is killed the consumer will restart consuming from the beginning
+ /// because of Offset(StreamOffsetSpecification.First)
+ /// so the total number of consumed messages should be 10 two times = 20
+ ///
+ [Fact]
+ public async Task StreamConsumerBuilderShouldRestartFromTheBeginning()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+
+ ManualResetEventSlim manualResetEvent = new(false);
+ var q = _management.Queue(_queueName).Stream().Queue();
+ await q.DeclareAsync();
+ await PublishAsync(q, 10);
+
+ int totalConsumed = 0;
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .Queue(_queueName).InitialCredits(10).MessageHandler(
+ async (context, message) =>
+ {
+ Interlocked.Increment(ref totalConsumed);
+ await context.AcceptAsync();
+ if (message.MessageId() == "9")
+ {
+ manualResetEvent.Set();
+ }
+ }
+ ).Stream().Offset(StreamOffsetSpecification.First).Builder().BuildAndStartAsync();
+
+ manualResetEvent.Wait(TimeSpan.FromSeconds(5));
+ manualResetEvent.Reset();
+ await SystemUtils.WaitUntilConnectionIsKilled(_containerId);
+ manualResetEvent.Wait(TimeSpan.FromSeconds(5));
+ Assert.Equal(20, totalConsumed);
+ await consumer.CloseAsync();
+ }
+
+
+ ///
+ /// This is a standard case for the stream consumer with SubscriptionListener
+ /// The consumer should start from the offset 5 and consume 5 messages
+ /// Since: ctx.StreamOptions.Offset(5)
+ ///
+ [Fact]
+ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfiguration()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+ ManualResetEventSlim manualResetEvent = new(false);
+ var q = _management.Queue(_queueName).Stream().Queue();
+ await q.DeclareAsync();
+ await PublishAsync(q, 10);
+ int totalConsumed = 0;
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .Queue(_queueName).Stream().FilterMatchUnfiltered(true).Offset(StreamOffsetSpecification.First).Builder()
+ .InitialCredits(10).MessageHandler(
+ async (context, message) =>
+ {
+ Interlocked.Increment(ref totalConsumed);
+ await context.AcceptAsync();
+ if (message.MessageId() == "9")
+ {
+ manualResetEvent.Set();
+ }
+ }
+ ).Stream().Builder().SubscriptionListener(
+ ctx => { ctx.StreamOptions.Offset(5); }
+ ).BuildAndStartAsync();
+
+ manualResetEvent.Wait(TimeSpan.FromSeconds(5));
+ Assert.Equal(5, totalConsumed);
+ await consumer.CloseAsync();
+ }
+
+ ///
+ /// In this test we simulate a listener that changes the offset after the connection is killed
+ /// We simulate this by changing the offset from 4 to 6 like loading the offset from an external storage
+ /// Each time the consumer is created the listener will change the offset and must be called to set the new offset
+ ///
+ [Fact]
+ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfigurationWhenConnectionIsKilled()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+ ManualResetEventSlim manualResetEvent = new(false);
+ var q = _management.Queue(_queueName).Stream().Queue();
+ await q.DeclareAsync();
+ await PublishAsync(q, 10);
+ int totalConsumed = 0;
+ int startFrom = 2;
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .Queue(_queueName).InitialCredits(10).MessageHandler(
+ async (context, message) =>
+ {
+ Interlocked.Increment(ref totalConsumed);
+ await context.AcceptAsync();
+ if (message.MessageId() == "9")
+ {
+ manualResetEvent.Set();
+ }
+ }
+ ).Stream()
+ .Offset(StreamOffsetSpecification
+ .First) // in this case this value is ignored because of the listener will replace it
+ .Builder().SubscriptionListener(
+ ctx =>
+ {
+ // Here we simulate a listener that changes the offset after the connection is killed
+ // Like loading the offset from an external storage
+ // so first start from 4 then start from 6
+ // Given 10 messages, we should consume 6 messages first time
+ // then 4 messages the second time the total should be 10
+ startFrom += 2;
+ ctx.StreamOptions.Offset(startFrom);
+ // In this case we should not be able to call the builder
+ Assert.Throws(() => ctx.StreamOptions.Builder());
+ }
+ ).BuildAndStartAsync();
+
+ manualResetEvent.Wait(TimeSpan.FromSeconds(5));
+ Assert.Equal(6, totalConsumed);
+ manualResetEvent.Reset();
+ await SystemUtils.WaitUntilConnectionIsKilled(_containerId);
+ manualResetEvent.Wait(TimeSpan.FromSeconds(5));
+ Assert.Equal(10, totalConsumed);
+ await consumer.CloseAsync();
+ }
+}