diff --git a/README.md b/README.md
index a52bb766..0d27efda 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
-# RabbitMQ Amqp1.0 DotNet Client
+# RabbitMQ AMQP 1.0 DotNet Client
-See the [internal documentation](https://docs.google.com/document/d/1afO2ugGpTIZYUeXH_0GtMxedV51ZzmsbC3-mRdoSI_o/edit#heading=h.kqd38uu4iku)
+
+This library is in early stages of development.
+It is meant to be used with RabbitMQ 4.0.
diff --git a/RabbitMQ.AMQP.Client/IClosable.cs b/RabbitMQ.AMQP.Client/IClosable.cs
index 0a67165b..84c1b5b2 100644
--- a/RabbitMQ.AMQP.Client/IClosable.cs
+++ b/RabbitMQ.AMQP.Client/IClosable.cs
@@ -1,10 +1,12 @@
namespace RabbitMQ.AMQP.Client;
-public enum Status
+public enum State
{
- Closed,
- Reconneting,
+ // Opening,
Open,
+ Reconnecting,
+ Closing,
+ Closed,
}
public class Error
@@ -15,13 +17,11 @@ public class Error
public interface IClosable
{
- public Status Status { get; }
+ public State State { get; }
Task CloseAsync();
- public delegate void ChangeStatusCallBack(object sender, Status from, Status to, Error? error);
+ public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
- event ChangeStatusCallBack ChangeStatus;
-
-
+ event LifeCycleCallBack ChangeState;
}
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/IEntities.cs b/RabbitMQ.AMQP.Client/IEntities.cs
index e029bec0..1a48f0e7 100644
--- a/RabbitMQ.AMQP.Client/IEntities.cs
+++ b/RabbitMQ.AMQP.Client/IEntities.cs
@@ -4,6 +4,11 @@ public interface IEntityInfo
{
}
+
+///
+/// Generic interface for declaring entities
+///
+///
public interface IEntityDeclaration where T : IEntityInfo
{
Task Declare();
diff --git a/RabbitMQ.AMQP.Client/IManagement.cs b/RabbitMQ.AMQP.Client/IManagement.cs
index 3b2fa4b5..ad5a8f7f 100644
--- a/RabbitMQ.AMQP.Client/IManagement.cs
+++ b/RabbitMQ.AMQP.Client/IManagement.cs
@@ -2,7 +2,7 @@ namespace RabbitMQ.AMQP.Client;
public class ModelException(string message) : Exception(message);
-public class PreconditionFailException(string message) : Exception(message);
+public class PreconditionFailedException(string message) : Exception(message);
public interface IManagement : IClosable
{
diff --git a/RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs b/RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
index 0a88a8b0..7afeaaac 100644
--- a/RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
+++ b/RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
@@ -1,15 +1,63 @@
namespace RabbitMQ.AMQP.Client;
+
+///
+/// Interface for the recovery configuration.
+///
public interface IRecoveryConfiguration
{
+ ///
+ /// Define if the recovery is activated.
+ /// If is not activated the connection will not try to reconnect.
+ ///
+ ///
+ ///
IRecoveryConfiguration Activated(bool activated);
bool IsActivate();
- // IRecoveryConfiguration BackOffDelayPolicy(BackOffDelayPolicy backOffDelayPolicy);
+ ///
+ /// Define the backoff delay policy.
+ /// It is used when the connection is trying to reconnect.
+ ///
+ ///
+ ///
+ IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);
+ ///
+ /// Define if the recovery of the topology is activated.
+ /// When Activated the connection will try to recover the topology after a reconnection.
+ /// It is valid only if the recovery is activated.
+ ///
+ ///
+ ///
IRecoveryConfiguration Topology(bool activated);
bool IsTopologyActive();
+}
+
+///
+/// Interface for the backoff delay policy.
+/// Used during the recovery of the connection.
+///
+public interface IBackOffDelayPolicy
+{
+ ///
+ /// Get the next delay in milliseconds.
+ ///
+ ///
+ int Delay();
+
+ ///
+ /// Reset the backoff delay policy.
+ ///
+ void Reset();
+
+ ///
+ /// Define if the backoff delay policy is active.
+ /// Can be used to disable the backoff delay policy after a certain number of retries.
+ /// or when the user wants to disable the backoff delay policy.
+ ///
+ bool IsActive { get; }
}
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/ITopologyListener.cs b/RabbitMQ.AMQP.Client/ITopologyListener.cs
index 82586414..0cf94f0e 100644
--- a/RabbitMQ.AMQP.Client/ITopologyListener.cs
+++ b/RabbitMQ.AMQP.Client/ITopologyListener.cs
@@ -5,4 +5,8 @@ public interface ITopologyListener
void QueueDeclared(IQueueSpecification specification);
void QueueDeleted(string name);
+
+ void Clear();
+
+ int QueueCount();
}
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
index 9071a7ee..66662c91 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
@@ -8,32 +8,39 @@ internal class Visitor(AmqpManagement management) : IVisitor
{
private AmqpManagement Management { get; } = management;
-
- public void VisitQueues(List queueSpec)
+ public async Task VisitQueues(List queueSpec)
{
foreach (var spec in queueSpec)
{
Trace.WriteLine(TraceLevel.Information, $"Recovering queue {spec.Name}");
- Management.Queue(spec).Declare();
+ await Management.Queue(spec).Declare();
}
}
}
-
///
/// AmqpConnection is the concrete implementation of
/// It is a wrapper around the AMQP.Net Lite class
///
public class AmqpConnection : IConnection
{
+ private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
+ private const string ConnectionNotRecoveredMessage = "Connection not recovered";
+
// The native AMQP.Net Lite connection
private Connection? _nativeConnection;
private readonly AmqpManagement _management = new();
+
+
private readonly RecordingTopologyListener _recordingTopologyListener = new();
+
private readonly ConnectionSettings _connectionSettings;
///
/// Creates a new instance of
+ /// Through the Connection is possible to create:
+ /// - Management. See
+ /// - Publishers and Consumers: TODO: Implement
///
///
///
@@ -41,6 +48,7 @@ public static async Task CreateAsync(ConnectionSettings connecti
{
var connection = new AmqpConnection(connectionSettings);
await connection.EnsureConnectionAsync();
+
return connection;
}
@@ -86,75 +94,116 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
_nativeConnection.AddClosedCallback(MaybeRecoverConnection());
}
- OnNewStatus(Status.Open, null);
+ OnNewStatus(State.Open, null);
}
catch (AmqpException e)
{
- throw new ConnectionException("AmqpException: Connection failed", e);
+ throw new ConnectionException($"AmqpException: Connection failed. Info: {ToString()} ", e);
}
catch (OperationCanceledException e)
{
// wrong virtual host
- throw new ConnectionException("OperationCanceledException: Connection failed", e);
+ throw new ConnectionException($"OperationCanceledException: Connection failed. Info: {ToString()}", e);
}
catch (NotSupportedException e)
{
// wrong schema
- throw new ConnectionException("NotSupportedException: Connection failed", e);
+ throw new ConnectionException($"NotSupportedException: Connection failed. Info: {ToString()}", e);
}
}
-
- private void OnNewStatus(Status newStatus, Error? error)
+
+ private void OnNewStatus(State newState, Error? error)
{
- if (Status == newStatus) return;
- var oldStatus = Status;
- Status = newStatus;
- ChangeStatus?.Invoke(this, oldStatus, newStatus, error);
+ if (State == newState) return;
+ var oldStatus = State;
+ State = newState;
+ ChangeState?.Invoke(this, oldStatus, newState, error);
}
private ClosedCallback MaybeRecoverConnection()
{
- return (sender, error) =>
+ return async (sender, error) =>
{
if (error != null)
{
- // TODO: Implement Dump Interface
- Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpected" +
- $"{sender} {error} {Status} " +
- $"{_nativeConnection!.IsClosed}");
+ Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
+ $"Info: {ToString()}");
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
{
- OnNewStatus(Status.Closed, Utils.ConvertError(error));
+ OnNewStatus(State.Closed, Utils.ConvertError(error));
return;
}
+ // TODO: Block the publishers and consumers
+ OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
- OnNewStatus(Status.Reconneting, Utils.ConvertError(error));
-
- Thread.Sleep(1000);
- // TODO: Replace with Backoff pattern
- var t = Task.Run(async () =>
+ await Task.Run(async () =>
{
- Trace.WriteLine(TraceLevel.Information, "Recovering connection");
- await EnsureConnectionAsync();
- Trace.WriteLine(TraceLevel.Information, "Recovering topology");
+ var connected = false;
+ // as first step we try to recover the connection
+ // so the connected flag is false
+ while (!connected &&
+ // we have to check if the recovery is active.
+ // The user may want to disable the recovery mechanism
+ // the user can use the lifecycle callback to handle the error
+ _connectionSettings.RecoveryConfiguration.IsActivate() &&
+ // we have to check if the backoff policy is active
+ // the user may want to disable the backoff policy or
+ // the backoff policy is not active due of some condition
+ // for example: Reaching the maximum number of retries and avoid the forever loop
+ _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive)
+ {
+ try
+ {
+ var next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
+ Trace.WriteLine(TraceLevel.Information,
+ $"Trying Recovering connection in {next} milliseconds. Info: {ToString()})");
+ await Task.Delay(
+ TimeSpan.FromMilliseconds(next));
+
+ await EnsureConnectionAsync();
+ connected = true;
+ }
+ catch (Exception e)
+ {
+ Trace.WriteLine(TraceLevel.Warning,
+ $"Error trying to recover connection {e}. Info: {this}");
+ }
+ }
+
+ _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
+ var connectionDescription = connected ? "recovered" : "not recovered";
+ Trace.WriteLine(TraceLevel.Information,
+ $"Connection {connectionDescription}. Info: {ToString()}");
+
+ if (!connected)
+ {
+ Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
+ OnNewStatus(State.Closed, new Error()
+ {
+ Description =
+ $"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}",
+ ErrorCode = ConnectionNotRecoveredCode
+ });
+ return;
+ }
+
+
if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
{
- _recordingTopologyListener.Accept(new Visitor(_management));
+ Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
+ await _recordingTopologyListener.Accept(new Visitor(_management));
}
});
- t.WaitAsync(TimeSpan.FromSeconds(10));
return;
}
- Trace.WriteLine(TraceLevel.Verbose, $"connection is closed" +
- $"{sender} {error} {Status} " +
- $"{_nativeConnection!.IsClosed}");
- OnNewStatus(Status.Closed, Utils.ConvertError(error));
+ Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
+ OnNewStatus(State.Closed, Utils.ConvertError(error));
};
}
@@ -166,12 +215,20 @@ private ClosedCallback MaybeRecoverConnection()
public async Task CloseAsync()
{
- OnNewStatus(Status.Closed, null);
+ _recordingTopologyListener.Clear();
+ if (State == State.Closed) return;
+ OnNewStatus(State.Closing, null);
if (_nativeConnection is { IsClosed: false }) await _nativeConnection.CloseAsync();
await _management.CloseAsync();
}
- public event IClosable.ChangeStatusCallBack? ChangeStatus;
+ public event IClosable.LifeCycleCallBack? ChangeState;
+
+ public State State { get; private set; } = State.Closed;
- public Status Status { get; private set; } = Status.Closed;
+ public override string ToString()
+ {
+ var info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
+ return info;
+ }
}
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
index b4b08054..a9cd43be 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
@@ -14,6 +14,8 @@ namespace RabbitMQ.AMQP.Client.Impl;
///
public class AmqpManagement : IManagement
{
+ // The requests are stored in a dictionary with the correlationId as the key
+ // The correlationId is used to match the request with the response
private readonly ConcurrentDictionary> _requests = new();
// private static readonly long IdSequence = 0;
@@ -32,7 +34,7 @@ public class AmqpManagement : IManagement
private const string ReplyTo = "$me";
- public virtual Status Status { get; protected set; } = Status.Closed;
+ public virtual State State { get; protected set; } = State.Closed;
public IQueueSpecification Queue()
@@ -71,7 +73,7 @@ public ITopologyListener TopologyListener()
internal void Init(AmqpManagementParameters parameters)
{
- if (Status == Status.Open)
+ if (State == State.Open)
return;
_amqpConnection = parameters.Connection();
@@ -85,45 +87,47 @@ internal void Init(AmqpManagementParameters parameters)
// TODO: find a better way to ensure that the sender link is open before the receiver link
Thread.Sleep(500);
EnsureReceiverLink();
- _ = Task.Run(async () =>
- {
- try
- {
- while (_managementSession.IsClosed == false &&
- _amqpConnection.NativeConnection()!.IsClosed == false)
- {
- if (_receiverLink == null) continue;
- var msg = await _receiverLink.ReceiveAsync();
- if (msg == null)
- {
- Trace.WriteLine(TraceLevel.Warning, "Received null message");
- continue;
- }
-
- _receiverLink.Accept(msg);
- HandleResponseMessage(msg);
- msg.Dispose();
- }
- }
- catch (Exception e)
- {
- Trace.WriteLine(TraceLevel.Error,
- $"Receiver link error in management session {e}. Receiver link closed: {_receiverLink?.IsClosed}");
- }
-
- Trace.WriteLine(TraceLevel.Information, "AMQP Management session closed");
- });
+ _ = Task.Run(async () => { await ProcessResponses(); });
_managementSession.Closed += (sender, error) =>
{
Trace.WriteLine(TraceLevel.Warning, $"Management session closed " +
$"sender: {sender} error: {error} " +
- $"Amqp Status:{Status} senderLink closed: {_senderLink?.IsClosed}" +
+ $"Amqp Status:{State} senderLink closed: {_senderLink?.IsClosed}" +
$"_receiverLink closed: {_receiverLink?.IsClosed} " +
$"_managementSession is closed: {_managementSession.IsClosed}" +
$"native connection is closed: {_amqpConnection.NativeConnection()!.IsClosed}");
- OnNewStatus(Status.Closed, Utils.ConvertError(error));
+ OnNewStatus(State.Closed, Utils.ConvertError(error));
};
- OnNewStatus(Status.Open, null);
+ OnNewStatus(State.Open, null);
+ }
+
+ private async Task ProcessResponses()
+ {
+ try
+ {
+ while (_managementSession?.IsClosed == false &&
+ _amqpConnection?.NativeConnection()!.IsClosed == false)
+ {
+ if (_receiverLink == null) continue;
+ var msg = await _receiverLink.ReceiveAsync();
+ if (msg == null)
+ {
+ Trace.WriteLine(TraceLevel.Warning, "Received null message");
+ continue;
+ }
+
+ _receiverLink.Accept(msg);
+ HandleResponseMessage(msg);
+ msg.Dispose();
+ }
+ }
+ catch (Exception e)
+ {
+ Trace.WriteLine(TraceLevel.Error,
+ $"Receiver link error in management session {e}. Receiver link closed: {_receiverLink?.IsClosed}");
+ }
+
+ Trace.WriteLine(TraceLevel.Information, "AMQP Management session closed");
}
@@ -157,11 +161,11 @@ private void EnsureReceiverLink()
}
}
- private void OnNewStatus(Status newStatus, Error? error)
+ private void OnNewStatus(State newState, Error? error)
{
- var oldStatus = Status;
- Status = newStatus;
- ChangeStatus?.Invoke(this, oldStatus, Status, error);
+ var oldStatus = State;
+ State = newState;
+ ChangeState?.Invoke(this, oldStatus, State, error);
}
private void EnsureSenderLink()
@@ -207,12 +211,12 @@ protected void HandleResponseMessage(Message msg)
{
if (mre.TrySetResult(msg))
{
- Trace.WriteLine(TraceLevel.Information, $"Set result for {msg.Properties.CorrelationId}");
+ Trace.WriteLine(TraceLevel.Verbose, $"Set result for: {msg.Properties.CorrelationId}");
}
}
else
{
- Trace.WriteLine(TraceLevel.Error, $"No request found for message {msg.Properties.CorrelationId}");
+ Trace.WriteLine(TraceLevel.Error, $"No request found for message: {msg.Properties.CorrelationId}");
}
}
@@ -238,16 +242,32 @@ internal async ValueTask Request(string id, object? body, string path,
return await Request(message, expectedResponseCodes, timeout);
}
+ ///
+ /// Core function to send a request and wait for the response
+ /// The request is an AMQP message with the following properties:
+ /// - Properties.MessageId: Mandatory to identify the request
+ /// - Properties.To: The path of the request, for example "/queues/my-queue"
+ /// - Properties.Subject: The method of the request, for example "PUT"
+ /// - Properties.ReplyTo: The address where the response will be sent. Default is: "$me"
+ /// - Body: The body of the request. For example the QueueSpec to create a queue
+ ///
+ /// Request Message. Contains all the info to create/delete a resource
+ /// The response codes expected for a specific call. See Code* Constants
+ /// Default timeout for a request
+ /// A message with the Info response. For example in case of Queue creation is DefaultQueueInfo
+ /// Application errors, see
internal async ValueTask Request(Message message, int[] expectedResponseCodes, TimeSpan? timeout = null)
{
- if (Status != Status.Open)
+ if (State != State.Open)
{
throw new ModelException("Management is not open");
}
TaskCompletionSource mre = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ // Add TaskCompletionSource to the dictionary
_requests.TryAdd(message.Properties.MessageId, mre);
- using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(5));
+ using var cts =
+ new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(5)); // TODO: make the timeout configurable
await using (cts.Token.Register(
() =>
{
@@ -256,23 +276,39 @@ internal async ValueTask Request(Message message, int[] expectedRespons
}))
{
await InternalSendAsync(message);
+
+ // The response is handled in a separate thread, see ProcessResponses method in the Init method
var result = await mre.Task.WaitAsync(cts.Token);
+ // Check the responses and throw exceptions if needed.
+
CheckResponse(message, expectedResponseCodes, result);
return result;
}
}
+ ///
+ /// Check the response of a request and throw exceptions if needed
+ ///
+ /// The message sent
+ /// The expected response codes
+ /// The message received from the server
+ ///
+ ///
+ ///
internal void CheckResponse(Message sentMessage, int[] expectedResponseCodes, Message receivedMessage)
{
+ // Check if the response code is a number
+ // by protocol the response code is in the Subject property
if (!int.TryParse(receivedMessage.Properties.Subject, out var responseCode))
throw new ModelException($"Response code is not a number {receivedMessage.Properties.Subject}");
switch (responseCode)
{
case Code409:
- throw new PreconditionFailException($"Precondition Fail. Message: {receivedMessage.Body}");
+ throw new PreconditionFailedException($"Precondition Fail. Message: {receivedMessage.Body}");
}
+ // Check if the correlationId is the same as the messageId
if (sentMessage.Properties.MessageId != receivedMessage.Properties.CorrelationId)
throw new ModelException(
$"CorrelationId does not match, expected {sentMessage.Properties.MessageId} but got {receivedMessage.Properties.CorrelationId}");
@@ -294,14 +330,14 @@ protected virtual async Task InternalSendAsync(Message message)
public async Task CloseAsync()
{
- Status = Status.Closed;
+ State = State.Closed;
if (_managementSession is { IsClosed: false })
{
await _managementSession.CloseAsync();
}
}
- public event IClosable.ChangeStatusCallBack? ChangeStatus;
+ public event IClosable.LifeCycleCallBack? ChangeState;
}
public class InvalidCodeException(string message) : Exception(message);
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs b/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
index aaf6667a..e5783ff7 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
@@ -138,7 +138,6 @@ public IQueueSpecification AutoDelete(bool autoDelete)
}
-
public IQueueSpecification Arguments(Dictionary