diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 28ea215bd23..628e9a4d1c7 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -99,6 +99,7 @@ add_arrow_test(bit-utility-test bit_util_test.cc bitmap_test.cc bpacking_test.cc + rle_bitmap_test.cc rle_encoding_test.cc test_common.cc) diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h index bffc3c14a10..cd6d15f9134 100644 --- a/cpp/src/arrow/util/bit_util.h +++ b/cpp/src/arrow/util/bit_util.h @@ -18,9 +18,12 @@ #pragma once #include +#include #include +#include #include +#include "arrow/util/endian.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -176,6 +179,39 @@ static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) { return byte & kBitmask[i]; } +template +struct CopyBitsParams { + Uint src = {}; + Uint dst = {}; + int start = {}; + int end = {}; +}; + +/// Copy a contiguous span of bits from src into dst. +/// +/// Copy bits [start, end[ from src into the position [start, end[ in dst +/// and return the result (inputs are unmodified). +/// Setting ``kAllowFullCopy`` to false is an optimization when the caller can +/// guarantee that the range of bits to copy does not cover the whole range. +template +[[nodiscard]] constexpr Uint CopyBitsInInteger(const CopyBitsParams& params) { + constexpr auto kUintSizeBits = static_cast(sizeof(Uint) * 8); + assert(params.start <= params.end); + assert(params.start < kUintSizeBits); + assert(params.end <= kUintSizeBits); + + const int length = params.end - params.start; + if constexpr (kAllowFullCopy) { + if (length == kUintSizeBits) { + return params.src; + } + } + assert(length < kUintSizeBits); + const Uint mask = + static_cast(LeastSignificantBitMask(length) << params.start); + return (~mask & params.dst) | (mask & params.src); +} + static inline void ClearBit(uint8_t* bits, int64_t i) { bits[i / 8] &= kFlippedBitmask[i % 8]; } diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc index 1e7714540ee..a6af004f7c5 100644 --- a/cpp/src/arrow/util/bit_util_test.cc +++ b/cpp/src/arrow/util/bit_util_test.cc @@ -1065,4 +1065,47 @@ TEST(SpliceWord, SpliceWord) { 0xfedc456789abcdef); } +TEST(BitUtil, CopyBits) { + // Copy bits [start, end[ from src into dst, keeping dst's other bits. + using bit_util::CopyBitsInInteger; + + // Empty range: result equals dst. + ASSERT_EQ(CopyBitsInInteger( + {.src = 0b11111111, .dst = 0b00010010, .start = 3, .end = 3}), + 0b00010010); + // dst = 0101 0101, src = 1010 1010 -> 0101 1010 + ASSERT_EQ(CopyBitsInInteger( + {.src = 0b10101010, .dst = 0b01010101, .start = 0, .end = 4}), + 0b01011010); + // Copy a middle span [2, 5[ of all-ones into an all-zeros dst. + ASSERT_EQ(CopyBitsInInteger( + {.src = 0b11111111, .dst = 0b00000000, .start = 2, .end = 5}), + 0b00011100); + // Copy a middle span [2, 5[ of all-zeros into an all-ones dst. + ASSERT_EQ(CopyBitsInInteger( + {.src = 0b00000000, .dst = 0b11111111, .start = 2, .end = 5}), + 0b11100011); + // Full-word copy returns src unchanged. + ASSERT_EQ(CopyBitsInInteger( + {.src = 0b10101011, .dst = 0b00010010, .start = 0, .end = 8}), + 0b10101011); + // uint16_t partial range [4, 12[: dst keeps its bits outside, src fills inside. + ASSERT_EQ( + CopyBitsInInteger( + {.src = 0b1010101010101010, .dst = 0b0101010101010101, .start = 4, .end = 12}), + 0b0101101010100101); + // uint64_t + ASSERT_EQ(CopyBitsInInteger({ + .src = 0x0123456789abcdef, + .dst = 0xfedcba9876543210, + .start = 0, + .end = 64, + }), + 0x0123456789abcdef); + // constexpr-evaluable. + static_assert(CopyBitsInInteger( + {.src = 0b10101010, .dst = 0b01010101, .start = 0, .end = 4}) == + 0b01011010); +} + } // namespace arrow diff --git a/cpp/src/arrow/util/rle_bitmap_internal.h b/cpp/src/arrow/util/rle_bitmap_internal.h new file mode 100644 index 00000000000..0667a9f0a8e --- /dev/null +++ b/cpp/src/arrow/util/rle_bitmap_internal.h @@ -0,0 +1,369 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/util/bit_util.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/rle_encoding_internal.h" + +namespace arrow::util { + +/// A lightweight view over a bitmap. +template +class BitmapSpan { + public: + using size_type = rle_size_t; + using byte_type = B; + + explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) noexcept + : data_{data}, bit_start_{bit_start} { + Normalize(); + } + + /// Pointer to the byte where the first value is stored. + constexpr byte_type* data() const noexcept { return data_; } + + /// Bit offset of the first value in the first byte. + constexpr size_type bit_start() const noexcept { return bit_start_; } + + /// Return a new span starting at the given position. + constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept { + auto out = *this; + out.bit_start_ += bit_start; + out.Normalize(); + return out; + } + + private: + byte_type* data_; + size_type bit_start_ = 0; + + /// Ensure `bit_start` always ends up in [0, 8[, even when it starts negative. + constexpr void Normalize() { + // On two's-complement targets an arithmetic right shift floors, and the AND mask + // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), unlike `/` and + // `%` which truncate toward zero. + data_ += bit_start_ >> 3; + bit_start_ &= 0b0111; + } +}; + +using BitmapSpanMut = BitmapSpan; +using BitmapSpanConst = BitmapSpan; + +class RleRunToBitmapDecoder { + public: + /// The type of run that can be decoded. + using RunType = RleRun; + + constexpr RleRunToBitmapDecoder() noexcept = default; + + explicit RleRunToBitmapDecoder(const RunType& run) noexcept { Reset(run); } + + void Reset(const RunType& run) noexcept { + values_left_ = run.values_count(); + if (run.value_little_endian() == 0) { + value_pattern_ = uint8_t{0}; + } else { + value_pattern_ = uint8_t{0xFF}; + } + } + + /// Return the number of values that can be advanced. + rle_size_t remaining() const { return values_left_; } + + /// Return the repeated value of this decoder. + constexpr bool value() const { return value_pattern_ != 0; } + + /// Return how much the decoder would advance if asked to. + /// + /// Does not modify input. + rle_size_t GetAdvanceCapacity(rle_size_t batch_size) const noexcept { + const auto n_vals = std::min(batch_size, remaining()); + return n_vals; + } + + /// Advance by as many values as provided or until exhaustion of the decoder. + /// Return the number of values skipped. + [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) { + const auto n_vals = GetAdvanceCapacity(batch_size); + values_left_ -= n_vals; + ARROW_DCHECK_GE(remaining(), 0); + return n_vals; + } + + /// Read the next value into `out` and return false if there are no more. + [[nodiscard]] bool Get(BitmapSpanMut out) { return GetBatch(out, 1) == 1; } + + /// Get a batch of values into `out` and return the number of decoded elements. + /// + /// May write fewer elements to the output than requested if there are not + /// enough values left. + [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) { + const auto out_bit_offset = out.bit_start(); + ARROW_DCHECK_GE(out_bit_offset, 0); + ARROW_DCHECK_LT(out_bit_offset, 8); + + if (ARROW_PREDICT_FALSE(remaining() == 0 || batch_size == 0)) { + return 0; + } + + rle_size_t n_vals = 0; + + // HEADER: Writing inside the first byte if caller gives a non-aligned input + if (out_bit_offset != 0) { + n_vals = GetBatchInByte(out, batch_size); + } + + // Writing full bytes + n_vals += GetBatchFullBytes(out.NewStartingAt(n_vals), batch_size - n_vals); + + // TRAILER: Writing inside the last byte if caller asked for non multiple of 8 values + const auto n_last_vals = std::min(batch_size - n_vals, remaining()); + if (n_last_vals > 0) { + ARROW_DCHECK_LT(n_last_vals, 8); + n_vals += GetBatchInByte(out.NewStartingAt(n_vals), n_last_vals); + } + + return n_vals; + } + + private: + /// The byte pattern for 8 values (full ones or full zeros). + uint8_t value_pattern_ = {}; + /// Number of values left to decode. + rle_size_t values_left_ = 0; + + void AdvanceUnsafe(rle_size_t batch_size) { values_left_ -= batch_size; } + + /// Get batch values to fill the first incomplete byte of the output. + [[nodiscard]] rle_size_t GetBatchInByte(BitmapSpanMut out, rle_size_t batch_size) { + const auto out_bit_offset = out.bit_start(); + ARROW_DCHECK_GE(out_bit_offset, 0); + ARROW_DCHECK_LT(out_bit_offset, 8); + ARROW_DCHECK_GT(remaining(), 0); + ARROW_DCHECK_GE(batch_size, 0); + + // Empty bits in first byte that we can fill + const auto empty_bits = rle_size_t{8} - out_bit_offset; + // Number of bits in first byte that we want to fill + const auto desired_bits = std::min(empty_bits, batch_size); + // Try to advance, and get number of bits we had remaining + const auto n_bits = Advance(desired_bits); + // Copy relevant bits from the value pattern to the output. + *out.data() = bit_util::CopyBitsInInteger({ + .src = value_pattern_, + .dst = *out.data(), + .start = static_cast(out_bit_offset), + .end = static_cast(out_bit_offset + n_bits), + }); + + return n_bits; + } + + /// Get batch in full bytes using memset. + [[nodiscard]] rle_size_t GetBatchFullBytes(BitmapSpanMut out, rle_size_t batch_size) { + const auto n_bytes = std::min(batch_size, remaining()) / 8; + ARROW_DCHECK(out.bit_start() == 0 || n_bytes == 0); + std::memset(out.data(), value_pattern_, n_bytes); + const auto n_vals = 8 * n_bytes; + AdvanceUnsafe(n_vals); + return n_vals; + } +}; + +class BitPackedRunToBitmapDecoder { + public: + /// The type of run that can be decoded. + using RunType = BitPackedRun; + + constexpr BitPackedRunToBitmapDecoder() noexcept = default; + + explicit BitPackedRunToBitmapDecoder(const RunType& run) noexcept { Reset(run); } + + void Reset(const RunType& run) noexcept { + data_ = run.raw_data_ptr(); + values_count_ = run.values_count(); + values_read_ = 0; + ARROW_DCHECK(run.raw_data_max_size() < 0 || + bit_util::BytesForBits(values_count_) <= run.raw_data_max_size()); + } + + /// Return the number of values that can be advanced. + constexpr rle_size_t remaining() const { return values_count_ - values_read_; } + + /// Return how much the decoder would advance if asked to. + /// + /// Does not modify input. + rle_size_t GetAdvanceCapacity(rle_size_t batch_size) const noexcept { + const auto n_vals = std::min(batch_size, remaining()); + return n_vals; + } + + /// Advance by as many values as provided or until exhaustion of the decoder. + /// Return the number of values skipped. + [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) { + const auto n_vals = GetAdvanceCapacity(batch_size); + values_read_ += n_vals; + ARROW_DCHECK_GE(remaining(), 0); + return n_vals; + } + + /// Get the next value and return false if there are no more. + [[nodiscard]] bool Get(BitmapSpanMut out) { return GetBatch(out, 1) == 1; } + + /// Get a batch of values return the number of decoded elements. + /// May write fewer elements to the output than requested if there are not enough values + /// left. + [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) { + auto n_vals = GetAdvanceCapacity(batch_size); + arrow::internal::CopyBitmap(unread_values_ptr(), unread_values_bit_offset(), n_vals, + out.data(), out.bit_start()); + return Advance(n_vals); + } + + private: + /// The pointer to the beginning of the run + const uint8_t* data_ = nullptr; + /// The total number of values in the run + rle_size_t values_count_ = 0; + /// The number of values read by the decoder + rle_size_t values_read_ = 0; + + /// Start pointer of the unread values (may contain values already read). + const uint8_t* unread_values_ptr() const noexcept { return data_ + (values_read_ / 8); } + + /// Bit in @ref unread_values_ptr where the unread values start. + rle_size_t unread_values_bit_offset() const noexcept { return values_read_ % 8; } +}; + +/// A specialized decoder class to extract RLE+bitpacked booleans. +/// +/// In some cases, such as when reading definition levels for nullable values (with +/// no repetition and no nesting), we know values to be decoded will end up in an +/// Arrow validity bitmap. In such cases, decoding values to a ``int16`` before +/// encoding them again in overly wasteful. +class RleBitPackedToBitmapDecoder { + public: + RleBitPackedToBitmapDecoder() noexcept = default; + + /// Create a decoder object. + /// + /// data and data_size are the raw bytes to decode. + RleBitPackedToBitmapDecoder(const uint8_t* data, rle_size_t data_size) noexcept { + Reset(data, data_size); + } + + void Reset(const uint8_t* data, rle_size_t data_size) noexcept { + parser_.Reset(data, data_size, /* value_bit_width= */ 1); + decoder_ = {}; + } + + /// Whether there is still runs to iterate over. + bool exhausted() const { return (run_remaining() == 0) && parser_.exhausted(); } + + /// Get a batch of values return the number of decoded elements. + /// May write fewer elements to the output than requested if there are not enough + /// values left or if an error occurred. + [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size); + + private: + RleBitPackedParser parser_ = {}; + std::variant decoder_ = {}; + + /// Return the number of values that are remaining in the current run. + rle_size_t run_remaining() const { + return std::visit([](const auto& dec) { return dec.remaining(); }, decoder_); + } + + /// Get a batch of values from the current run and return the number elements read. + [[nodiscard]] rle_size_t RunGetBatch(BitmapSpanMut out, rle_size_t batch_size) { + return std::visit([&](auto& dec) { return dec.GetBatch(out, batch_size); }, decoder_); + } +}; + +/************************************************ + * RleBitPackedToBitmapDecoder implementation * + ************************************************/ + +/// Utility to map a run type to the associate decoder. +template +struct RleBitPackedToBitmapDecoderGetDecoder; + +template <> +struct RleBitPackedToBitmapDecoderGetDecoder { + using type = RleRunToBitmapDecoder; +}; + +template <> +struct RleBitPackedToBitmapDecoderGetDecoder { + using type = BitPackedRunToBitmapDecoder; +}; + +inline auto RleBitPackedToBitmapDecoder::GetBatch(BitmapSpanMut out, + rle_size_t batch_size) -> rle_size_t { + using ControlFlow = RleBitPackedParser::ControlFlow; + + if (ARROW_PREDICT_FALSE(batch_size == 0 || exhausted())) { + return 0; + } + + rle_size_t values_read = 0; + + // Remaining from a previous call that would have left some unread data from a run. + if (ARROW_PREDICT_FALSE(run_remaining() > 0)) { + const auto read = RunGetBatch(out, batch_size); + values_read += read; + + // Either we fulfilled all the batch to be read or we finished remaining run. + if (ARROW_PREDICT_FALSE(values_read == batch_size)) { + return values_read; + } + ARROW_DCHECK(run_remaining() == 0); + } + + parser_.ParseWithCallable([&](auto run) { + using RunDecoder = RleBitPackedToBitmapDecoderGetDecoder::type; + + ARROW_DCHECK_LT(values_read, batch_size); + RunDecoder decoder(run); + // The output span carries its own bit offset, so advancing it past the values + // already written keeps successive runs correctly aligned in the bitmap. + const auto read = + decoder.GetBatch(out.NewStartingAt(values_read), batch_size - values_read); + ARROW_DCHECK_LE(read, batch_size - values_read); + values_read += read; + + // Stop reading and store remaining decoder + if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) { + decoder_ = std::move(decoder); + return ControlFlow::Break; + } + + return ControlFlow::Continue; + }); + + return values_read; +} + +} // namespace arrow::util diff --git a/cpp/src/arrow/util/rle_bitmap_test.cc b/cpp/src/arrow/util/rle_bitmap_test.cc new file mode 100644 index 00000000000..ef7524407d3 --- /dev/null +++ b/cpp/src/arrow/util/rle_bitmap_test.cc @@ -0,0 +1,598 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/rle_bitmap_internal.h" +#include "arrow/util/rle_encoding_internal.h" + +namespace arrow::util { + +namespace { + +/// Make a vector of `size` pseudo-random bytes, deterministic for a given `seed`. +std::vector MakeRandomBytes(size_t size, uint32_t seed = 56) { + std::vector bytes(size); + std::minstd_rand gen(seed); + std::uniform_int_distribution dist(0, 255); // no standard support for uint8_t + for (auto& byte : bytes) { + byte = static_cast(dist(gen)); + } + return bytes; +} + +/// Read the first `count` bits of `bytes` (LSB first) into a vector of booleans. +std::vector BitsFromBytes(const std::vector& bytes, rle_size_t count) { + std::vector bits(count); + for (rle_size_t i = 0; i < count; ++i) { + bits[i] = bit_util::GetBit(bytes.data(), i); + } + return bits; +} + +struct CheckDecodedBitsParams { + const std::vector& actual; + const std::vector& expected; + rle_size_t count; + rle_size_t actual_start_bit = 0; + rle_size_t expected_start_idx = 0; +}; + +/// Check the decoded output in `out` against `expected`. +void CheckDecodedBits(const CheckDecodedBitsParams& params) { + ARROW_SCOPED_TRACE("out_start_bit = ", params.actual_start_bit, + ", expected_start_idx = ", params.expected_start_idx); + for (rle_size_t i = 0; i < params.count; ++i) { + ASSERT_EQ(bit_util::GetBit(params.actual.data(), params.actual_start_bit + i), + params.expected[params.expected_start_idx + i]) + << "first difference at bit " << i; + } +} + +struct CheckBitsEqualParams { + const std::vector& actual; + const std::vector& expected; + rle_size_t count; + rle_size_t actual_start_bit = 0; + rle_size_t expected_start_bit = 0; +}; + +/// Check that two bit ranges, stored in `actual` and `expected`, are equal. +void CheckBitsEqual(const CheckBitsEqualParams& params) { + ARROW_SCOPED_TRACE("actual_start_bit = ", params.actual_start_bit, + ", expected_start_bit = ", params.expected_start_bit); + for (rle_size_t i = 0; i < params.count; ++i) { + ASSERT_EQ(bit_util::GetBit(params.actual.data(), params.actual_start_bit + i), + bit_util::GetBit(params.expected.data(), params.expected_start_bit + i)) + << "first difference at bit " << i; + } +} + +/// Skip the first `expected_skip` values with Advance(), then decode the rest of the run +/// into one output bitmap, `chunk_size` values at a time. Compare against `expected`. +/// +/// `chunk_size` controls output bit alignment. When `chunk_size` is not a multiple of 8, +/// later calls start at a non-zero output bit offset. +/// +/// `expected_skip` shifts the decoder's read offset relative to the output offset. +/// A non-zero `expected_skip` makes the two differ, which exercises the bit-unaligned +/// read path of BitPackedRunToBitmapDecoder. With `expected_skip == 0` they stay in sync +/// and only the aligned path runs. +template +void CheckDecoderValuesChunked(const typename Decoder::RunType& run, + const std::vector& expected, + rle_size_t chunk_size = 1, rle_size_t expected_skip = 0) { + ARROW_SCOPED_TRACE("chunk_size = ", chunk_size, ", expected_skip = ", expected_skip); + + const auto n_vals = static_cast(expected.size()); + ASSERT_LE(expected_skip, n_vals); + + Decoder decoder(run); + const auto advanced = decoder.Advance(expected_skip); + ASSERT_EQ(advanced, expected_skip); + const auto n_vals_to_decode = n_vals - expected_skip; + + // Output buffer + const auto n_bytes = static_cast(bit_util::BytesForBits(n_vals_to_decode)); + std::vector out(n_bytes, 0); + + rle_size_t n_val_read = 0; + while (n_val_read < n_vals_to_decode) { + const auto want = std::min(chunk_size, n_vals_to_decode - n_val_read); + const auto got = + decoder.GetBatch(BitmapSpanMut(out.data(), /*bit_start=*/n_val_read), want); + EXPECT_EQ(got, want) << "at pos " << n_val_read; + ASSERT_GT(got, 0) << "at pos " << n_val_read; // break on failure + n_val_read += got; + EXPECT_EQ(decoder.remaining(), n_vals_to_decode - n_val_read); + } + + EXPECT_EQ(decoder.remaining(), 0); + CheckDecodedBits({ + .actual = out, + .expected = expected, + .count = n_vals_to_decode, + .actual_start_bit = 0, + .expected_start_idx = expected_skip, + }); +} + +/// Decode a chunk of data into a known output to check for out of bounds write. +/// +/// @see CheckDecoderValuesChunked +template +void CheckDecoderClobber(const typename Decoder::RunType& run, + const std::vector& expected, rle_size_t chunk_size = 1, + rle_size_t expected_skip = 0) { + ARROW_SCOPED_TRACE("chunk_size = ", chunk_size, ", expected_skip = ", expected_skip); + + const auto n_vals = static_cast(expected.size()); + ASSERT_LE(expected_skip, n_vals); + + Decoder decoder(run); + const auto advanced = decoder.Advance(expected_skip); + ASSERT_EQ(advanced, expected_skip); + const auto n_vals_to_decode = n_vals - expected_skip; + + // Output buffer with enough capacity to store a full chunk plus extra bytes as + // clobbers/guard to check for out of bounds write. + const auto n_bytes = static_cast(bit_util::BytesForBits(chunk_size) + + bit_util::CeilDiv(n_vals, chunk_size) + 2); + // This seed is arbitrary and of little importance. We are simply trying to avoid an + // unlikely case where guards have the same pattern in all invocations. + const auto out_pattern = + MakeRandomBytes(n_bytes, /* seed= */ (chunk_size << 16) ^ expected_skip); + auto out = out_pattern; + + rle_size_t n_val_read = 0; + rle_size_t out_bit_start = 0; + while (n_val_read < n_vals_to_decode) { + // Clean output buffer + out = out_pattern; + const auto want = std::min(chunk_size, n_vals_to_decode - n_val_read); + const auto got = decoder.GetBatch(BitmapSpanMut(out.data(), out_bit_start), want); + ASSERT_GT(got, 0) << "at pos " << n_val_read; // break on failure + EXPECT_EQ(got, want) << "at pos " << n_val_read; + // Check that the leading bits have not been modified + CheckBitsEqual({.actual = out, .expected = out_pattern, .count = out_bit_start}); + // Check that the trailing bits have not been modified + CheckBitsEqual({ + .actual = out, + .expected = out_pattern, + .count = static_cast(8 * n_bytes) - (out_bit_start + want), + .actual_start_bit = out_bit_start + want, + .expected_start_bit = out_bit_start + want, + }); + // Check decoded bits are also correct + CheckDecodedBits({ + .actual = out, + .expected = expected, + .count = want, + .actual_start_bit = out_bit_start, + .expected_start_idx = expected_skip + n_val_read, + }); + + n_val_read += got; + ++out_bit_start; + EXPECT_EQ(decoder.remaining(), n_vals_to_decode - n_val_read); + } +} + +/// All the checks shared by both decoder types. +/// +/// `expected` is the full sequence of booleans the run should decode to. +template +void CheckBitmapDecoder(const typename Decoder::RunType& run, + const std::vector& expected) { + const auto n_vals = static_cast(expected.size()); + + // remaining() reflects the run size before any value is read. + { + Decoder decoder(run); + EXPECT_EQ(decoder.remaining(), n_vals); + } + + // Empty requests are a no-op. + { + Decoder decoder(run); + uint8_t out = 0; + const auto got = decoder.GetBatch(BitmapSpanMut(&out), /*batch_size=*/0); + EXPECT_EQ(got, 0); + EXPECT_EQ(decoder.remaining(), n_vals); + } + + // Decode the whole run in several chunks. + for (const rle_size_t chunk_size : {rle_size_t{1}, rle_size_t{3}, rle_size_t{7}, + rle_size_t{8}, rle_size_t{9}, n_vals, n_vals + 1}) { + CheckDecoderValuesChunked(run, expected, chunk_size); + } + + // Decode the whole run in several chunks, after an initial Advance that shifts + // the run and output bit alignment. + for (const rle_size_t chunk_size : {rle_size_t{1}, rle_size_t{3}, rle_size_t{7}, + rle_size_t{8}, rle_size_t{9}, n_vals, n_vals + 1}) { + for (rle_size_t expected_skip = 1; expected_skip < 8 && expected_skip < n_vals; + ++expected_skip) { + // Check the decoding happens as expected + CheckDecoderValuesChunked(run, expected, chunk_size, expected_skip); + // Check the decoding does not write out of bounds + CheckDecoderClobber(run, expected, chunk_size, expected_skip); + } + } + + // Get() one value at a time, then read past the end. + { + Decoder decoder(run); + std::vector out(static_cast(bit_util::BytesForBits(n_vals)) + 1, 0); + for (rle_size_t i = 0; i < n_vals; ++i) { + const bool ok = decoder.Get(BitmapSpanMut(out.data(), /*bit_start=*/i)); + EXPECT_TRUE(ok); + EXPECT_EQ(decoder.remaining(), n_vals - i - 1); + } + // Exhausted: nothing more can be read or advanced. + const bool ok = decoder.Get(BitmapSpanMut(out.data())); + EXPECT_FALSE(ok); + const auto advanced = decoder.Advance(1); + EXPECT_EQ(advanced, 0); + EXPECT_EQ(decoder.remaining(), 0); + CheckDecodedBits({.actual = out, .expected = expected, .count = n_vals}); + } + + // Advancing more than available stops at the run boundary. + { + Decoder decoder(run); + const auto advanced = decoder.Advance(n_vals + 100); + EXPECT_EQ(advanced, n_vals); + EXPECT_EQ(decoder.remaining(), 0); + } + + // Reset() rewinds the decoder so the run can be decoded again. + { + Decoder decoder(run); + std::vector out_1(static_cast(bit_util::BytesForBits(n_vals)), 0); + const auto scratch_got = decoder.GetBatch(BitmapSpanMut(out_1.data()), n_vals); + EXPECT_EQ(scratch_got, n_vals); + EXPECT_EQ(decoder.remaining(), 0); + + decoder.Reset(run); + EXPECT_EQ(decoder.remaining(), n_vals); + std::vector out_2(static_cast(bit_util::BytesForBits(n_vals)), 0); + const auto got = decoder.GetBatch(BitmapSpanMut(out_2.data()), n_vals); + EXPECT_EQ(got, n_vals); + CheckDecodedBits({.actual = out_2, .expected = expected, .count = n_vals}); + } +} + +} // namespace + +/*************************** + * RleRunToBitmapDecoder * + ***************************/ + +class RleRunToBitmapDecoderTest : public ::testing::TestWithParam {}; + +TEST_P(RleRunToBitmapDecoderTest, Decode) { + const auto& count = GetParam(); + + // Only two possible repeated value + for (bool value : {true, false}) { + ARROW_SCOPED_TRACE("value = ", value); + + // A boolean RLE run stores its value in a single (1-bit-wide) byte. + const uint8_t data = value ? 1 : 0; + const auto run = RleRun(&data, count, /*value_bit_width=*/1); + + // value() reports the repeated boolean. + RleRunToBitmapDecoder decoder(run); + EXPECT_EQ(decoder.value(), value); + + const std::vector expected(count, value); + CheckBitmapDecoder(run, expected); + } +} + +INSTANTIATE_TEST_SUITE_P( // + RleBitmap, RleRunToBitmapDecoderTest, + ::testing::Values(0, 1, 3, 8, 9, 13, 64, 100, 177)); + +/********************************* + * BitPackedRunToBitmapDecoder * + *********************************/ + +struct BitPackedBitmapCase { + std::string name; + // The raw bit-packed bytes (LSB first). Must hold at least `count` bits. + std::vector bytes; + // The number of values in the run. + rle_size_t count; +}; + +class BitPackedRunToBitmapDecoderTest + : public ::testing::TestWithParam {}; + +TEST_P(BitPackedRunToBitmapDecoderTest, Decode) { + const auto& param = GetParam(); + ASSERT_GE(param.bytes.size(), static_cast(bit_util::BytesForBits(param.count))); + + const auto run = BitPackedRun(param.bytes.data(), param.count, /*value_bit_width=*/1, + /*max_read_bytes=*/-1); + + const std::vector expected = BitsFromBytes(param.bytes, param.count); + CheckBitmapDecoder(run, expected); +} + +INSTANTIATE_TEST_SUITE_P( // + RleBitmap, BitPackedRunToBitmapDecoderTest, + ::testing::Values( // + BitPackedBitmapCase{.name = "empty", .bytes = {0b10110010}, .count = 0}, + BitPackedBitmapCase{.name = "single", .bytes = {0b00000001}, .count = 1}, + BitPackedBitmapCase{.name = "three", .bytes = {0b00000101}, .count = 3}, + BitPackedBitmapCase{.name = "eight", .bytes = {0b11010010}, .count = 8}, + BitPackedBitmapCase{ + .name = "alternating", .bytes = {0b10101010, 0b10101010}, .count = 13}, + BitPackedBitmapCase{.name = "all_zeros", .bytes = {0x00, 0x00}, .count = 16}, + BitPackedBitmapCase{.name = "all_ones", .bytes = {0xFF, 0xFF}, .count = 16}, + BitPackedBitmapCase{ + .name = "mixed", .bytes = {0b11001010, 0b00001111, 0b10110001}, .count = 24}, + BitPackedBitmapCase{ + .name = "unaligned_count", .bytes = {0b00110101, 0b11100100}, .count = 11}, + BitPackedBitmapCase{ + .name = "large", + .bytes = std::vector(16, 0b01101001), + .count = 128, + }), + [](const ::testing::TestParamInfo& info) { + return info.param.name; + }); + +/********************************* + * RleBitPackedToBitmapDecoder * + *********************************/ + +namespace { + +/// Append the LEB128 (unsigned, little-endian base-128) encoding of `value`. +void AppendLeb128(std::vector& out, uint32_t value) { + std::array> buf; + const auto n_bytes = + bit_util::WriteLEB128(value, buf.data(), static_cast(buf.size())); + ASSERT_GT(n_bytes, 0); + out.insert(out.end(), buf.data(), buf.data() + n_bytes); +} + +void AppendRleRun(std::vector& bytes, std::vector& expected, bool value, + rle_size_t count) { + AppendLeb128(bytes, static_cast(count) << 1); // low bit 0 => RLE + bytes.push_back(value ? 1 : 0); + expected.insert(expected.end(), count, value); +} + +void AppendBitPackedRun(std::vector& bytes, std::vector& expected, + const std::vector& packed) { + const auto groups = static_cast(packed.size()); + AppendLeb128(bytes, (static_cast(groups) << 1) | 1); // low bit 1 => packed + bytes.insert(bytes.end(), packed.begin(), packed.end()); + for (rle_size_t i = 0; i < groups * 8; ++i) { + expected.push_back(bit_util::GetBit(packed.data(), i)); + } +} + +/// Decode the whole `bytes` into a bitmap and check it against `expected`. +/// +/// Decode `chunk_size` values per GetBatch call to check the decoder state between +/// calls. The output starts at bit offset `out_offset`. A non-zero offset makes +/// the output and the encoded `bytes` use different bit alignment. +void CheckRleBitPackedDecode(const std::vector& bytes, + const std::vector& expected, rle_size_t chunk_size, + rle_size_t out_offset = 0) { + ARROW_SCOPED_TRACE("chunk_size = ", chunk_size, ", out_offset = ", out_offset); + const auto n_vals = static_cast(expected.size()); + + RleBitPackedToBitmapDecoder decoder(bytes.data(), + static_cast(bytes.size())); + EXPECT_EQ(decoder.exhausted(), n_vals == 0); + + // Output buffer with one guard byte to catch out-of-bounds writes. + std::vector out( + static_cast(bit_util::BytesForBits(out_offset + n_vals)) + 1, 0); + const uint8_t guard = 0xA5; + out.back() = guard; + + rle_size_t read = 0; + while (read < n_vals) { + const auto want = std::min(chunk_size, n_vals - read); + const auto got = decoder.GetBatch( + BitmapSpanMut(out.data(), /*bit_start=*/out_offset + read), want); + EXPECT_EQ(got, want) << "at pos " << read; + ASSERT_GT(got, 0) << "at pos " << read; // break on failure + read += got; + } + + EXPECT_EQ(read, n_vals); + EXPECT_TRUE(decoder.exhausted()); + // Reading past the end yields nothing and leaves the decoder exhausted. + uint8_t scratch = 0; + const auto past_end = decoder.GetBatch(BitmapSpanMut(&scratch), 8); + EXPECT_EQ(past_end, 0); + EXPECT_TRUE(decoder.exhausted()); + + EXPECT_EQ(out.back(), guard) << "decoder wrote past the end of the output"; + CheckDecodedBits({ + .actual = out, + .expected = expected, + .count = n_vals, + .actual_start_bit = out_offset, + }); +} + +/// Run the decode check over a battery of chunk sizes and output offsets. +void CheckRleBitPackedToBitmap(const std::vector& bytes, + const std::vector& expected) { + const auto n_vals = static_cast(expected.size()); + ASSERT_GT(n_vals, 0); + for (const rle_size_t chunk_size : + {rle_size_t{1}, rle_size_t{3}, rle_size_t{7}, rle_size_t{8}, rle_size_t{9}, + rle_size_t{33}, n_vals, n_vals + 1}) { + CheckRleBitPackedDecode(bytes, expected, chunk_size); + // A non-zero output offset forces the first run to start at a non-byte + // aligned output position. + for (rle_size_t out_offset = 1; out_offset < 8; ++out_offset) { + CheckRleBitPackedDecode(bytes, expected, chunk_size, out_offset); + } + } +} + +} // namespace + +TEST(RleBitPackedToBitmapDecoder, Empty) { + // A default-constructed decoder is already exhausted. + RleBitPackedToBitmapDecoder decoder; + EXPECT_TRUE(decoder.exhausted()); + uint8_t out = 0; + auto got = decoder.GetBatch(BitmapSpanMut(&out), 8); + EXPECT_EQ(got, 0); + + // So is one reset on an empty buffer. + decoder.Reset(nullptr, 0); + EXPECT_TRUE(decoder.exhausted()); + got = decoder.GetBatch(BitmapSpanMut(&out), 8); + EXPECT_EQ(got, 0); +} + +TEST(RleBitPackedToBitmapDecoder, SingleRleZeros) { + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/false, /*count=*/100); + CheckRleBitPackedToBitmap(bytes, expected); +} + +TEST(RleBitPackedToBitmapDecoder, SingleRleOnes) { + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/100); + CheckRleBitPackedToBitmap(bytes, expected); +} + +TEST(RleBitPackedToBitmapDecoder, SingleBitPacked) { + std::vector bytes; + std::vector expected; + AppendBitPackedRun(bytes, expected, {0b10101010, 0b11001100, 0b11110000}); + CheckRleBitPackedToBitmap(bytes, expected); +} + +TEST(RleBitPackedToBitmapDecoder, MixedRunsAligned) { + // All runs end on a byte boundary, so each run starts byte-aligned in the + // output. + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/false, /*count=*/16); + AppendBitPackedRun(bytes, expected, {0b10101010, 0b01010101}); + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/64); + AppendBitPackedRun(bytes, expected, {0b00001111}); + CheckRleBitPackedToBitmap(bytes, expected); +} + +TEST(RleBitPackedToBitmapDecoder, MixedRunsUnaligned) { + // RLE runs with counts that are not multiples of 8 make each following run + // start at a non-byte-aligned output position. + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/13); + AppendBitPackedRun(bytes, expected, {0b01101001, 0b10010110}); + AppendRleRun(bytes, expected, /*value=*/false, /*count=*/5); + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/200); + AppendBitPackedRun(bytes, expected, {0b11110000}); + AppendRleRun(bytes, expected, /*value=*/false, /*count=*/3); + AppendBitPackedRun(bytes, expected, {0b10110001, 0b00011101}); + CheckRleBitPackedToBitmap(bytes, expected); +} + +TEST(RleBitPackedToBitmapDecoder, ReadPastEnd) { + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/10); + AppendBitPackedRun(bytes, expected, {0b10110010}); + const auto n_vals = static_cast(expected.size()); + + RleBitPackedToBitmapDecoder decoder(bytes.data(), + static_cast(bytes.size())); + std::vector out(static_cast(bit_util::BytesForBits(n_vals)) + 1, 0); + // Requesting more values than available produces only the available ones. + auto got = decoder.GetBatch(BitmapSpanMut(out.data()), n_vals + 100); + EXPECT_EQ(got, n_vals); + EXPECT_TRUE(decoder.exhausted()); + got = decoder.GetBatch(BitmapSpanMut(out.data()), 10); + EXPECT_EQ(got, 0); + CheckDecodedBits({.actual = out, .expected = expected, .count = n_vals}); +} + +TEST(RleBitPackedToBitmapDecoder, Reset) { + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/13); + AppendBitPackedRun(bytes, expected, {0b01101001, 0b10010110}); + AppendRleRun(bytes, expected, /*value=*/false, /*count=*/20); + const auto n_vals = static_cast(expected.size()); + const auto data_size = static_cast(bytes.size()); + + RleBitPackedToBitmapDecoder decoder(bytes.data(), data_size); + std::vector out_1(static_cast(bit_util::BytesForBits(n_vals)), 0); + const auto got_1 = decoder.GetBatch(BitmapSpanMut(out_1.data()), n_vals); + EXPECT_EQ(got_1, n_vals); + EXPECT_TRUE(decoder.exhausted()); + + // Reset rewinds the decoder so the same buffer decodes again. + decoder.Reset(bytes.data(), data_size); + EXPECT_FALSE(decoder.exhausted()); + std::vector out_2(static_cast(bit_util::BytesForBits(n_vals)), 0); + const auto got_2 = decoder.GetBatch(BitmapSpanMut(out_2.data()), n_vals); + EXPECT_EQ(got_2, n_vals); + EXPECT_TRUE(decoder.exhausted()); + CheckDecodedBits({.actual = out_2, .expected = expected, .count = n_vals}); +} + +TEST(RleBitPackedToBitmapDecoder, Truncated) { + // Malformed input: a bit-packed run declares more values than the buffer + // holds. The decoder should return the values it can read and report that it + // is not exhausted, rather than crash or read out of bounds. + std::vector bytes; + std::vector expected; + AppendRleRun(bytes, expected, /*value=*/true, /*count=*/10); + // The header declares 4 bytes (4 * 8 = 32 values) of bit-packed data, but only + // 1 byte follows. + AppendLeb128(bytes, (4u << 1) | 1); + bytes.push_back(0b10101010); + + RleBitPackedToBitmapDecoder decoder(bytes.data(), + static_cast(bytes.size())); + std::vector out(16, 0); + // The RLE run decodes fully; the truncated bit-packed run cannot be parsed. + const auto got = decoder.GetBatch(BitmapSpanMut(out.data()), 1000); + EXPECT_EQ(got, 10); + EXPECT_FALSE(decoder.exhausted()); + CheckDecodedBits({.actual = out, .expected = expected, .count = 10}); +} + +} // namespace arrow::util diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h index 7b4f2114808..825cd253df9 100644 --- a/cpp/src/arrow/util/rle_encoding_internal.h +++ b/cpp/src/arrow/util/rle_encoding_internal.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include "arrow/util/bpacking_internal.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" +#include "arrow/util/ubsan.h" namespace arrow::util { @@ -102,10 +104,6 @@ class RleRunDecoder; /// 10 % on some benchmarks. class RleRun { public: - /// The decoder class used to decode a single run in the given type. - template - using DecoderType = RleRunDecoder; - constexpr RleRun() noexcept = default; explicit RleRun(const uint8_t* data, rle_size_t values_count, @@ -116,6 +114,12 @@ class RleRun { std::copy(data, data + raw_data_size(value_bit_width), data_.begin()); } + /// The repeated value in the run in little endian form (as stored in the buffer). + uint64_t value_little_endian() const noexcept { + // Underlying memcpy is required to avoid undefined behavior. + return SafeLoadAs(data_.data()); + } + /// The number of repeated values in this run. constexpr rle_size_t values_count() const noexcept { return values_count_; } @@ -132,7 +136,7 @@ class RleRun { private: /// The repeated value raw bytes stored inside the class with enough space to store /// up to a 64 bit value. - std::array data_ = {}; + alignas(8) std::array data_ = {}; /// The number of time the value is repeated. rle_size_t values_count_ = 0; }; @@ -150,10 +154,6 @@ class BitPackedRunDecoder; /// 10 % on some benchmarks. class BitPackedRun { public: - /// The decoder class used to decode a single run in the given type. - template - using DecoderType = BitPackedRunDecoder; - constexpr BitPackedRun() noexcept = default; constexpr BitPackedRun(const uint8_t* data, rle_size_t values_count, @@ -240,6 +240,10 @@ class RleBitPackedParser { template void Parse(Handler&& handler); + /// Call the parser with a single callable for all event types. + template + void ParseWithCallable(Callable&& func); + private: /// The pointer to the beginning of the run const uint8_t* data_ = nullptr; @@ -277,10 +281,8 @@ class RleRunDecoder { // if the bool value isn't 0 or 1. value_ = *run.raw_data_ptr() & 1; } else { - // Memcopy is required to avoid undefined behavior. - value_ = {}; - std::memcpy(&value_, run.raw_data_ptr(), run.raw_data_size(value_bit_width)); - value_ = ::arrow::bit_util::FromLittleEndian(value_); + value_ = static_cast( + ::arrow::bit_util::FromLittleEndian(run.value_little_endian())); } } @@ -505,10 +507,6 @@ class RleBitPackedDecoder { decoder_); } - /// Call the parser with a single callable for all event types. - template - void ParseWithCallable(Callable&& func); - /// Utility methods for retrieving spaced values. template [[nodiscard]] rle_size_t GetSpaced(Converter converter, @@ -670,6 +668,17 @@ void RleBitPackedParser::Parse(Handler&& handler) { } } +template +void RleBitPackedParser::ParseWithCallable(Callable&& func) { + struct { + Callable func; + auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); } + auto OnRleRun(RleRun run) { return func(std::move(run)); } + } handler{std::move(func)}; + + return Parse(std::move(handler)); +} + namespace internal { /// The maximal unsigned size that a variable can fit. template @@ -758,17 +767,19 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const * RleBitPackedDecoder * *************************/ +/// Utility to map a run type to the associate decoder. +template +struct RleBitPackedDecoderGetRunDecoder; + template -template -void RleBitPackedDecoder::ParseWithCallable(Callable&& func) { - struct { - Callable func; - auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); } - auto OnRleRun(RleRun run) { return func(std::move(run)); } - } handler{std::move(func)}; +struct RleBitPackedDecoderGetRunDecoder { + using type = RleRunDecoder; +}; - parser_.Parse(std::move(handler)); -} +template +struct RleBitPackedDecoderGetRunDecoder { + using type = BitPackedRunDecoder; +}; template bool RleBitPackedDecoder::Get(value_type* val) { @@ -780,6 +791,10 @@ auto RleBitPackedDecoder::GetBatch(value_type* out, rle_size_t batch_size) -> rle_size_t { using ControlFlow = RleBitPackedParser::ControlFlow; + if (ARROW_PREDICT_FALSE(batch_size == 0 || exhausted())) { + return 0; + } + rle_size_t values_read = 0; // Remaining from a previous call that would have left some unread data from a run. @@ -795,8 +810,8 @@ auto RleBitPackedDecoder::GetBatch(value_type* out, ARROW_DCHECK(run_remaining() == 0); } - ParseWithCallable([&](auto run) { - using RunDecoder = typename decltype(run)::template DecoderType; + parser_.ParseWithCallable([&](auto run) { + using RunDecoder = RleBitPackedDecoderGetRunDecoder::type; ARROW_DCHECK_LT(values_read, batch_size); RunDecoder decoder(run, value_bit_width_); @@ -1108,8 +1123,8 @@ auto RleBitPackedDecoder::GetSpaced(Converter converter, ARROW_DCHECK(run_remaining() == 0); } - ParseWithCallable([&](auto run) { - using RunDecoder = typename decltype(run)::template DecoderType; + parser_.ParseWithCallable([&](auto run) { + using RunDecoder = RleBitPackedDecoderGetRunDecoder::type; RunDecoder decoder(run, value_bit_width_); @@ -1290,8 +1305,8 @@ auto RleBitPackedDecoder::GetBatchWithDict(const V* dictionary, ARROW_DCHECK(run_remaining() == 0); } - ParseWithCallable([&](auto run) { - using RunDecoder = typename decltype(run)::template DecoderType; + parser_.ParseWithCallable([&](auto run) { + using RunDecoder = RleBitPackedDecoderGetRunDecoder::type; RunDecoder decoder(run, value_bit_width_);