diff --git a/RabbitMQ.AMQP.Client/IPublisher.cs b/RabbitMQ.AMQP.Client/IPublisher.cs index d39a473..1f9cd8b 100644 --- a/RabbitMQ.AMQP.Client/IPublisher.cs +++ b/RabbitMQ.AMQP.Client/IPublisher.cs @@ -2,37 +2,40 @@ // and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.AMQP.Client { - public class PublisherException : Exception - { - public PublisherException(string message) : base(message) - { - } - } - /// /// Represents the status of a publish operation. - /// Accepted: The message was accepted for publication. - /// Rejected: The message was rejected by the broker. - /// Released: The message was released by the broker. + /// See AMQP Outcomes. /// public enum OutcomeState { + /// + /// The message has been accepted by the broker. + /// Accepted, + + /// + /// At least one queue the message was routed to rejected the message. This happens when the + /// queue length is exceeded and the queue's overflow behaviour is set to reject-publish or when + /// a target classic queue is unavailable. + /// Rejected, - Released, + + /// + /// The broker could not route the message to any queue. + /// This is likely to be due to a topology misconfiguration. + /// + Released } /// - /// PublishOutcome represents the outcome of a publish operation. - /// It contains the state of the outcome and an error if the outcome is not successful. + /// Represents the outcome of a publish operation. + /// It contains the state of the outcome and an error if the outcome is not successful. /// - public class PublishOutcome { public PublishOutcome(OutcomeState state, Error? error) @@ -41,11 +44,21 @@ public PublishOutcome(OutcomeState state, Error? error) Error = error; } + /// + /// The . + /// public OutcomeState State { get; } + /// + /// The , if any. + /// public Error? Error { get; } } + /// + /// Represents the result of a publish operation. + /// It contains the and the original . + /// public class PublishResult { public PublishResult(IMessage message, PublishOutcome outcome) @@ -60,8 +73,8 @@ public PublishResult(IMessage message, PublishOutcome outcome) } /// - /// Interface for publishing messages to an AMQP broker. - /// Implementations of this interface are expected to be thread-safe. + /// Interface for publishing messages to an AMQP broker. + /// Implementations of this interface are expected to be thread-safe. /// public interface IPublisher : ILifeCycle { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs index c69298e..10330ac 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs @@ -14,6 +14,9 @@ namespace RabbitMQ.AMQP.Client.Impl { + /// + /// Implementation of . + /// public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer { private enum PauseStatus diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index c2a101f..405e8de 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -13,6 +13,9 @@ namespace RabbitMQ.AMQP.Client.Impl { + /// + /// Implementation of . + /// public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher { private readonly AmqpConnection _connection; @@ -30,6 +33,10 @@ public AmqpPublisher(AmqpConnection connection, string? address, IMetricsReporte _connection.AddPublisher(_id, this); } + /// + /// Open this publisher + /// + /// A representing the async operation. public override async Task OpenAsync() { try @@ -91,13 +98,11 @@ await base.OpenAsync() } /// - /// Publishes a message to the broker in an asynchronous manner. - /// The PublishResult is synchronous. In order to increase the performance - /// you can use more tasks to publish messages in parallel + /// Publishes a message to the broker asynchronously. /// /// /// - /// + /// A representating the await-able result of the publish operation. /// /// /// @@ -122,85 +127,89 @@ public Task PublishAsync(IMessage message, CancellationToken canc TaskCompletionSource publishResultTcs = Utils.CreateTaskCompletionSource(); - try + Message nativeMessage = ((AmqpMessage)message).NativeMessage; + + void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state) { - Message nativeMessage = ((AmqpMessage)message).NativeMessage; + // Note: sometimes `inMessage` is null 🤔 + Debug.Assert(Object.ReferenceEquals(this, state)); - void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state) + if (false == Object.ReferenceEquals(_senderLink, sender)) { - // Note: sometimes `message` is null 🤔 - Debug.Assert(Object.ReferenceEquals(this, state)); - - if (false == Object.ReferenceEquals(_senderLink, sender)) - { - // TODO log this case? - } - - PublishOutcome publishOutcome; - switch (outcome) - { - case Rejected rejectedOutcome: - { - const OutcomeState publishState = OutcomeState.Rejected; - publishOutcome = new PublishOutcome(publishState, - Utils.ConvertError(rejectedOutcome.Error)); - _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED); - break; - } - case Released: - { - const OutcomeState publishState = OutcomeState.Released; - publishOutcome = new PublishOutcome(publishState, null); - _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED); - break; - } - case Accepted: - { - const OutcomeState publishState = OutcomeState.Accepted; - publishOutcome = new PublishOutcome(publishState, null); - _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED); - break; - } - default: - { - throw new NotSupportedException(); - } - } + // TODO log this case? + } - // TODO cancellation token - if (_metricsReporter is not null && stopwatch is not null) - { - stopwatch.Stop(); - _metricsReporter.Published(stopwatch.Elapsed); - } + PublishOutcome publishOutcome; + switch (outcome) + { + case Rejected rejectedOutcome: + { + const OutcomeState publishState = OutcomeState.Rejected; + publishOutcome = new PublishOutcome(publishState, + Utils.ConvertError(rejectedOutcome.Error)); + _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED); + break; + } + case Released: + { + const OutcomeState publishState = OutcomeState.Released; + publishOutcome = new PublishOutcome(publishState, null); + _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED); + break; + } + case Accepted: + { + const OutcomeState publishState = OutcomeState.Accepted; + publishOutcome = new PublishOutcome(publishState, null); + _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED); + break; + } + default: + { + throw new NotSupportedException(); + } + } - var publishResult = new PublishResult(message, publishOutcome); - publishResultTcs.SetResult(publishResult); + // TODO cancellation token + if (_metricsReporter is not null && stopwatch is not null) + { + stopwatch.Stop(); + _metricsReporter.Published(stopwatch.Elapsed); } - /* - * Note: do NOT use SendAsync here as it prevents the Closed event from - * firing on the native connection. Bizarre, I know! - */ - _senderLink.Send(nativeMessage, OutcomeCallback, this); + var publishResult = new PublishResult(message, publishOutcome); + publishResultTcs.SetResult(publishResult); + } - return publishResultTcs.Task; + /* + * Note: do NOT use SendAsync here as it prevents the Closed event from + * firing on the native connection. Bizarre, I know! + */ + try + { + _senderLink.Send(nativeMessage, OutcomeCallback, this); } - catch (AmqpException ex) + catch (AmqpException amqpException) { stopwatch?.Stop(); _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED); - var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error)); + var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(amqpException.Error)); var publishResult = new PublishResult(message, publishOutcome); publishResultTcs.SetResult(publishResult); - return publishResultTcs.Task; } - catch (Exception e) + catch (Exception ex) { - throw new PublisherException($"{ToString()} Failed to publish message, {e}"); + var publisherException = new PublisherException($"{ToString()} Failed to publish message", ex); + publishResultTcs.SetException(publisherException); } + + return publishResultTcs.Task; } + /// + /// Close this publisher + /// + /// A representing the async operation. public override async Task CloseAsync() { if (_senderLink is null) diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index ec25aa4..8a170bc 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -725,6 +725,7 @@ RabbitMQ.AMQP.Client.PreconditionFailedException RabbitMQ.AMQP.Client.PreconditionFailedException.PreconditionFailedException(string! message) -> void RabbitMQ.AMQP.Client.PublisherException RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message) -> void +RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message, System.Exception! innerException) -> void RabbitMQ.AMQP.Client.PublishOutcome RabbitMQ.AMQP.Client.PublishOutcome.Error.get -> RabbitMQ.AMQP.Client.Error? RabbitMQ.AMQP.Client.PublishOutcome.PublishOutcome(RabbitMQ.AMQP.Client.OutcomeState state, RabbitMQ.AMQP.Client.Error? error) -> void diff --git a/RabbitMQ.AMQP.Client/PublisherException.cs b/RabbitMQ.AMQP.Client/PublisherException.cs new file mode 100644 index 0000000..10acfdf --- /dev/null +++ b/RabbitMQ.AMQP.Client/PublisherException.cs @@ -0,0 +1,19 @@ +// This source code is dual-licensed under the Apache License, version 2.0, +// and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; + +namespace RabbitMQ.AMQP.Client +{ + public class PublisherException : Exception + { + public PublisherException(string message) : base(message) + { + } + + public PublisherException(string message, Exception innerException) : base(message, innerException) + { + } + } +}