Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using RabbitMQ.Util;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -48,12 +47,7 @@ public void ExecuteThunk()
}
catch (Exception)
{
#if NETFX_CORE
// To end a task, return
return;
#else
//Thread.CurrentThread.Interrupt(); //TODO: what to do?
#endif
}
}

Expand Down
203 changes: 58 additions & 145 deletions projects/client/RabbitMQ.Client/src/util/BatchingWorkPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,204 +38,117 @@
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace RabbitMQ.Util
{
public class BatchingWorkPool<K, V>
{
private object lockObject = new object();
private IDictionary<K, BlockingCollection<V>> pool =
new Dictionary<K, BlockingCollection<V>>();
private SetQueue<K> ready = new SetQueue<K>();
private HashSet<K> inProgress = new HashSet<K>();

public BatchingWorkPool() {}
private ConcurrentDictionary<K, ConcurrentQueue<V>> pool = new ConcurrentDictionary<K, ConcurrentQueue<V>>();
private ConcurrentQueue<K> ready = new ConcurrentQueue<K>();

public bool AddWorkItem(K key, V item)
{
BlockingCollection<V> q;
lock(lockObject)
{
try
{
q = this.pool[key];
}
catch (KeyNotFoundException)
{
return false;
}
}
ConcurrentQueue<V> q;

#if NETFX_CORE
q.Add(item);
#else
try
if (!pool.TryGetValue(key, out q))
{
q.Add(item);
return false;
}
catch (Exception)
{
// most likely due to shutdown. ok.
}
#endif

lock (lockObject)
{
if (IsDormant(key))
{
DormantToReady(key);
return true;
}
}
return false;
q.Enqueue(item);
ready.Enqueue(key);

return true;
}

public void RegisterKey(K key)
{
lock(lockObject)
{
var q = new BlockingCollection<V>(new ConcurrentQueue<V>());
this.pool.Add(key, q);
}
var q = new ConcurrentQueue<V>();
pool.TryAdd(key, q);
}

public void UnregisterKey(K key)
{
lock (lockObject)
{
this.pool.Remove(key);
this.ready.Remove(key);
this.inProgress.Remove(key);
}
ConcurrentQueue<V> value;
pool.TryRemove(key, out value);
}

public void UnregisterAllKeys()
{
lock(lockObject)
{
this.pool.Clear();
this.ready.Clear();
this.inProgress.Clear();
}
pool.Clear();
}

public K NextWorkBlock(ref List<V> to, int size)
{
lock(lockObject)
K nextKey;
K result = default(K);
var dequeue = true;

while (dequeue)
{
K nextKey = this.ReadyToInProgress();
if(nextKey != null)
dequeue = ready.TryDequeue(out nextKey);

if (dequeue)
{
var q = this.pool[nextKey];
DrainTo(q, ref to, size);
ConcurrentQueue<V> q;

if (!pool.TryGetValue(nextKey, out q))
{
continue;
}

dequeue = false;

var count = DrainTo(q, ref to, size);

if (count != 0)
{
result = nextKey;
}
}
return nextKey;
}

return result;
}

public bool FinishWorkBlock(K key)
{
lock (lockObject)
ConcurrentQueue<V> q;

if (!pool.TryGetValue(key, out q))
{
if (!this.IsRegistered(key))
{
return false;
}
if (!this.inProgress.Contains(key))
{
throw new ArgumentException(String.Format("Client {0} not in progress"));
}
return false;
}

if (MoreWorkItems(key))
{
InProgressToReady(key);
return true;
}
else
{
InProgressToDormant(key);
return false;
}
if (!q.IsEmpty)
{
ready.Enqueue(key);
return true;
}

return false;
}

private int DrainTo(BlockingCollection<V> from, ref List<V> to, int maxElements)
private int DrainTo(ConcurrentQueue<V> from, ref List<V> to, int maxElements)
{
int n = 0;
while(n < maxElements)

while (n < maxElements)
{
V item;
if(!from.TryTake(out item))

if (!from.TryDequeue(out item))
{
break;
}
else
{
to.Add(item);
++n;
}
}
return n;
}

private bool IsReady(K key)
{
return this.ready.Contains(key);
}

private bool IsInProgress(K key)
{
return this.inProgress.Contains(key);
}

private bool IsRegistered(K key)
{
return this.pool.ContainsKey(key);
}

private bool IsDormant(K key)
{
return IsRegistered(key) && !IsInProgress(key) && !IsReady(key);
}

private K ReadyToInProgress()
{
K key = this.ready.Dequeue();
if(key == null)
{
return default(K);
}
else
{
this.inProgress.Add(key);
return key;
to.Add(item);
++n;
}
}

private void InProgressToReady(K key)
{
this.inProgress.Remove(key);
this.ready.Enqueue(key);
}

private void DormantToReady(K key)
{
this.ready.Enqueue(key);
}

private void InProgressToDormant(K key)
{
this.inProgress.Remove(key);
}

private bool MoreWorkItems(K key)
{
var xs = this.pool[key];
return ((xs != null) && !(xs.Count == 0));
return n;
}
}
}
12 changes: 6 additions & 6 deletions projects/client/Unit/src/unit/TestBatchingWorkPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void TestBasicInOut()
var k = "TestBasicInOut";

this.workPool.RegisterKey(k);
Assert.IsTrue(this.workPool.AddWorkItem(k, one), "AddWorkItem should return true for new registered keys");
Assert.IsFalse(this.workPool.AddWorkItem(k, two), "AddWorkItem should return false for existent keys");
Assert.IsTrue(this.workPool.AddWorkItem(k, one), "AddWorkItem should return true for registered keys");
Assert.IsTrue(this.workPool.AddWorkItem(k, two), "AddWorkItem should return true for registered keys");

var workList = new List<object>(16);
var key = this.workPool.NextWorkBlock(ref workList, 1);
Expand Down Expand Up @@ -107,7 +107,7 @@ public void TestWorkInWhileInProgress()
Assert.AreEqual(1, workList.Count, "Work list length should be 1");
Assert.AreEqual(one, workList[0]);

Assert.IsFalse(this.workPool.AddWorkItem(k, two));
Assert.IsTrue(this.workPool.AddWorkItem(k, two));
Assert.IsTrue(workPool.FinishWorkBlock(key));

workList.Clear();
Expand All @@ -132,7 +132,7 @@ public void TestInterleavingKeys()

Assert.IsTrue(this.workPool.AddWorkItem(k1, one));
Assert.IsTrue(this.workPool.AddWorkItem(k2, two));
Assert.IsFalse(this.workPool.AddWorkItem(k1, three));
Assert.IsTrue(this.workPool.AddWorkItem(k1, three));

var workList = new List<object>(16);
var key = this.workPool.NextWorkBlock(ref workList, 3);
Expand Down Expand Up @@ -163,7 +163,7 @@ public void TestUnregisterKey()

Assert.IsTrue(this.workPool.AddWorkItem(k1, one));
Assert.IsTrue(this.workPool.AddWorkItem(k2, two));
Assert.IsFalse(this.workPool.AddWorkItem(k1, three));
Assert.IsTrue(this.workPool.AddWorkItem(k1, three));

this.workPool.UnregisterKey(k1);

Expand All @@ -188,7 +188,7 @@ public void TestUnregisterAllKeys()

Assert.IsTrue(this.workPool.AddWorkItem(k1, one));
Assert.IsTrue(this.workPool.AddWorkItem(k2, two));
Assert.IsFalse(this.workPool.AddWorkItem(k1, three));
Assert.IsTrue(this.workPool.AddWorkItem(k1, three));

this.workPool.UnregisterAllKeys();

Expand Down