Skip to content

Commit 8cc113d

Browse files
committed
valsep: create SSTBlobWriter to write external sst with blob files
SSTBlobWriter is an SST writer that can be configured to separate values into new blob files. On close, the writer can return metadata about the written sst and its blob files.
1 parent 18d3578 commit 8cc113d

File tree

6 files changed

+439
-0
lines changed

6 files changed

+439
-0
lines changed

sstable/colblk_writer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,14 @@ func (w *RawColumnWriter) IsPrefixEqualPrev(k []byte) bool {
269269
return w.dataBlock.KeyWriter.ComparePrev(k).PrefixEqual()
270270
}
271271

272+
// PrevPointKeyKind implements the RawWriter interface.
273+
func (w *RawColumnWriter) PrevPointKeyKind() base.InternalKeyKind {
274+
if w == nil || w.dataBlock.Rows() == 0 {
275+
return base.InternalKeyKindInvalid
276+
}
277+
return w.prevPointKey.trailer.Kind()
278+
}
279+
272280
// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
273281
// be used internally by Pebble.
274282
func (w *RawColumnWriter) SetSnapshotPinnedProperties(

sstable/rowblk_writer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,6 +1408,14 @@ func (w *RawRowWriter) IsPrefixEqualPrev(k []byte) bool {
14081408
return bytes.Equal(w.split.Prefix(k), w.split.Prefix(w.dataBlockBuf.dataBlock.CurUserKey()))
14091409
}
14101410

1411+
// PrevPointKeyKind implements the RawWriter interface.
1412+
func (w *RawRowWriter) PrevPointKeyKind() base.InternalKeyKind {
1413+
if w == nil || w.dataBlockBuf.dataBlock.EntryCount() == 0 {
1414+
return base.InternalKeyKindInvalid
1415+
}
1416+
return w.dataBlockBuf.dataBlock.CurKey().Kind()
1417+
}
1418+
14111419
// EncodeSpan encodes the keys in the given span. The span can contain either
14121420
// only RANGEDEL keys or only range keys.
14131421
//

sstable/writer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,9 @@ type RawWriter interface {
363363
//
364364
// Must not be called after Writer is closed.
365365
IsPrefixEqualPrev(k []byte) bool
366+
// PrevPointKeyKind returns the InternalKeyKind of the last point key written
367+
// to the writer. Must not be called after Writer is closed.
368+
PrevPointKeyKind() base.InternalKeyKind
366369

367370
// SetValueSeparationProps sets the value separation props that were used when
368371
// writing this sstable. This is recorded in the sstable properties.

valsep/sst_blob_writer.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package valsep
6+
7+
import (
8+
"github.com/cockroachdb/errors"
9+
"github.com/cockroachdb/pebble/internal/base"
10+
"github.com/cockroachdb/pebble/objstorage"
11+
"github.com/cockroachdb/pebble/sstable"
12+
"github.com/cockroachdb/pebble/sstable/blob"
13+
"github.com/cockroachdb/pebble/sstable/block"
14+
)
15+
16+
// SSTBlobWriter writes an sstable and 0 or more blob value files
17+
// for ingesting into pebble. Values are extracted depending on the
18+
// value separation strategy defined. If ValueSeparation is
19+
// neverSeparateValues, this behaves like sstable.Writer and no blob
20+
// files are written.
21+
type SSTBlobWriter struct {
22+
// Promote methods from sstable.Writer.
23+
*sstable.Writer
24+
valSep ValueSeparation
25+
err error
26+
27+
blobFileNum base.DiskFileNum
28+
closed bool
29+
// isStrictObsolete is true if the writer is configured to write and enforce
30+
// a 'strict obsolete' sstable. This includes prohibiting the addition of
31+
// MERGE keys. See the documentation in format.go for more details.
32+
isStrictObsolete bool
33+
34+
kvScratch base.InternalKV
35+
// Metadata on the blob files written.
36+
blobFilesMeta []blob.FileWriterStats
37+
}
38+
39+
// NewBlobFile is a callback that returns a writable handle for a new blob
// file. SSTBlobWriter invokes it each time its value separator needs to start
// another blob file.
type NewBlobFile func() (objstorage.Writable, error)
40+
41+
// neverSeparateValues is the shared value separation strategy installed on an
// SSTBlobWriter when separation into blob files is disabled.
var neverSeparateValues = &NeverSeparateValues{}
42+
43+
type SSTBlobWriterOptions struct {
44+
SSTWriterOpts sstable.WriterOptions
45+
BlobWriterOpts blob.FileWriterOptions
46+
// BlobFilesDisabled is true if value separation into blob files
47+
// is disabled and the writer will behave like an sstable.Writer
48+
// in this case. Note that values may still be separated into a
49+
// value block in the same sstable.
50+
BlobFilesDisabled bool
51+
// The minimum size required for a value to be separated into a
52+
// blob file. This value may be overridden by the span policy.
53+
ValueSeparationMinSize int
54+
// SpanPolicy specifies the specific policies applied to the table span.
55+
// When using the external writer, there should be 1 span policy
56+
// applied to the entire sstable.
57+
SpanPolicy base.SpanPolicy
58+
NewBlobFileFn NewBlobFile
59+
}
60+
61+
// NewSSTBlobWriter returns a new SSTBlobWriter that writes to the provided
62+
// sstHandle. The writer uses the provided options to configure both the sstable
63+
// writer and the blob file writer.
64+
func NewSSTBlobWriter(sstHandle objstorage.Writable, opts SSTBlobWriterOptions) *SSTBlobWriter {
65+
writer := &SSTBlobWriter{
66+
isStrictObsolete: opts.SSTWriterOpts.IsStrictObsolete,
67+
}
68+
69+
if opts.SpanPolicy.PreferFastCompression && opts.SSTWriterOpts.Compression != block.NoCompression {
70+
opts.SSTWriterOpts.Compression = block.FastestCompression
71+
}
72+
73+
writer.Writer = sstable.NewWriter(sstHandle, opts.SSTWriterOpts)
74+
75+
// Create the value separator.
76+
minimumValueSize := opts.ValueSeparationMinSize
77+
if opts.SpanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize > 0 {
78+
minimumValueSize = opts.SpanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize
79+
}
80+
if opts.BlobFilesDisabled || minimumValueSize == 0 || opts.SpanPolicy.ValueStoragePolicy.DisableBlobSeparation {
81+
writer.valSep = neverSeparateValues
82+
} else {
83+
newBlobObject := func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
84+
// The ObjectMetadata collected by the value separator will not be
85+
// exposed by this writer, since this store does not yet know about
86+
// these objects. However, we must provide a unique file number for
87+
// each new blob file because the value separator uses file ids to
88+
// retrieve the index of the blob file within the sst's tracked blob
89+
// references array. The reference id (array index) is what is then
90+
// written to the inline blob handle.
91+
newHandle, err := opts.NewBlobFileFn()
92+
if err != nil {
93+
return nil, objstorage.ObjectMetadata{}, err
94+
}
95+
nextFileNum := writer.blobFileNum
96+
writer.blobFileNum++
97+
return newHandle, objstorage.ObjectMetadata{DiskFileNum: nextFileNum}, nil
98+
}
99+
writer.valSep = NewWriteNewBlobFiles(
100+
opts.SSTWriterOpts.Comparer,
101+
newBlobObject,
102+
opts.BlobWriterOpts,
103+
minimumValueSize,
104+
WriteNewBlobFilesOptions{
105+
DisableValueSeparationBySuffix: opts.SpanPolicy.ValueStoragePolicy.DisableSeparationBySuffix,
106+
ShortAttrExtractor: opts.SSTWriterOpts.ShortAttributeExtractor,
107+
InvalidValueCallback: func(userKey []byte, value []byte, err error) {
108+
writer.err = errors.CombineErrors(writer.err, err)
109+
},
110+
},
111+
)
112+
}
113+
114+
return writer
115+
}
116+
117+
// Error returns the current accumulated error if any.
118+
func (w *SSTBlobWriter) Error() error {
119+
return errors.CombineErrors(w.err, w.Writer.Error())
120+
}
121+
122+
// Set sets the value for the given key. The sequence number is set to 0.
123+
// Values may be separated into blob files depending on the value separation
124+
// strategy configured for the writer. Intended for use to externally construct
125+
// an sstable with its blob files before ingestion into a DB. For a given
126+
// SSTBlobWriter, the keys passed to Set must be in strictly increasing order.
127+
func (w *SSTBlobWriter) Set(key, value []byte) error {
128+
if err := w.Error(); err != nil {
129+
return err
130+
}
131+
132+
if w.isStrictObsolete {
133+
return errors.Errorf("use raw writer Add in strict obsolete mode")
134+
}
135+
136+
w.kvScratch.K = base.MakeInternalKey(key, 0, sstable.InternalKeyKindSet)
137+
w.kvScratch.V = base.MakeInPlaceValue(value)
138+
isLikelyMVCCGarbage := sstable.IsLikelyMVCCGarbage(
139+
key, w.Raw().PrevPointKeyKind(), sstable.InternalKeyKindSet, len(value), w.Writer.Raw().IsPrefixEqualPrev)
140+
return w.valSep.Add(w.Raw(), &w.kvScratch, false, isLikelyMVCCGarbage)
141+
}
142+
143+
// BlobWriterMetas returns an array of blob.FileWriterStats describing the
144+
// blob files written by this SSTBlobWriter. The ordering of the returned
145+
// slice matches the ordering of blob files as they should appear in the
146+
// sstable's manifest.BlobReferences. Close must be called before calling this
147+
// method.
148+
func (w *SSTBlobWriter) BlobWriterMetas() ([]blob.FileWriterStats, error) {
149+
if !w.closed {
150+
return nil, errors.New("blob writer not closed")
151+
}
152+
return w.blobFilesMeta, nil
153+
}
154+
155+
// Close closes both the sstable writer and the blob file writer if any.
156+
func (w *SSTBlobWriter) Close() error {
157+
w.err = errors.CombineErrors(w.err, w.Writer.Close())
158+
meta, err := w.valSep.FinishOutput()
159+
if err != nil {
160+
w.err = errors.CombineErrors(w.err, err)
161+
}
162+
for _, blobFile := range meta.NewBlobFiles {
163+
w.blobFilesMeta = append(w.blobFilesMeta, blobFile.FileStats)
164+
}
165+
w.closed = true
166+
return w.err
167+
}

valsep/sst_blob_writer_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package valsep
6+
7+
import (
8+
"bytes"
9+
"context"
10+
"fmt"
11+
"strconv"
12+
"strings"
13+
"testing"
14+
15+
"github.com/cockroachdb/crlib/testutils/leaktest"
16+
"github.com/cockroachdb/datadriven"
17+
"github.com/cockroachdb/pebble/internal/base"
18+
"github.com/cockroachdb/pebble/internal/testkeys"
19+
"github.com/cockroachdb/pebble/objstorage"
20+
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
21+
"github.com/cockroachdb/pebble/sstable"
22+
"github.com/cockroachdb/pebble/vfs"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestSSTBlobWriter(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
runDataDriven(t, "testdata/sst_blob_writer")
29+
}
30+
31+
// The span policy string is in the form "(<option1>=<val>,<option2>=<val>...)"
32+
func parseSpanPolicy(t *testing.T, spanPolicyStr string) base.SpanPolicy {
33+
spanPolicyStr = strings.TrimPrefix(spanPolicyStr, "(")
34+
spanPolicyStr = strings.TrimSuffix(spanPolicyStr, ")")
35+
spanPolicyParts := strings.Split(spanPolicyStr, ",")
36+
var policy base.ValueStoragePolicyAdjustment
37+
var err error
38+
for _, part := range spanPolicyParts {
39+
fieldParts := strings.Split(part, "=")
40+
switch fieldParts[0] {
41+
case "no-value-separation":
42+
policy.DisableBlobSeparation = true
43+
case "value-separation-min-size":
44+
policy.OverrideBlobSeparationMinimumSize, err = strconv.Atoi(fieldParts[1])
45+
if err != nil {
46+
t.Fatalf("parsing value-separation-min-size: %v", err)
47+
}
48+
case "disable-value-separation-by-suffix":
49+
policy.DisableSeparationBySuffix = true
50+
default:
51+
t.Fatalf("unrecognized span policy option: %s", fieldParts[0])
52+
}
53+
}
54+
55+
return base.SpanPolicy{
56+
ValueStoragePolicy: policy,
57+
}
58+
}
59+
60+
func parseBuildSSTBlobWriterOptions(t *testing.T, td *datadriven.TestData) SSTBlobWriterOptions {
61+
opts := SSTBlobWriterOptions{}
62+
td.MaybeScanArgs(t, "value-separation-min-size", &opts.ValueSeparationMinSize)
63+
64+
var spanPolicyStr string
65+
td.MaybeScanArgs(t, "span-policy", &spanPolicyStr)
66+
if spanPolicyStr != "" {
67+
opts.SpanPolicy = parseSpanPolicy(t, spanPolicyStr)
68+
}
69+
return opts
70+
}
71+
72+
func runDataDriven(t *testing.T, file string) {
73+
datadriven.RunTest(t, file, func(t *testing.T, td *datadriven.TestData) string {
74+
ctx := context.Background()
75+
switch td.Cmd {
76+
case "build":
77+
var buf bytes.Buffer
78+
fs := vfs.WithLogging(vfs.NewMem(), func(format string, args ...any) {
79+
fmt.Fprint(&buf, "# ")
80+
fmt.Fprintf(&buf, format, args...)
81+
fmt.Fprintln(&buf)
82+
})
83+
objStore, err := objstorageprovider.Open(objstorageprovider.Settings{
84+
FS: fs,
85+
})
86+
require.NoError(t, err)
87+
blobFileCount := 0
88+
opts := parseBuildSSTBlobWriterOptions(t, td)
89+
opts.SSTWriterOpts.Comparer = testkeys.Comparer
90+
opts.SSTWriterOpts.TableFormat = sstable.TableFormatPebblev7
91+
opts.NewBlobFileFn = func() (objstorage.Writable, error) {
92+
fnum := blobFileCount
93+
w, _, err := objStore.Create(ctx, base.FileTypeBlob, base.DiskFileNum(fnum), objstorage.CreateOptions{})
94+
if err != nil {
95+
return nil, err
96+
}
97+
blobFileCount++
98+
return w, err
99+
}
100+
sstHandle, _, err := objStore.Create(ctx, base.FileTypeTable, 0, objstorage.CreateOptions{})
101+
require.NoError(t, err)
102+
writer := NewSSTBlobWriter(sstHandle, opts)
103+
defer func() {
104+
if !writer.closed {
105+
_ = writer.Close()
106+
}
107+
}()
108+
kvs, err := sstable.ParseTestKVsAndSpans(td.Input, nil)
109+
if err != nil {
110+
return fmt.Sprintf("error parsing input: %v", err)
111+
}
112+
113+
for _, kv := range kvs {
114+
keyKind := kv.Key.Kind()
115+
if kv.IsKeySpan() {
116+
keyKind = kv.Span.Keys[0].Kind()
117+
}
118+
switch keyKind {
119+
case sstable.InternalKeyKindSet, sstable.InternalKeyKindSetWithDelete:
120+
if err := writer.Set(kv.Key.UserKey, kv.Value); err != nil {
121+
return fmt.Sprintf("error putting key %s: %v", kv.Key.UserKey, err)
122+
}
123+
case sstable.InternalKeyKindDelete, sstable.InternalKeyKindDeleteSized:
124+
if err := writer.Delete(kv.Key.UserKey); err != nil {
125+
return fmt.Sprintf("error deleting key %s: %v", kv.Key.UserKey, err)
126+
}
127+
case sstable.InternalKeyKindRangeDelete:
128+
if err := writer.RangeKeyDelete(kv.Span.Start, kv.Span.End); err != nil {
129+
return fmt.Sprintf("error deleting range %s-%s: %v", kv.Span.Start, kv.Span.End, err)
130+
}
131+
case sstable.InternalKeyKindMerge:
132+
if err := writer.Merge(kv.Key.UserKey, kv.Value); err != nil {
133+
return fmt.Sprintf("error merging key %s: %v", kv.Key.UserKey, err)
134+
}
135+
case base.InternalKeyKindRangeKeySet:
136+
if err := writer.RangeKeySet(kv.Span.Start, kv.Span.End, nil, kv.Value); err != nil {
137+
return fmt.Sprintf("error setting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
138+
}
139+
case base.InternalKeyKindRangeKeyUnset:
140+
if err := writer.RangeKeyUnset(kv.Span.Start, kv.Span.End, kv.Key.UserKey); err != nil {
141+
return fmt.Sprintf("error unsetting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
142+
}
143+
case base.InternalKeyKindRangeKeyDelete:
144+
if err := writer.RangeKeyDelete(kv.Span.Start, kv.Span.End); err != nil {
145+
return fmt.Sprintf("error deleting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
146+
}
147+
default:
148+
return fmt.Sprintf("unsupported key kind %v", kv.Key.Kind())
149+
}
150+
}
151+
152+
if err := writer.Close(); err != nil {
153+
return fmt.Sprintf("error closing writer: %v", err)
154+
}
155+
156+
tableMeta, err := writer.Metadata()
157+
if err != nil {
158+
return fmt.Sprintf("error getting metadata: %v", err)
159+
}
160+
161+
blobMetas, err := writer.BlobWriterMetas()
162+
if err != nil {
163+
return fmt.Sprintf("error getting blob metas: %v", err)
164+
}
165+
166+
var outputBuf bytes.Buffer
167+
// Print some sst properties.
168+
fmt.Fprintf(&outputBuf, "size:%d\n", tableMeta.Size)
169+
outputBuf.WriteString("blobfiles:")
170+
require.Equal(t, blobFileCount, len(blobMetas))
171+
if len(blobMetas) > 0 {
172+
outputBuf.WriteString("\n")
173+
for i, bm := range blobMetas {
174+
fmt.Fprintf(&outputBuf, "%d: %s\n", i+1, bm.String())
175+
}
176+
} else {
177+
outputBuf.WriteString(" none\n")
178+
}
179+
return outputBuf.String()
180+
default:
181+
return fmt.Sprintf("unrecognized command %s", td.Cmd)
182+
}
183+
})
184+
}

0 commit comments

Comments
 (0)