@@ -11,7 +11,6 @@ import (
1111 "os"
1212 "path"
1313 "sync"
14- "sync/atomic"
1514 "time"
1615)
1716
@@ -92,6 +91,7 @@ type diskQueue struct {
9291 readChan chan []byte
9392
9493 // internal channels
94+ depthChan chan int64
9595 writeChan chan []byte
9696 writeResponseChan chan error
9797 emptyChan chan int
@@ -114,6 +114,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
114114 minMsgSize : minMsgSize ,
115115 maxMsgSize : maxMsgSize ,
116116 readChan : make (chan []byte ),
117+ depthChan : make (chan int64 ),
117118 writeChan : make (chan []byte ),
118119 writeResponseChan : make (chan error ),
119120 emptyChan : make (chan int ),
@@ -137,7 +138,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
137138
138139// Depth returns the depth of the queue
139140func (d * diskQueue ) Depth () int64 {
140- return atomic . LoadInt64 ( & d . depth )
141+ return <- d . depthChan
141142}
142143
143144// ReadChan returns the receive-only []byte channel for reading data
@@ -256,7 +257,7 @@ func (d *diskQueue) skipToNextRWFile() error {
256257 d .readPos = 0
257258 d .nextReadFileNum = d .writeFileNum
258259 d .nextReadPos = 0
259- atomic . StoreInt64 ( & d .depth , 0 )
260+ d .depth = 0
260261
261262 return err
262263}
@@ -385,7 +386,7 @@ func (d *diskQueue) writeOne(data []byte) error {
385386
386387 totalBytes := int64 (4 + dataLen )
387388 d .writePos += totalBytes
388- atomic . AddInt64 ( & d .depth , 1 )
389+ d .depth += 1
389390
390391 if d .writePos >= d .maxBytesPerFile {
391392 d .writeFileNum ++
@@ -446,7 +447,7 @@ func (d *diskQueue) retrieveMetaData() error {
446447 if err != nil {
447448 return err
448449 }
449- atomic . StoreInt64 ( & d .depth , depth )
450+ d .depth = depth
450451 d .nextReadFileNum = d .readFileNum
451452 d .nextReadPos = d .readPos
452453
@@ -468,7 +469,7 @@ func (d *diskQueue) persistMetaData() error {
468469 }
469470
470471 _ , err = fmt .Fprintf (f , "%d\n %d,%d\n %d,%d\n " ,
471- atomic . LoadInt64 ( & d .depth ) ,
472+ d .depth ,
472473 d .readFileNum , d .readPos ,
473474 d .writeFileNum , d .writePos )
474475 if err != nil {
@@ -508,7 +509,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
508509 d .name , depth )
509510 }
510511 // force set depth 0
511- atomic . StoreInt64 ( & d .depth , 0 )
512+ d .depth = 0
512513 d .needSync = true
513514 }
514515
@@ -534,7 +535,7 @@ func (d *diskQueue) moveForward() {
534535 oldReadFileNum := d .readFileNum
535536 d .readFileNum = d .nextReadFileNum
536537 d .readPos = d .nextReadPos
537- depth := atomic . AddInt64 ( & d .depth , - 1 )
538+ d .depth -= 1
538539
539540 // see if we need to clean up the old file
540541 if oldReadFileNum != d .nextReadFileNum {
@@ -548,7 +549,7 @@ func (d *diskQueue) moveForward() {
548549 }
549550 }
550551
551- d .checkTailCorruption (depth )
552+ d .checkTailCorruption (d . depth )
552553}
553554
554555func (d * diskQueue ) handleReadError () {
@@ -639,6 +640,7 @@ func (d *diskQueue) ioLoop() {
639640 count ++
640641 // moveForward sets needSync flag if a file is removed
641642 d .moveForward ()
643+ case d .depthChan <- d .depth :
642644 case <- d .emptyChan :
643645 d .emptyResponseChan <- d .deleteAllFiles ()
644646 count = 0
0 commit comments