Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace RabbitMQ.AMQP.Client;

public class ConnectionException(string? message) : Exception(message);

public interface IConnection : IResourceStatus, IClosable
public interface IConnection : ILifeCycle
{
IManagement Management();

Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace RabbitMQ.AMQP.Client;
public class ConsumerException(string message) : Exception(message);
public delegate void MessageHandler(IContext context, IMessage message);

public interface IConsumer : IResourceStatus, IClosable
public interface IConsumer : ILifeCycle
{
void Pause();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum State
Closed,
}


public class Error(string? errorCode, string? description)
{
public string? Description { get; } = description;
Expand All @@ -20,19 +21,15 @@ public override string ToString()
}
}

public interface IClosable // TODO: Create an abstract class with the event and the State property
{
Task CloseAsync();

}

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

public interface IResourceStatus
public interface ILifeCycle
{
Task CloseAsync();

public State State { get; }



event LifeCycleCallBack ChangeState;

}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class ModelException(string message) : Exception(message);

public class PreconditionFailedException(string message) : Exception(message);

public interface IManagement : IResourceStatus, IClosable
public interface IManagement : ILifeCycle
{
IQueueSpecification Queue();
IQueueSpecification Queue(string name);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat

public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);

public interface IPublisher : IResourceStatus, IClosable
public interface IPublisher : ILifeCycle
{
Task Publish(IMessage message,
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpClosedException(string message) : Exception(message);

public abstract class AbstractResourceStatus : IResourceStatus
public abstract class AbstractLifeCycle : ILifeCycle
{
protected virtual Task OpenAsync()
{
OnNewStatus(State.Open, null);
return Task.CompletedTask;
}

public abstract Task CloseAsync();

public State State { get; internal set; } = State.Closed;

protected void ThrowIfClosed()
{
if (State == State.Closed)
Expand All @@ -13,6 +22,9 @@ protected void ThrowIfClosed()
}
}

// wait until the close operation is completed
protected readonly TaskCompletionSource<bool> ConnectionCloseTaskCompletionSource =
new(TaskCreationOptions.RunContinuationsAsynchronously);

protected void OnNewStatus(State newState, Error? error)
{
Expand Down
99 changes: 40 additions & 59 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ await Management.Queue(spec).Declare()
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
/// </summary>
public class AmqpConnection : AbstractResourceStatus, IConnection
public class AmqpConnection : AbstractLifeCycle, IConnection
{
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
Expand All @@ -43,14 +43,12 @@ public class AmqpConnection : AbstractResourceStatus, IConnection
// The native AMQP.Net Lite connection
private Connection? _nativeConnection;

private readonly AmqpManagement _management = new();
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
new(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ConnectionSettings _connectionSettings;
internal readonly AmqpSessionManagement NativePubSubSessions;
internal readonly AmqpSessionManagement _nativePubSubSessions;

// TODO: Implement the semaphore to avoid multiple connections
// private readonly SemaphoreSlim _semaphore = new(1, 1);
Expand All @@ -76,34 +74,18 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
/// - Management. See <see cref="AmqpManagement"/>
/// - Publishers and Consumers: TODO: Implement
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns></returns>
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
{
var connection = new AmqpConnection(connectionSettings);
await connection.ConnectAsync()
await connection.OpenAsync()
.ConfigureAwait(false);
return connection;
}

private void PauseAllPublishers()
{
foreach (AmqpPublisher publisher in Publishers.Values)
{
publisher.PausePublishing();
}
}

private void ResumeAllPublishers()
{
foreach (AmqpPublisher publisher in Publishers.Values)
{
publisher.ResumePublishing();
}
}


/// <summary>
/// Closes all the publishers. It is called when the connection is closed.
Expand Down Expand Up @@ -133,7 +115,9 @@ await consumer.CloseAsync()
private AmqpConnection(ConnectionSettings connectionSettings)
{
_connectionSettings = connectionSettings;
NativePubSubSessions = new AmqpSessionManagement(this, 1);
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
_management =
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
}

public IManagement Management()
Expand All @@ -146,50 +130,49 @@ public IConsumerBuilder ConsumerBuilder()
return new AmqpConsumerBuilder(this);
}

private Task ConnectAsync()
protected override Task OpenAsync()
{
EnsureConnection();
OnNewStatus(State.Open, null);
return Task.CompletedTask;
return base.OpenAsync();
}

private void EnsureConnection()
{
// await _semaphore.WaitAsync();
try
{
if (_nativeConnection == null || _nativeConnection.IsClosed)
if (_nativeConnection is { IsClosed: false })
{
var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
}
};

var manualReset = new ManualResetEvent(false);
// TODO ConnectionFactory.CreateAsync
_nativeConnection = new Connection(_connectionSettings.Address, null, open, (connection, open1) =>
{
manualReset.Set();
Trace.WriteLine(TraceLevel.Information, $"Connection opened. Info: {ToString()}");
OnNewStatus(State.Open, null);
});
return;
}

manualReset.WaitOne(TimeSpan.FromSeconds(5));
if (_nativeConnection.IsClosed)
var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
Properties = new Fields()
{
throw new ConnectionException(
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
}
};

_management.Init(
new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
var manualReset = new ManualResetEvent(false);
_nativeConnection = new Connection(_connectionSettings.Address, null, open, (connection, open1) =>
{
manualReset.Set();
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
OnNewStatus(State.Open, null);
});

_nativeConnection.Closed += MaybeRecoverConnection();
manualReset.WaitOne(TimeSpan.FromSeconds(5));
if (_nativeConnection.IsClosed)
{
throw new ConnectionException(
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
}

_management.Init();

_nativeConnection.Closed += MaybeRecoverConnection();
}

catch (AmqpException e)
Expand Down Expand Up @@ -310,7 +293,7 @@ await _recordingTopologyListener.Accept(visitor)
_semaphoreClose.Release();
}

_connectionCloseTaskCompletionSource.SetResult(true);
ConnectionCloseTaskCompletionSource.SetResult(true);
};
}

Expand All @@ -327,7 +310,7 @@ public IPublisherBuilder PublisherBuilder()
}


public async Task CloseAsync()
public override async Task CloseAsync()
{
await _semaphoreClose.WaitAsync()
.ConfigureAwait(false);
Expand All @@ -347,21 +330,19 @@ await _semaphoreClose.WaitAsync()

await _management.CloseAsync()
.ConfigureAwait(false);

if (_nativeConnection is { IsClosed: false })
{
await _nativeConnection.CloseAsync()
.ConfigureAwait(false);
}


}
finally
{
_semaphoreClose.Release();
}

await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
.ConfigureAwait(false);

OnNewStatus(State.Closed, null);
Expand All @@ -370,7 +351,7 @@ await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(1

public override string ToString()
{
var info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
string info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
return info;
}
}
16 changes: 9 additions & 7 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpConsumer : AbstractResourceStatus, IConsumer
public class AmqpConsumer : AbstractLifeCycle, IConsumer
{
private readonly AmqpConnection _connection;
private readonly string _address;
Expand All @@ -20,17 +20,17 @@ public AmqpConsumer(AmqpConnection connection, string address,
_messageHandler = messageHandler;
_initialCredits = initialCredits;
_filters = filters;
Connect();
_connection.Consumers.TryAdd(Id, this); // TODO: Close all consumers on connection close
OpenAsync();
_connection.Consumers.TryAdd(Id, this);
}


private void Connect()
protected sealed override Task OpenAsync()
{
try
{
var attachCompleted = new ManualResetEvent(false);
_receiverLink = new ReceiverLink(_connection.NativePubSubSessions.GetOrCreateSession(), Id,
_receiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id,
Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, Id, _filters),
(link, attach) => { attachCompleted.Set(); });

Expand All @@ -48,6 +48,8 @@ private void Connect()
{
throw new ConsumerException($"Failed to create receiver link, {e}");
}

return Task.CompletedTask;
}

private void ProcessMessages()
Expand All @@ -61,7 +63,7 @@ private void ProcessMessages()
});
}

public string Id { get; } = Guid.NewGuid().ToString();
private string Id { get; } = Guid.NewGuid().ToString();

public void Pause()
{
Expand All @@ -79,7 +81,7 @@ public void Unpause()
}


public async Task CloseAsync()
public override async Task CloseAsync()
{
if (_receiverLink == null)
{
Expand Down
Loading