diff --git a/.travis.yml b/.travis.yml index dc30e30..e8a6e43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ language: go go: - 1.x +script: + - GOMAXPROCS=1 go test -v + - GOMAXPROCS=4 go test -v -race notifications: email: false - diff --git a/diskqueue.go b/diskqueue.go index b7e0736..078a924 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -11,7 +11,6 @@ import ( "os" "path" "sync" - "sync/atomic" "time" ) @@ -92,6 +91,7 @@ type diskQueue struct { readChan chan []byte // internal channels + depthChan chan int64 writeChan chan []byte writeResponseChan chan error emptyChan chan int @@ -114,6 +114,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + depthChan: make(chan int64), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), @@ -137,7 +138,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, // Depth returns the depth of the queue func (d *diskQueue) Depth() int64 { - return atomic.LoadInt64(&d.depth) + return <-d.depthChan } // ReadChan returns the receive-only []byte channel for reading data @@ -256,7 +257,7 @@ func (d *diskQueue) skipToNextRWFile() error { d.readPos = 0 d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 - atomic.StoreInt64(&d.depth, 0) + d.depth = 0 return err } @@ -385,7 +386,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) d.writePos += totalBytes - atomic.AddInt64(&d.depth, 1) + d.depth += 1 if d.writePos >= d.maxBytesPerFile { d.writeFileNum++ @@ -446,7 +447,7 @@ func (d *diskQueue) retrieveMetaData() error { if err != nil { return err } - atomic.StoreInt64(&d.depth, depth) + d.depth = depth d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos @@ -468,7 +469,7 @@ func (d *diskQueue) persistMetaData() error { } _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", - atomic.LoadInt64(&d.depth), + d.depth, d.readFileNum, d.readPos, d.writeFileNum, d.writePos) if err != nil { @@ -508,7 +509,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) { d.name, depth) } // force set depth 0 - atomic.StoreInt64(&d.depth, 0) + d.depth = 0 d.needSync = true } @@ -534,7 +535,7 @@ func (d *diskQueue) moveForward() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos - depth := atomic.AddInt64(&d.depth, -1) + d.depth -= 1 // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { @@ -548,7 +549,7 @@ func (d *diskQueue) moveForward() { } } - d.checkTailCorruption(depth) + d.checkTailCorruption(d.depth) } func (d *diskQueue) handleReadError() { @@ -639,6 +640,7 @@ func (d *diskQueue) ioLoop() { count++ // moveForward sets needSync flag if a file is removed d.moveForward() + case d.depthChan <- d.depth: case <-d.emptyChan: d.emptyResponseChan <- d.deleteAllFiles() count = 0 diff --git a/diskqueue_test.go b/diskqueue_test.go index 1f1f080..a685f07 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -123,6 +123,11 @@ func TestDiskQueueRoll(t *testing.T) { Equal(t, int64(1), dq.(*diskQueue).writeFileNum) Equal(t, int64(0), dq.(*diskQueue).writePos) + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } } func assertFileNotExist(t *testing.T, fn string) {