16 changes: 16 additions & 0 deletions db/column_family.cc
@@ -123,6 +123,22 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
" is not linked with the binary.");
}
}
if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
if (!CompressionTypeSupported(CompressionType::kZSTD)) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked
// stable only since v0.8.0. For now we enable the feature in stable
// versions only.
return Status::InvalidArgument(
"zstd dictionary trainer cannot be used because " +
CompressionTypeToString(CompressionType::kZSTD) +
" is not linked with the binary.");
}
if (cf_options.compression_opts.max_dict_bytes == 0) {
return Status::InvalidArgument(
"The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
"should be nonzero if we're using zstd's dictionary generator.");
}
}
return Status::OK();
}

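The new check in CheckCompressionSupported() rejects a nonzero zstd_max_train_bytes unless ZSTD is compiled in and max_dict_bytes is also nonzero. A minimal sketch of how this surfaces to a user at DB::Open time; the DB path and option values here are illustrative, not from the PR:

```cpp
#include <cassert>
#include <iostream>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compression = rocksdb::kZSTD;
  // Enable the trainer but leave the dictionary size limit at zero; with this
  // patch the open should fail with Status::InvalidArgument from the check
  // added in CheckCompressionSupported().
  options.compression_opts.zstd_max_train_bytes = 1 << 20;
  options.compression_opts.max_dict_bytes = 0;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/zstd_dict_demo", &db);
  std::cout << s.ToString() << std::endl;
  assert(!s.ok());
  return 0;
}
```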
37 changes: 23 additions & 14 deletions db/compaction_job.cc
@@ -702,15 +702,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->compaction->mutable_cf_options();

// To build compression dictionary, we sample the first output file, assuming
- // it'll reach the maximum length, and then use the dictionary for compressing
- // subsequent output files. The dictionary may be less than max_dict_bytes if
- // the first output file's length is less than the maximum.
+ // it'll reach the maximum length. We optionally pass these samples through
+ // zstd's dictionary trainer, or just use them directly. Then, the dictionary
+ // is used for compressing subsequent output files in the same subcompaction.
+ const bool kUseZstdTrainer =
+     cfd->ioptions()->compression_opts.zstd_max_train_bytes > 0;
+ const size_t kSampleBytes =
+     kUseZstdTrainer ? cfd->ioptions()->compression_opts.zstd_max_train_bytes
+                     : cfd->ioptions()->compression_opts.max_dict_bytes;
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
- if (bottommost_level_ &&
-     cfd->ioptions()->compression_opts.max_dict_bytes > 0) {
-   const size_t kMaxSamples =
-       cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift;
+ if (bottommost_level_ && kSampleBytes > 0) {
+   const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
const size_t kOutFileLen = mutable_cf_options->MaxFileSizeForLevel(
compact_->compaction->output_level());
if (kOutFileLen != port::kMaxSizet) {
@@ -780,11 +783,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
- // data_begin_offset and compression_dict are only valid while generating
+ // data_begin_offset and dict_sample_data are only valid while generating
// dictionary from the first output file.
size_t data_begin_offset = 0;
- std::string compression_dict;
- compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes);
+ std::string dict_sample_data;
+ dict_sample_data.reserve(kSampleBytes);

while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
@@ -856,7 +859,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
data_elmt_copy_len =
data_end_offset - (data_begin_offset + data_elmt_copy_offset);
}
- compression_dict.append(&data_elmt.data()[data_elmt_copy_offset],
+ dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
data_elmt_copy_len);
if (sample_end_offset > data_end_offset) {
// Didn't finish sample. Try to finish it with the next data_elmt.
@@ -911,9 +914,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (sub_compact->outputs.size() == 1) {
- // Use dictionary from first output file for compression of subsequent
- // files.
- sub_compact->compression_dict = std::move(compression_dict);
+ // Use samples from first output file to create dictionary for
+ // compression of subsequent files.
+ if (kUseZstdTrainer) {
+   sub_compact->compression_dict = ZSTD_TrainDictionary(
+       dict_sample_data, kSampleLenShift,
+       cfd->ioptions()->compression_opts.max_dict_bytes);
+ } else {
+   sub_compact->compression_dict = std::move(dict_sample_data);
+ }
}
}
}
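Taken together, this function buffers up to kSampleBytes of 64-byte samples from the subcompaction's first output file, then turns them into a dictionary either directly or via the trainer. A simplified, standalone sketch of that flow is below; the offset selection uses a plain std::mt19937_64 for illustration rather than RocksDB's internal random generator, and the trainer step is stubbed out:

```cpp
#include <algorithm>
#include <cstddef>
#include <random>
#include <set>
#include <string>

// Pick up to (sample_bytes / 64) distinct 64-byte-aligned offsets inside the
// expected output-file length, mirroring the kSampleLenShift = 6 scheme above.
std::set<size_t> ChooseSampleOffsets(size_t sample_bytes, size_t out_file_len) {
  const int kSampleLenShift = 6;  // 2^6 = 64-byte samples
  const size_t kMaxSamples = sample_bytes >> kSampleLenShift;
  const size_t kSlots = out_file_len >> kSampleLenShift;
  const size_t kWanted = std::min(kMaxSamples, kSlots);
  std::set<size_t> offsets;
  std::mt19937_64 rng(0xdb);  // illustrative seed, not RocksDB's Random
  while (offsets.size() < kWanted) {
    offsets.insert((rng() % kSlots) << kSampleLenShift);
  }
  return offsets;
}

// Once the samples are concatenated into dict_sample_data, the dictionary is
// either the raw samples (pre-existing behavior) or the trainer's output.
std::string MakeDictionary(const std::string& dict_sample_data,
                           bool use_zstd_trainer, size_t max_dict_bytes) {
  if (!use_zstd_trainer) {
    return dict_sample_data;
  }
  // In the PR this is ZSTD_TrainDictionary(dict_sample_data, kSampleLenShift,
  // max_dict_bytes); truncation stands in for it here to stay self-contained.
  return dict_sample_data.substr(
      0, std::min(dict_sample_data.size(), max_dict_bytes));
}
```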
36 changes: 27 additions & 9 deletions db/db_test2.cc
@@ -1027,6 +1027,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;
const int kZstdTrainFactor = 16;

Options options;
options.env = CurrentOptions().env; // Make sure to use any custom env that the test is configured with.
@@ -1059,17 +1060,34 @@ TEST_F(DBTest2, PresetCompressionDict) {
for (auto compression_type : compression_types) {
options.compression = compression_type;
size_t prev_out_bytes;
- for (int i = 0; i < 2; ++i) {
+ for (int i = 0; i < 3; ++i) {
// First iteration: compress without preset dictionary
// Second iteration: compress with preset dictionary
- // To make sure the compression dictionary was actually used, we verify
- // the compressed size is smaller in the second iteration. Also in the
- // second iteration, verify the data we get out is the same data we put
- // in.
- if (i) {
-   options.compression_opts.max_dict_bytes = kBlockSizeBytes;
- } else {
-   options.compression_opts.max_dict_bytes = 0;
+ // Third iteration (zstd only): compress with zstd-trained dictionary
+ //
+ // To make sure the compression dictionary has the intended effect, we
+ // verify the compressed size is smaller in successive iterations. Also in
+ // the non-first iterations, verify the data we get out is the same data
+ // we put in.
+ switch (i) {
+   case 0:
+     options.compression_opts.max_dict_bytes = 0;
+     options.compression_opts.zstd_max_train_bytes = 0;
+     break;
+   case 1:
+     options.compression_opts.max_dict_bytes = kBlockSizeBytes;
+     options.compression_opts.zstd_max_train_bytes = 0;
+     break;
+   case 2:
+     if (compression_type != kZSTD) {
+       continue;
+     }
+     options.compression_opts.max_dict_bytes = kBlockSizeBytes;
+     options.compression_opts.zstd_max_train_bytes =
+         kZstdTrainFactor * kBlockSizeBytes;
+     break;
+   default:
+     assert(false);
}

options.statistics = rocksdb::CreateDBStatistics();
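The extra iteration only makes sense for zstd, hence the `continue` for other compression types. One way to compare the iterations' output sizes, as the loop's `prev_out_bytes` bookkeeping implies, is sketched below; it assumes the standard `rocksdb.total-sst-files-size` property as the size measure, which is not necessarily what the full test body uses:

```cpp
#include <cstdint>
#include "rocksdb/db.h"

// Hypothetical helper in the spirit of the test: report how many SST bytes
// the current compression settings produced.
uint64_t CompressedOutputBytes(rocksdb::DB* db) {
  uint64_t total_sst_bytes = 0;
  db->GetIntProperty("rocksdb.total-sst-files-size", &total_sst_bytes);
  return total_sst_bytes;
}

// At the end of each iteration i:
//   uint64_t out_bytes = CompressedOutputBytes(db_);
//   if (i > 0) {
//     ASSERT_LT(out_bytes, prev_out_bytes);  // dictionary should shrink output
//   }
//   prev_out_bytes = out_bytes;
```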
44 changes: 34 additions & 10 deletions include/rocksdb/advanced_options.h
@@ -90,23 +90,47 @@ struct CompressionOptions {
int window_bits;
int level;
int strategy;
- // Maximum size of dictionary used to prime the compression library. Currently
- // this dictionary will be constructed by sampling the first output file in a
- // subcompaction when the target level is bottommost. This dictionary will be
- // loaded into the compression library before compressing/uncompressing each
- // data block of subsequent files in the subcompaction. Effectively, this
- // improves compression ratios when there are repetitions across data blocks.
- // A value of 0 indicates the feature is disabled.
+
+ // Maximum size of dictionaries used to prime the compression library.
+ // Enabling dictionary can improve compression ratios when there are
+ // repetitions across data blocks.
+ //
+ // The dictionary is created by sampling the SST file data. If
+ // `zstd_max_train_bytes` is nonzero, the samples are passed through zstd's
+ // dictionary generator. Otherwise, the random samples are used directly as
+ // the dictionary.
+ //
+ // When compression dictionary is disabled, we compress and write each block
+ // before buffering data for the next one. When compression dictionary is
+ // enabled, we buffer all SST file data in-memory so we can sample it, as data
+ // can only be compressed and written after the dictionary has been finalized.
+ // So users of this feature may see increased memory usage.
+ //
+ // Default: 0.
uint32_t max_dict_bytes;
+
+ // Maximum size of training data passed to zstd's dictionary trainer. Using
+ // zstd's dictionary trainer can achieve even better compression ratio
+ // improvements than using `max_dict_bytes` alone.
+ //
+ // The training data will be used to generate a dictionary of max_dict_bytes.
+ //
+ // Default: 0.
+ uint32_t zstd_max_train_bytes;

CompressionOptions()
- : window_bits(-14), level(-1), strategy(0), max_dict_bytes(0) {}
- CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes)
+ : window_bits(-14),
+   level(-1),
+   strategy(0),
+   max_dict_bytes(0),
+   zstd_max_train_bytes(0) {}
+ CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
+                    int _zstd_max_train_bytes)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
- max_dict_bytes(_max_dict_bytes) {}
+ max_dict_bytes(_max_dict_bytes),
+ zstd_max_train_bytes(_zstd_max_train_bytes) {}
};

enum UpdateStatus { // Return status For inplace update callback
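For reference, a typical configuration that exercises both knobs might look like the sketch below. The specific sizes are illustrative rather than recommended defaults; zstd's own documentation suggests training data roughly 100x the target dictionary size, which is why zstd_max_train_bytes is normally set much larger than max_dict_bytes.

```cpp
#include "rocksdb/options.h"

// Illustrative settings only; tune both sizes for your data.
rocksdb::Options DictCompressionOptions() {
  rocksdb::Options options;
  options.compression = rocksdb::kZSTD;
  // Build up to 16KB dictionaries per bottommost-level subcompaction...
  options.compression_opts.max_dict_bytes = 16 * 1024;
  // ...trained on up to ~100x that much sampled data from the first output
  // file (set to 0 to use the raw samples directly, as before this change).
  options.compression_opts.zstd_max_train_bytes = 100 * 16 * 1024;
  return options;
}
```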
43 changes: 42 additions & 1 deletion util/compression.h
@@ -35,7 +35,10 @@

#if defined(ZSTD)
#include <zstd.h>
- #endif
+ #if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
+ #include <zdict.h>
+ #endif // ZSTD_VERSION_NUMBER >= 800
+ #endif // ZSTD

#if defined(XPRESS)
#include "port/xpress.h"
@@ -796,4 +799,42 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length,
return nullptr;
}

inline std::string ZSTD_TrainDictionary(const std::string& samples,
const std::vector<size_t>& sample_lens,
size_t max_dict_bytes) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked stable
// only since v0.8.0. For now we enable the feature in stable versions only.
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
std::string dict_data(max_dict_bytes, '\0');
size_t dict_len =
ZDICT_trainFromBuffer(&dict_data[0], max_dict_bytes, &samples[0],
&sample_lens[0], sample_lens.size());
if (ZDICT_isError(dict_len)) {
return "";
}
assert(dict_len <= max_dict_bytes);
dict_data.resize(dict_len);
return dict_data;
#else // up to v0.7.x
assert(false);
return "";
#endif // ZSTD_VERSION_NUMBER >= 800
}

inline std::string ZSTD_TrainDictionary(const std::string& samples,
size_t sample_len_shift,
size_t max_dict_bytes) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked stable
// only since v0.8.0. For now we enable the feature in stable versions only.
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
// skips potential partial sample at the end of "samples"
size_t num_samples = samples.size() >> sample_len_shift;
std::vector<size_t> sample_lens(num_samples, 1 << sample_len_shift);
return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
#else // up to v0.7.x
assert(false);
return "";
#endif // ZSTD_VERSION_NUMBER >= 800
}

} // namespace rocksdb
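For context on where the trained dictionary ends up: RocksDB's ZSTD_Compress/ZSTD_Uncompress wrappers hand it to zstd's *_usingDict entry points at (de)compression time. A minimal round trip against the raw zstd API, assuming zstd v0.8.0+; this is an illustration of the dictionary-based API, not the RocksDB wrappers themselves:

```cpp
#include <cassert>
#include <string>
#include <vector>
#include <zstd.h>

// Round-trip a block through zstd using a preset dictionary, e.g. one
// produced by ZDICT_trainFromBuffer as in ZSTD_TrainDictionary above.
std::string RoundTripWithDict(const std::string& block,
                              const std::string& dict) {
  std::vector<char> compressed(ZSTD_compressBound(block.size()));
  ZSTD_CCtx* cctx = ZSTD_createCCtx();
  size_t csize = ZSTD_compress_usingDict(
      cctx, compressed.data(), compressed.size(), block.data(), block.size(),
      dict.data(), dict.size(), /*compressionLevel=*/3);
  ZSTD_freeCCtx(cctx);
  assert(!ZSTD_isError(csize));

  std::string out(block.size(), '\0');
  ZSTD_DCtx* dctx = ZSTD_createDCtx();
  size_t dsize = ZSTD_decompress_usingDict(dctx, &out[0], out.size(),
                                           compressed.data(), csize,
                                           dict.data(), dict.size());
  ZSTD_freeDCtx(dctx);
  assert(!ZSTD_isError(dsize));
  out.resize(dsize);
  return out;
}
```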