@@ -465,35 +465,59 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
465465// Note, this method will *not* flush any data to disk so be sure to explicitly
466466// fsync before irreversibly deleting data from the database.
467467func (t * freezerTable ) Append (item uint64 , blob []byte ) error {
468+ // Encode the blob before the lock portion
469+ if ! t .noCompression {
470+ blob = snappy .Encode (nil , blob )
471+ }
468472 // Read lock prevents competition with truncate
469- t .lock .RLock ()
473+ retry , err := t .append (item , blob , false )
474+ if err != nil {
475+ return err
476+ }
477+ if retry {
478+ // Read lock was insufficient, retry with a writelock
479+ _ , err = t .append (item , blob , true )
480+ }
481+ return err
482+ }
483+
484+ // append injects a binary blob at the end of the freezer table.
485+ // Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to
486+ // false.
487+ // However, if the data will grown the current file out of bounds, then this
488+ // method will return 'true, nil', indicating that the caller should retry, this time
489+ // with 'wlock' set to true.
490+ func (t * freezerTable ) append (item uint64 , encodedBlob []byte , wlock bool ) (bool , error ) {
491+ if wlock {
492+ t .lock .Lock ()
493+ defer t .lock .Unlock ()
494+ } else {
495+ t .lock .RLock ()
496+ defer t .lock .RUnlock ()
497+ }
470498 // Ensure the table is still accessible
471499 if t .index == nil || t .head == nil {
472- t .lock .RUnlock ()
473- return errClosed
500+ return false , errClosed
474501 }
475502 // Ensure only the next item can be written, nothing else
476503 if atomic .LoadUint64 (& t .items ) != item {
477- t .lock .RUnlock ()
478- return fmt .Errorf ("appending unexpected item: want %d, have %d" , t .items , item )
479- }
480- // Encode the blob and write it into the data file
481- if ! t .noCompression {
482- blob = snappy .Encode (nil , blob )
504+ return false , fmt .Errorf ("appending unexpected item: want %d, have %d" , t .items , item )
483505 }
484- bLen := uint32 (len (blob ))
506+ bLen := uint32 (len (encodedBlob ))
485507 if t .headBytes + bLen < bLen ||
486508 t .headBytes + bLen > t .maxFileSize {
487- // we need a new file, writing would overflow
488- t .lock .RUnlock ()
489- t .lock .Lock ()
509+ // Writing would overflow, so we need to open a new data file.
510+ // If we don't already hold the writelock, abort and let the caller
511+ // invoke this method a second time.
512+ if ! wlock {
513+ return true , nil
514+ }
490515 nextID := atomic .LoadUint32 (& t .headId ) + 1
491516 // We open the next file in truncated mode -- if this file already
492517 // exists, we need to start over from scratch on it
493518 newHead , err := t .openFile (nextID , openFreezerFileTruncated )
494519 if err != nil {
495- t .lock .Unlock ()
496- return err
520+ return false , err
497521 }
498522 // Close old file, and reopen in RDONLY mode
499523 t .releaseFile (t .headId )
@@ -503,13 +527,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
503527 t .head = newHead
504528 atomic .StoreUint32 (& t .headBytes , 0 )
505529 atomic .StoreUint32 (& t .headId , nextID )
506- t .lock .Unlock ()
507- t .lock .RLock ()
508530 }
509-
510- defer t .lock .RUnlock ()
511- if _ , err := t .head .Write (blob ); err != nil {
512- return err
531+ if _ , err := t .head .Write (encodedBlob ); err != nil {
532+ return false , err
513533 }
514534 newOffset := atomic .AddUint32 (& t .headBytes , bLen )
515535 idx := indexEntry {
@@ -523,7 +543,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
523543 t .sizeGauge .Inc (int64 (bLen + indexEntrySize ))
524544
525545 atomic .AddUint64 (& t .items , 1 )
526- return nil
546+ return false , nil
527547}
528548
529549// getBounds returns the indexes for the item
@@ -562,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
562582// Retrieve looks up the data offset of an item with the given number and retrieves
563583// the raw binary blob from the data file.
564584func (t * freezerTable ) Retrieve (item uint64 ) ([]byte , error ) {
585+ blob , err := t .retrieve (item )
586+ if err != nil {
587+ return nil , err
588+ }
589+ if t .noCompression {
590+ return blob , nil
591+ }
592+ return snappy .Decode (nil , blob )
593+ }
594+
595+ // retrieve looks up the data offset of an item with the given number and retrieves
596+ // the raw binary blob from the data file. OBS! This method does not decode
597+ // compressed data.
598+ func (t * freezerTable ) retrieve (item uint64 ) ([]byte , error ) {
565599 t .lock .RLock ()
600+ defer t .lock .RUnlock ()
566601 // Ensure the table and the item is accessible
567602 if t .index == nil || t .head == nil {
568- t .lock .RUnlock ()
569603 return nil , errClosed
570604 }
571605 if atomic .LoadUint64 (& t .items ) <= item {
572- t .lock .RUnlock ()
573606 return nil , errOutOfBounds
574607 }
575608 // Ensure the item was not deleted from the tail either
576609 if uint64 (t .itemOffset ) > item {
577- t .lock .RUnlock ()
578610 return nil , errOutOfBounds
579611 }
580612 startOffset , endOffset , filenum , err := t .getBounds (item - uint64 (t .itemOffset ))
581613 if err != nil {
582- t .lock .RUnlock ()
583614 return nil , err
584615 }
585616 dataFile , exist := t .files [filenum ]
586617 if ! exist {
587- t .lock .RUnlock ()
588618 return nil , fmt .Errorf ("missing data file %d" , filenum )
589619 }
590620 // Retrieve the data itself, decompress and return
591621 blob := make ([]byte , endOffset - startOffset )
592622 if _ , err := dataFile .ReadAt (blob , int64 (startOffset )); err != nil {
593- t .lock .RUnlock ()
594623 return nil , err
595624 }
596- t .lock .RUnlock ()
597625 t .readMeter .Mark (int64 (len (blob ) + 2 * indexEntrySize ))
598-
599- if t .noCompression {
600- return blob , nil
601- }
602- return snappy .Decode (nil , blob )
626+ return blob , nil
603627}
604628
605629// has returns an indicator whether the specified number data
0 commit comments