Skip to content
46 changes: 25 additions & 21 deletions modules/queue/unique_queue_disk_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -97,7 +98,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
executedInitial := map[string][]string{}
hasInitial := map[string][]string{}

fillQueue := func(name string, done chan struct{}) {
fillQueue := func(name string, done chan int) {
t.Run("Initial Filling: "+name, func(t *testing.T) {
lock := sync.Mutex{}

Expand Down Expand Up @@ -154,33 +155,36 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
mapLock.Unlock()
})
done <- len(hasInitial[name])
close(done)
}

doneA := make(chan struct{})
doneB := make(chan struct{})
hasQueueAChan := make(chan int)
hasQueueBChan := make(chan int)

go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
go fillQueue("QueueA", hasQueueAChan)
go fillQueue("QueueB", hasQueueBChan)

<-doneA
<-doneB
hasA := <-hasQueueAChan
hasB := <-hasQueueBChan

executedEmpty := map[string][]string{}
hasEmpty := map[string][]string{}
emptyQueue := func(name string, done chan struct{}) {
emptyQueue := func(name string, numInQueue int64, done chan struct{}) {
t.Run("Empty Queue: "+name, func(t *testing.T) {
lock := sync.Mutex{}
stop := make(chan struct{})

// collect the tasks that have been executed
handle := func(data ...Data) []Data {
lock.Lock()
i := int64(0)
for _, datum := range data {
mapLock.Lock()
executedEmpty[name] = append(executedEmpty[name], datum.(string))
mapLock.Unlock()
if datum.(string) == "final" {
count := atomic.AddInt64(&i, 1)
if count >= numInQueue {
close(stop)
}
}
Expand Down Expand Up @@ -214,11 +218,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
close(done)
}

doneA = make(chan struct{})
doneB = make(chan struct{})
doneA := make(chan struct{})
doneB := make(chan struct{})

go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
go emptyQueue("QueueA", int64(hasA), doneA)
go emptyQueue("QueueB", int64(hasB), doneB)

<-doneA
<-doneB
Expand All @@ -234,20 +238,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
hasEmpty = map[string][]string{}
mapLock.Unlock()

doneA = make(chan struct{})
doneB = make(chan struct{})
hasQueueAChan = make(chan int)
hasQueueBChan = make(chan int)

go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
go fillQueue("QueueA", hasQueueAChan)
go fillQueue("QueueB", hasQueueBChan)

<-doneA
<-doneB
hasA = <-hasQueueAChan
hasB = <-hasQueueBChan

doneA = make(chan struct{})
doneB = make(chan struct{})

go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
go emptyQueue("QueueA", int64(hasA), doneA)
go emptyQueue("QueueB", int64(hasB), doneB)

<-doneA
<-doneB
Expand Down