diff --git a/projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj b/projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj index 497575f879..ed0948a4ae 100644 --- a/projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj +++ b/projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj @@ -66,6 +66,9 @@ src\client\api\PublicationAddress.cs + + src\client\api\QueueDeclareResult.cs + src\client\api\ShutdownEventArgs.cs diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs index a4c6671f9f..690c928f48 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs @@ -258,6 +258,11 @@ void ExchangeUnbind(string destination, string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments); + ///(Spec method) Declare a queue. + [AmqpMethodDoNotImplement(null)] + QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, IDictionary arguments); + ///(Spec method) Bind a queue to an exchange. [AmqpMethodDoNotImplement(null)] void QueueBind(string queue, @@ -665,15 +670,21 @@ void _Private_ExchangeUnbind(string destination, ///Used to send a Queue.Declare method. Called by the ///public declare method. [AmqpMethodMapping(null, "queue", "declare")] - [return: AmqpFieldMapping(null, "queue")] - string _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - [AmqpNowaitArgument(null)] - bool nowait, - IDictionary arguments); + [AmqpForceOneWay] + void _Private_QueueDeclare(string queue, + bool passive, + bool durable, + bool exclusive, + bool autoDelete, + [AmqpNowaitArgument(null)] + bool nowait, + IDictionary arguments); + + ///Handle incoming Queue.DeclareOk methods. Routes the + ///information to a waiting Queue.DeclareOk continuation. + void HandleQueueDeclareOk(string queue, + uint messageCount, + uint consumerCount); ///Used to send a Queue.Bind method. Called by the ///public bind method. diff --git a/projects/client/RabbitMQ.Client/src/client/api/QueueDeclareResult.cs b/projects/client/RabbitMQ.Client/src/client/api/QueueDeclareResult.cs new file mode 100644 index 0000000000..0687dc3db0 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/api/QueueDeclareResult.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace RabbitMQ.Client +{ + public class QueueDeclareResult + { + public string Queue { get; private set; } + public uint MessageCount { get; private set; } + public uint ConsumerCount { get; private set; } + + public QueueDeclareResult(string queue, uint messageCount, uint consumerCount) + { + this.Queue = queue; + this.MessageCount = messageCount; + this.ConsumerCount = consumerCount; + } + } +} diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index cc40d4823c..3216317ad9 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -858,22 +858,46 @@ public string QueueDeclare() public string QueueDeclarePassive(string queue) { - return _Private_QueueDeclare(queue, true, false, false, false, false, null); + return QueueDeclareFull(queue, true, false, false, false, false, null).Queue; } public string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { - return _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments); + return QueueDeclareFull(queue, false, durable, exclusive, autoDelete, false, arguments).Queue; } - public abstract string _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments); + public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation + { + public QueueDeclareResult m_result; + public QueueDeclareRpcContinuation() { } + } + public QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, IDictionary arguments) + { + QueueDeclareRpcContinuation k = new QueueDeclareRpcContinuation(); + Enqueue(k); + try + { + _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments); + } + catch (AlreadyClosedException) + { + // Ignored, since the continuation will be told about + // the closure via an OperationInterruptedException because + // of the shutdown event propagation. + } + k.GetReply(); + return k.m_result; + } + + public abstract void _Private_QueueDeclare(string queue, + bool passive, + bool durable, + bool exclusive, + bool autoDelete, + bool nowait, + IDictionary arguments); public void QueueBind(string queue, string exchange, @@ -1473,6 +1497,17 @@ public abstract void _Private_ConnectionClose(ushort replyCode, public abstract void _Private_ConnectionCloseOk(); + public void HandleQueueDeclareOk(string queue, + uint messageCount, + uint consumerCount) + { + QueueDeclareRpcContinuation k = (QueueDeclareRpcContinuation)m_continuationQueue.Next(); + k.m_result = new QueueDeclareResult(queue, + messageCount, + consumerCount); + k.HandleCommand(null); // release the continuation. + } + public override string ToString() { return m_session.ToString(); }