Skip to content

Use a more robust metric for sorting (de)compression tasks #19656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: branch-25.10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions cpp/src/io/comp/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/types.hpp>

namespace cudf::io::detail {
Expand Down Expand Up @@ -56,32 +55,4 @@ namespace cudf::io::detail {
}
}

[[nodiscard]] size_t find_split_index(device_span<device_span<uint8_t const> const> inputs,
host_engine_state host_state,
size_t auto_mode_threshold,
size_t hybrid_mode_target_ratio,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
if (host_state == host_engine_state::OFF or inputs.empty()) { return 0; }
if (host_state == host_engine_state::ON) { return inputs.size(); }

if (host_state == host_engine_state::AUTO) {
return inputs.size() < auto_mode_threshold ? inputs.size() : 0;
}

if (host_state == host_engine_state::HYBRID) {
auto const h_inputs = cudf::detail::make_host_vector(inputs, stream);
size_t total_host_size = 0;
for (size_t i = 0; i < h_inputs.size(); ++i) {
if (total_host_size >= hybrid_mode_target_ratio * h_inputs[i].size()) { return i; }
total_host_size += h_inputs[i].size();
}
return inputs.size(); // No split
}

CUDF_FAIL("Invalid host engine state for compression: " +
std::to_string(static_cast<uint8_t>(host_state)));
}

} // namespace cudf::io::detail
39 changes: 30 additions & 9 deletions cpp/src/io/comp/common_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ constexpr size_t default_host_compression_auto_threshold = 128;
constexpr size_t default_host_decompression_auto_threshold = 128;
// Estimated ratio between total CPU decompression throughput and decompression throughput of a
// single GPU block; higher values lead to more host decompression in HYBRID mode
constexpr double default_host_device_decompression_work_ratio = 100;
constexpr double default_host_device_decompression_cost_ratio = 100;
// Estimated ratio between total CPU compression throughput and compression throughput of a
// single GPU block; higher values lead to more host compression in HYBRID mode
constexpr double default_host_device_compression_work_ratio = 100;
constexpr double default_host_device_compression_cost_ratio = 100;

[[nodiscard]] std::optional<nvcomp::compression_type> to_nvcomp_compression(
compression_type compression);
Expand All @@ -59,9 +59,9 @@ struct sorted_codec_parameters {
};

/**
* @brief Sorts input and output spans by input size in descending order
* @brief Sorts input and output spans for decompression by input size in descending order
*
* This function creates a sorted view of the inputs and outputs where they are
* This function creates a sorted view of the inputs and outputs for decompression, where they are
* ordered by the size of each input span in descending order (largest first).
* This can reduce latency by processing larger chunks first.
*
Expand All @@ -71,10 +71,30 @@ struct sorted_codec_parameters {
* @param mr Memory resource to use for allocations of results
* @return sorted_codec_parameters containing sorted inputs, outputs, and original ordering
*/
sorted_codec_parameters sort_tasks(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
[[nodiscard]] sorted_codec_parameters sort_decompression_tasks(
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Sorts input and output spans for compression by output size in descending order
*
* This function creates a sorted view of the inputs and outputs for compression, where they are
* ordered by the size of each output span in descending order (largest first).
* This can reduce latency by processing larger chunks first.
*
* @param inputs Device spans of input data to be sorted
* @param outputs Device spans of output buffers corresponding to inputs
* @param stream CUDA stream for asynchronous execution
* @param mr Memory resource to use for allocations of results
* @return sorted_codec_parameters containing sorted inputs, outputs, and original ordering
*/
[[nodiscard]] sorted_codec_parameters sort_compression_tasks(
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Finds the split index for input data based on the specified thresholds and target ratio.
Expand All @@ -92,9 +112,10 @@ sorted_codec_parameters sort_tasks(device_span<device_span<uint8_t const> const>
* @return The index at which the input data should be split.
*/
[[nodiscard]] size_t find_split_index(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
host_engine_state host_state,
size_t auto_mode_threshold,
size_t hybrid_mode_target_ratio,
size_t hybrid_mode_cost_ratio,
rmm::cuda_stream_view stream);

/**
Expand Down
23 changes: 12 additions & 11 deletions cpp/src/io/comp/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,26 +446,27 @@ void compress(compression_type compression,

// sort inputs by size, largest first
auto const [sorted_inputs, sorted_outputs, order] =
sort_tasks(inputs, outputs, stream, cudf::get_current_device_resource_ref());
sort_compression_tasks(inputs, outputs, stream, cudf::get_current_device_resource_ref());
auto inputs_view = device_span<device_span<uint8_t const> const>(sorted_inputs);
auto outputs_view = device_span<device_span<uint8_t> const>(sorted_outputs);

auto tmp_results = cudf::detail::make_device_uvector_async<detail::codec_exec_result>(
results, stream, cudf::get_current_device_resource_ref());
auto results_view = device_span<codec_exec_result>(tmp_results);

auto const split_idx = detail::find_split_index(
auto const split_idx = find_split_index(
inputs_view,
detail::get_host_engine_state(compression),
outputs_view,
get_host_engine_state(compression),
getenv_or("LIBCUDF_HOST_COMPRESSION_THRESHOLD", default_host_compression_auto_threshold),
getenv_or("LIBCUDF_HOST_COMPRESSION_RATIO", default_host_device_compression_work_ratio),
getenv_or("LIBCUDF_HOST_COMPRESSION_RATIO", default_host_device_compression_cost_ratio),
stream);

auto tmp_results = cudf::detail::make_device_uvector_async<detail::codec_exec_result>(
results, stream, cudf::get_current_device_resource_ref());
auto results_view = device_span<codec_exec_result>(tmp_results);

auto const streams = cudf::detail::fork_streams(stream, 2);
detail::device_compress(compression,
inputs_view.subspan(split_idx, sorted_inputs.size() - split_idx),
outputs_view.subspan(split_idx, sorted_outputs.size() - split_idx),
results_view.subspan(split_idx, tmp_results.size() - split_idx),
inputs_view.subspan(split_idx, inputs_view.size() - split_idx),
outputs_view.subspan(split_idx, outputs_view.size() - split_idx),
results_view.subspan(split_idx, results_view.size() - split_idx),
streams[0]);
detail::host_compress(compression,
inputs_view.subspan(0, split_idx),
Expand Down
19 changes: 10 additions & 9 deletions cpp/src/io/comp/decompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -751,26 +751,27 @@ void decompress(compression_type compression,

// sort inputs by size, largest first
auto const [sorted_inputs, sorted_outputs, order] =
sort_tasks(inputs, outputs, stream, cudf::get_current_device_resource_ref());
sort_decompression_tasks(inputs, outputs, stream, cudf::get_current_device_resource_ref());
auto inputs_view = device_span<device_span<uint8_t const> const>(sorted_inputs);
auto outputs_view = device_span<device_span<uint8_t> const>(sorted_outputs);

auto tmp_results = cudf::detail::make_device_uvector_async<detail::codec_exec_result>(
results, stream, cudf::get_current_device_resource_ref());
auto results_view = device_span<codec_exec_result>(tmp_results);

auto const split_idx = find_split_index(
inputs_view,
outputs_view,
get_host_engine_state(compression),
getenv_or("LIBCUDF_HOST_DECOMPRESSION_THRESHOLD", default_host_decompression_auto_threshold),
getenv_or("LIBCUDF_HOST_DECOMPRESSION_RATIO", default_host_device_decompression_work_ratio),
getenv_or("LIBCUDF_HOST_DECOMPRESSION_RATIO", default_host_device_decompression_cost_ratio),
stream);

auto tmp_results = cudf::detail::make_device_uvector_async<detail::codec_exec_result>(
results, stream, cudf::get_current_device_resource_ref());
auto results_view = device_span<codec_exec_result>(tmp_results);

auto const streams = cudf::detail::fork_streams(stream, 2);
detail::device_decompress(compression,
inputs_view.subspan(split_idx, sorted_inputs.size() - split_idx),
outputs_view.subspan(split_idx, sorted_outputs.size() - split_idx),
results_view.subspan(split_idx, tmp_results.size() - split_idx),
inputs_view.subspan(split_idx, inputs_view.size() - split_idx),
outputs_view.subspan(split_idx, outputs_view.size() - split_idx),
results_view.subspan(split_idx, results_view.size() - split_idx),
max_uncomp_chunk_size,
max_total_uncomp_size,
streams[0]);
Expand Down
128 changes: 111 additions & 17 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Mark Adler [email protected]
#include "io/utilities/block_utils.cuh"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
Expand All @@ -59,6 +60,8 @@ Mark Adler [email protected]

namespace cudf::io::detail {

namespace {

constexpr int max_bits = 15; // maximum bits in a code
constexpr int max_l_codes = 286; // maximum number of literal/length codes
constexpr int max_d_codes = 30; // maximum number of distance codes
Expand Down Expand Up @@ -1208,31 +1211,46 @@ CUDF_KERNEL void __launch_bounds__(1024)
if (t < len) { dst[t] = src[t]; }
}

void gpuinflate(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<codec_exec_result> results,
gzip_header_included parse_hdr,
rmm::cuda_stream_view stream)
enum class task_type { DECOMPRESSION, COMPRESSION };
// Relative cost of the trivial cases (incompressible data)
constexpr double trivial_case_cost_ratio = 0.1;

CUDF_HOST_DEVICE double cost_factor(size_t input_size, size_t output_size, task_type task_type)
{
constexpr int block_size = 128; // Threads per block
if (inputs.size() > 0) {
inflate_kernel<block_size>
<<<inputs.size(), block_size, 0, stream.value()>>>(inputs, outputs, results, parse_hdr);
if (task_type == task_type::DECOMPRESSION) {
auto const compression_ratio = std::max(1., static_cast<double>(output_size) / input_size);
// When the compression ratio is one, the cost factor is the same as the copy cost ratio,
// meaning that the cost of decompressing the block is the same as the cost of copying it. The
// cost factor asymptotes to one as the compression ratio increases, meaning that the cost
// approaches the base cost of decompressing (which is a lot higher than the copy cost)
return 1. - (1. - trivial_case_cost_ratio) / std::pow(compression_ratio, 4);
} else {
// We don't know the compression ratio for compression, so use a constant cost factor
return 1.;
}
}

void gpu_copy_uncompressed_blocks(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream)
CUDF_HOST_DEVICE double task_device_cost(size_t input_size, size_t output_size, task_type task_type)
{
if (inputs.size() > 0) {
copy_uncompressed_kernel<<<inputs.size(), 1024, 0, stream.value()>>>(inputs, outputs);
}
return cost_factor(input_size, output_size, task_type) * input_size;
}

CUDF_HOST_DEVICE double task_host_cost(size_t input_size,
size_t output_size,
double device_host_ratio,
task_type task_type)
{
// Cost to copy the block to host and back; NOTE: assumes that the copy throughput is the same as
// the decompression/compression throughput when the data is incompressible
auto const copy_cost = trivial_case_cost_ratio * (input_size + output_size);
return (cost_factor(input_size, output_size, task_type) * input_size + copy_cost) /
device_host_ratio;
}

sorted_codec_parameters sort_tasks(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
task_type task_type,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -1241,8 +1259,9 @@ sorted_codec_parameters sort_tasks(device_span<device_span<uint8_t const> const>
thrust::sort(rmm::exec_policy_nosync(stream),
order.begin(),
order.end(),
[inputs] __device__(std::size_t a, std::size_t b) {
return inputs[a].size() > inputs[b].size();
[inputs, outputs, task_type] __device__(std::size_t a, std::size_t b) {
return task_device_cost(inputs[a].size(), outputs[a].size(), task_type) >
task_device_cost(inputs[b].size(), outputs[b].size(), task_type);
});

auto sorted_inputs = rmm::device_uvector<device_span<uint8_t const>>(inputs.size(), stream, mr);
Expand All @@ -1262,6 +1281,47 @@ sorted_codec_parameters sort_tasks(device_span<device_span<uint8_t const> const>
return {std::move(sorted_inputs), std::move(sorted_outputs), std::move(order)};
}

} // namespace

void gpuinflate(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<codec_exec_result> results,
gzip_header_included parse_hdr,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 128; // Threads per block
if (inputs.size() > 0) {
inflate_kernel<block_size>
<<<inputs.size(), block_size, 0, stream.value()>>>(inputs, outputs, results, parse_hdr);
}
}

void gpu_copy_uncompressed_blocks(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream)
{
if (inputs.size() > 0) {
copy_uncompressed_kernel<<<inputs.size(), 1024, 0, stream.value()>>>(inputs, outputs);
}
}

sorted_codec_parameters sort_decompression_tasks(
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
return sort_tasks(inputs, outputs, stream, task_type::DECOMPRESSION, mr);
}

sorted_codec_parameters sort_compression_tasks(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
return sort_tasks(inputs, outputs, stream, task_type::COMPRESSION, mr);
}

void copy_results_to_original_order(device_span<codec_exec_result const> sorted_results,
device_span<codec_exec_result> original_results,
device_span<std::size_t const> order,
Expand All @@ -1274,4 +1334,38 @@ void copy_results_to_original_order(device_span<codec_exec_result const> sorted_
original_results.begin());
}

[[nodiscard]] size_t find_split_index(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
host_engine_state host_state,
size_t auto_mode_threshold,
size_t hybrid_mode_cost_ratio,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
if (host_state == host_engine_state::OFF or inputs.empty()) { return 0; }
if (host_state == host_engine_state::ON) { return inputs.size(); }

if (host_state == host_engine_state::AUTO) {
return inputs.size() < auto_mode_threshold ? inputs.size() : 0;
}

if (host_state == host_engine_state::HYBRID) {
auto const h_inputs = cudf::detail::make_host_vector(inputs, stream);
auto const h_outputs = cudf::detail::make_host_vector(outputs, stream);
double total_host_cost = 0;
for (size_t i = 0; i < h_inputs.size(); ++i) {
if (total_host_cost >=
task_device_cost(h_inputs[i].size(), h_outputs[i].size(), task_type::DECOMPRESSION)) {
return i;
}
total_host_cost += task_host_cost(
h_inputs[i].size(), h_outputs[i].size(), hybrid_mode_cost_ratio, task_type::DECOMPRESSION);
}
return inputs.size(); // No split
}

CUDF_FAIL("Invalid host engine state for compression: " +
std::to_string(static_cast<uint8_t>(host_state)));
}

} // namespace cudf::io::detail