Skip to content

Commit 99f6193

Browse files
authored
Merge pull request #23 from nsqio/max-bytes-per-file-read
proper handling of maxBytesPerFile for reads
2 parents f9543b6 + b60aadb commit 99f6193

File tree

2 files changed

+83
-13
lines changed

2 files changed

+83
-13
lines changed

diskqueue.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ type diskQueue struct {
6767
sync.RWMutex
6868

6969
// instantiation time metadata
70-
name string
71-
dataPath string
72-
maxBytesPerFile int64 // currently this cannot change once created
73-
minMsgSize int32
74-
maxMsgSize int32
75-
syncEvery int64 // number of writes per fsync
76-
syncTimeout time.Duration // duration of time per fsync
77-
exitFlag int32
78-
needSync bool
70+
name string
71+
dataPath string
72+
maxBytesPerFile int64 // cannot change once created
73+
maxBytesPerFileRead int64
74+
minMsgSize int32
75+
maxMsgSize int32
76+
syncEvery int64 // number of writes per fsync
77+
syncTimeout time.Duration // duration of time per fsync
78+
exitFlag int32
79+
needSync bool
7980

8081
// keeps track of the position where we have read
8182
// (but not yet sent over readChan)
@@ -293,6 +294,16 @@ func (d *diskQueue) readOne() ([]byte, error) {
293294
}
294295
}
295296

297+
// for "complete" files (i.e. not the "current" file), maxBytesPerFileRead
298+
// should be initialized to the file's size, or default to maxBytesPerFile
299+
d.maxBytesPerFileRead = d.maxBytesPerFile
300+
if d.readFileNum < d.writeFileNum {
301+
stat, err := d.readFile.Stat()
302+
if err == nil {
303+
d.maxBytesPerFileRead = stat.Size()
304+
}
305+
}
306+
296307
d.reader = bufio.NewReader(d.readFile)
297308
}
298309

@@ -326,10 +337,10 @@ func (d *diskQueue) readOne() ([]byte, error) {
326337
d.nextReadPos = d.readPos + totalBytes
327338
d.nextReadFileNum = d.readFileNum
328339

329-
// TODO: each data file should embed the maxBytesPerFile
330-
// as the first 8 bytes (at creation time) ensuring that
331-
// the value can change without affecting runtime
332-
if d.nextReadPos > d.maxBytesPerFile {
340+
// we only consider rotating if we're reading a "complete" file
341+
// and since we cannot know the size at which it was rotated, we
342+
// rely on maxBytesPerFileRead rather than maxBytesPerFile
343+
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
333344
if d.readFile != nil {
334345
d.readFile.Close()
335346
d.readFile = nil
@@ -396,6 +407,10 @@ func (d *diskQueue) writeOne(data []byte) error {
396407
d.depth += 1
397408

398409
if d.writePos >= d.maxBytesPerFile {
410+
if d.readFileNum == d.writeFileNum {
411+
d.maxBytesPerFileRead = d.writePos
412+
}
413+
399414
d.writeFileNum++
400415
d.writePos = 0
401416

diskqueue_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,61 @@ func TestDiskQueueTorture(t *testing.T) {
420420
Equal(t, depth, read)
421421
}
422422

423+
func TestDiskQueueResize(t *testing.T) {
424+
l := NewTestLogger(t)
425+
dqName := "test_disk_queue_resize" + strconv.Itoa(int(time.Now().Unix()))
426+
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
427+
if err != nil {
428+
panic(err)
429+
}
430+
defer os.RemoveAll(tmpDir)
431+
msg := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
432+
ml := int64(len(msg))
433+
dq := New(dqName, tmpDir, 8*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
434+
NotNil(t, dq)
435+
Equal(t, int64(0), dq.Depth())
436+
437+
for i := 0; i < 8; i++ {
438+
msg[0] = byte(i)
439+
err := dq.Put(msg)
440+
Nil(t, err)
441+
}
442+
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
443+
Equal(t, int64(0), dq.(*diskQueue).writePos)
444+
Equal(t, int64(8), dq.Depth())
445+
446+
dq.Close()
447+
dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
448+
449+
for i := 0; i < 10; i++ {
450+
msg[0] = byte(20 + i)
451+
err := dq.Put(msg)
452+
Nil(t, err)
453+
}
454+
Equal(t, int64(2), dq.(*diskQueue).writeFileNum)
455+
Equal(t, int64(0), dq.(*diskQueue).writePos)
456+
Equal(t, int64(18), dq.Depth())
457+
458+
for i := 0; i < 8; i++ {
459+
msg[0] = byte(i)
460+
Equal(t, msg, <-dq.ReadChan())
461+
}
462+
for i := 0; i < 10; i++ {
463+
msg[0] = byte(20 + i)
464+
Equal(t, msg, <-dq.ReadChan())
465+
}
466+
Equal(t, int64(0), dq.Depth())
467+
dq.Close()
468+
469+
// make sure there aren't "bad" files due to read logic errors
470+
files, err := filepath.Glob(filepath.Join(tmpDir, dqName+"*.bad"))
471+
Nil(t, err)
472+
// empty files slice is actually nil, length check is less confusing
473+
if len(files) > 0 {
474+
Equal(t, []string{}, files)
475+
}
476+
}
477+
423478
func BenchmarkDiskQueuePut16(b *testing.B) {
424479
benchmarkDiskQueuePut(16, b)
425480
}

0 commit comments

Comments (0)