Skip to content

Commit 136b468

Browse files
authored
fix(parquet): check compression codec availability (#656)
## Summary
- Check Parquet compression codec availability with Arrow before opening the file writer.
- Return a clear `InvalidArgument` when a requested Parquet codec is not built into Arrow.
- Add a regression test that exercises an unavailable codec in the current Arrow build.

## Test Plan
- `cmake --build build --target parquet_test data_test && ctest --test-dir build -R 'parquet_test|data_test' --output-on-failure`
- `git diff --check`
1 parent 3e7b20a commit 136b468

2 files changed

Lines changed: 61 additions & 0 deletions

File tree

src/iceberg/parquet/parquet_writer.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include "iceberg/parquet/parquet_writer.h"
2121

2222
#include <memory>
23+
#include <string_view>
2324

2425
#include <arrow/c/bridge.h>
2526
#include <arrow/record_batch.h>
27+
#include <arrow/util/compression.h>
2628
#include <arrow/util/key_value_metadata.h>
2729
#include <parquet/arrow/schema.h>
2830
#include <parquet/arrow/writer.h>
@@ -62,6 +64,14 @@ Result<::arrow::Compression::type> ParseCompression(const WriterProperties& prop
6264
}
6365
}
6466

67+
/// \brief Verify that Arrow can provide the requested Parquet compression codec.
///
/// \param compression_name Codec name; used only to build the error message.
/// \param compression Arrow codec identifier whose availability is queried.
/// \return A default-constructed (successful) Status when
///         ::arrow::util::Codec::IsAvailable reports the codec as usable;
///         otherwise the error produced by ICEBERG_PRECHECK with a message
///         naming the codec (the regression test for this path expects
///         ErrorKind::kInvalidArgument).
Status CheckCompressionAvailable(std::string_view compression_name,
                                 ::arrow::Compression::type compression) {
  // Ask Arrow once, up front, so the precondition below reads as a plain flag.
  const bool codec_available = ::arrow::util::Codec::IsAvailable(compression);
  ICEBERG_PRECHECK(codec_available,
                   "Parquet compression codec {} is not available in the current build",
                   compression_name);
  return {};
}
74+
6575
Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
6676
auto level_str = properties.Get(WriterProperties::kParquetCompressionLevel);
6777
if (level_str.empty()) {
@@ -98,6 +108,9 @@ class ParquetWriter::Impl {
98108
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
99109
schema_descriptor->schema_root());
100110

111+
ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
112+
options.properties.Get(WriterProperties::kParquetCompression), compression));
113+
101114
ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
102115
auto file_writer = ::parquet::ParquetFileWriter::Open(
103116
output_stream_, std::move(schema_node), std::move(writer_properties),

src/iceberg/test/parquet_test.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919

2020
#include <optional>
21+
#include <string>
22+
#include <utility>
23+
#include <vector>
2124

2225
#include <arrow/array.h>
2326
#include <arrow/c/bridge.h>
@@ -26,6 +29,7 @@
2629
#include <arrow/record_batch.h>
2730
#include <arrow/table.h>
2831
#include <arrow/type.h>
32+
#include <arrow/util/compression.h>
2933
#include <arrow/util/key_value_metadata.h>
3034
#include <parquet/arrow/reader.h>
3135
#include <parquet/arrow/writer.h>
@@ -124,6 +128,27 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
124128
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
125129
}
126130

131+
struct ParquetCodec {
132+
std::string name;
133+
::arrow::Compression::type compression;
134+
};
135+
136+
std::optional<ParquetCodec> FirstUnavailableParquetCodec() {
137+
const std::vector<ParquetCodec> codecs = {
138+
{.name = "snappy", .compression = ::arrow::Compression::SNAPPY},
139+
{.name = "gzip", .compression = ::arrow::Compression::GZIP},
140+
{.name = "brotli", .compression = ::arrow::Compression::BROTLI},
141+
{.name = "lz4", .compression = ::arrow::Compression::LZ4},
142+
{.name = "zstd", .compression = ::arrow::Compression::ZSTD},
143+
};
144+
for (const auto& codec : codecs) {
145+
if (!::arrow::util::Codec::IsAvailable(codec.compression)) {
146+
return codec;
147+
}
148+
}
149+
return std::nullopt;
150+
}
151+
127152
} // namespace
128153

129154
class ParquetReaderTest : public TempFileTestBase {
@@ -461,6 +486,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) {
461486
IsError(ErrorKind::kNotImplemented));
462487
}
463488

489+
// Opening a Parquet writer configured with a codec that the current Arrow
// build does not provide must fail with an InvalidArgument error whose
// message names the codec. The test is skipped when every probed codec is
// available, since there is then nothing to exercise.
TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) {
  const auto missing_codec = FirstUnavailableParquetCodec();
  if (!missing_codec) {
    GTEST_SKIP() << "All optional Parquet compression codecs are available";
  }
  const std::string& codec_name = missing_codec->name;

  // Request the unavailable codec through the writer properties.
  WriterProperties props;
  props.Set(WriterProperties::kParquetCompression, codec_name);

  // A minimal single-column schema is enough to reach the codec check.
  auto single_column_schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});

  auto open_result = WriterFactoryRegistry::Open(
      FileFormatType::kParquet, {.path = "unavailable_codec.parquet",
                                 .schema = single_column_schema,
                                 .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(),
                                 .properties = std::move(props)});

  const std::string expected_message = "Parquet compression codec " + codec_name +
                                       " is not available in the current build";
  EXPECT_THAT(open_result, IsError(ErrorKind::kInvalidArgument));
  EXPECT_THAT(open_result, HasErrorMessage(expected_message));
}
511+
464512
TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
465513
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
466514
SchemaField::MakeOptional(1, "a",

0 commit comments

Comments
 (0)