Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
<Compile Include="..\RabbitMQ.Client\src\client\api\PublicationAddress.cs">
<Link>src\client\api\PublicationAddress.cs</Link>
</Compile>
<Compile Include="..\RabbitMQ.Client\src\client\api\QueueDeclareResult.cs">
<Link>src\client\api\QueueDeclareResult.cs</Link>
</Compile>
<Compile Include="..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs">
<Link>src\client\api\ShutdownEventArgs.cs</Link>
</Compile>
Expand Down
29 changes: 20 additions & 9 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ void ExchangeUnbind(string destination,
string QueueDeclare(string queue, bool durable, bool exclusive,
bool autoDelete, IDictionary arguments);

///<summary>(Spec method) Declare a queue.</summary>
[AmqpMethodDoNotImplement(null)]
QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, IDictionary arguments);

///<summary>(Spec method) Bind a queue to an exchange.</summary>
[AmqpMethodDoNotImplement(null)]
void QueueBind(string queue,
Expand Down Expand Up @@ -665,15 +670,21 @@ void _Private_ExchangeUnbind(string destination,
///<summary>Used to send a Queue.Declare method. Called by the
///public declare method.</summary>
[AmqpMethodMapping(null, "queue", "declare")]
[return: AmqpFieldMapping(null, "queue")]
string _Private_QueueDeclare(string queue,
bool passive,
bool durable,
bool exclusive,
bool autoDelete,
[AmqpNowaitArgument(null)]
bool nowait,
IDictionary arguments);
[AmqpForceOneWay]
void _Private_QueueDeclare(string queue,
bool passive,
bool durable,
bool exclusive,
bool autoDelete,
[AmqpNowaitArgument(null)]
bool nowait,
IDictionary arguments);

///<summary>Handle incoming Queue.DeclareOk methods. Routes the
///information to a waiting Queue.DeclareOk continuation.</summary>
void HandleQueueDeclareOk(string queue,
uint messageCount,
uint consumerCount);

///<summary>Used to send a Queue.Bind method. Called by the
///public bind method.</summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQ.Client
{
public class QueueDeclareResult
{
public string Queue { get; private set; }
public uint MessageCount { get; private set; }
public uint ConsumerCount { get; private set; }

public QueueDeclareResult(string queue, uint messageCount, uint consumerCount)
{
this.Queue = queue;
this.MessageCount = messageCount;
this.ConsumerCount = consumerCount;
}
}
}
53 changes: 44 additions & 9 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -858,22 +858,46 @@ public string QueueDeclare()

public string QueueDeclarePassive(string queue)
{
return _Private_QueueDeclare(queue, true, false, false, false, false, null);
return QueueDeclareFull(queue, true, false, false, false, false, null).Queue;
}

public string QueueDeclare(string queue, bool durable, bool exclusive,
bool autoDelete, IDictionary arguments)
{
return _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments);
return QueueDeclareFull(queue, false, durable, exclusive, autoDelete, false, arguments).Queue;
}

public abstract string _Private_QueueDeclare(string queue,
bool passive,
bool durable,
bool exclusive,
bool autoDelete,
bool nowait,
IDictionary arguments);
public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
{
public QueueDeclareResult m_result;
public QueueDeclareRpcContinuation() { }
}
public QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, IDictionary arguments)
{
QueueDeclareRpcContinuation k = new QueueDeclareRpcContinuation();
Enqueue(k);
try
{
_Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments);
}
catch (AlreadyClosedException)
{
// Ignored, since the continuation will be told about
// the closure via an OperationInterruptedException because
// of the shutdown event propagation.
}
k.GetReply();
return k.m_result;
}

public abstract void _Private_QueueDeclare(string queue,
bool passive,
bool durable,
bool exclusive,
bool autoDelete,
bool nowait,
IDictionary arguments);

public void QueueBind(string queue,
string exchange,
Expand Down Expand Up @@ -1473,6 +1497,17 @@ public abstract void _Private_ConnectionClose(ushort replyCode,

public abstract void _Private_ConnectionCloseOk();

public void HandleQueueDeclareOk(string queue,
uint messageCount,
uint consumerCount)
{
QueueDeclareRpcContinuation k = (QueueDeclareRpcContinuation)m_continuationQueue.Next();
k.m_result = new QueueDeclareResult(queue,
messageCount,
consumerCount);
k.HandleCommand(null); // release the continuation.
}

public override string ToString() {
return m_session.ToString();
}
Expand Down