Skip to content

Commit 100bbe3

Browse files
authored
feat(puffin): add puffin file reader and writer (#624)
1 parent 8601f16 commit 100bbe3

14 files changed

Lines changed: 1228 additions & 16 deletions

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ set(ICEBERG_DATA_SOURCES
175175
deletes/roaring_position_bitmap.cc
176176
puffin/file_metadata.cc
177177
puffin/json_serde.cc
178-
puffin/puffin_format.cc)
178+
puffin/puffin_format.cc
179+
puffin/puffin_reader.cc
180+
puffin/puffin_writer.cc)
179181

180182
set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS)
181183
set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS)

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ iceberg_data_sources = files(
157157
'puffin/file_metadata.cc',
158158
'puffin/json_serde.cc',
159159
'puffin/puffin_format.cc',
160+
'puffin/puffin_reader.cc',
161+
'puffin/puffin_writer.cc',
160162
)
161163

162164
# CRoaring does not export symbols, so on Windows it must

src/iceberg/puffin/meson.build

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
# under the License.
1717

1818
install_headers(
19-
['file_metadata.h', 'puffin_format.h', 'type_fwd.h'],
19+
[
20+
'file_metadata.h',
21+
'puffin_format.h',
22+
'puffin_reader.h',
23+
'puffin_writer.h',
24+
'type_fwd.h',
25+
],
2026
subdir: 'iceberg/puffin',
2127
)

src/iceberg/puffin/puffin_format.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
3636
std::unreachable();
3737
}
3838

39+
} // namespace
40+
41+
/// \brief Report whether `flag` is set in the four footer flag bytes.
///
/// The byte index and bit index of the flag come from GetFlagPosition().
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
  const auto position = GetFlagPosition(flag);
  const int mask = 1 << position.second;
  return (flags[position.first] & mask) != 0;
}
45+
46+
/// \brief Turn on `flag` in the four footer flag bytes, leaving other bits untouched.
///
/// The byte index and bit index of the flag come from GetFlagPosition().
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
  const auto position = GetFlagPosition(flag);
  uint8_t& target = flags[position.first];
  target |= (1 << position.second);
}
50+
3951
// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
4052
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
4153
std::span<const std::byte> input) {
@@ -63,16 +75,4 @@ Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
6375
std::unreachable();
6476
}
6577

66-
} // namespace
67-
68-
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
69-
auto [byte_num, bit_num] = GetFlagPosition(flag);
70-
return (flags[byte_num] & (1 << bit_num)) != 0;
71-
}
72-
73-
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
74-
auto [byte_num, bit_num] = GetFlagPosition(flag);
75-
flags[byte_num] |= (1 << bit_num);
76-
}
77-
7878
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_format.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
/// Puffin file format constants and utilities.
2424

2525
#include <array>
26+
#include <cstddef>
2627
#include <cstdint>
2728
#include <span>
29+
#include <vector>
2830

2931
#include "iceberg/iceberg_data_export.h"
3032
#include "iceberg/puffin/file_metadata.h"
@@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag
6668
/// \brief Set a flag in the flags bytes.
6769
ICEBERG_DATA_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);
6870

71+
/// \brief Compress data using the specified codec.
72+
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Compress(
73+
PuffinCompressionCodec codec, std::span<const std::byte> input);
74+
75+
/// \brief Decompress data using the specified codec.
76+
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Decompress(
77+
PuffinCompressionCodec codec, std::span<const std::byte> input);
78+
6979
} // namespace iceberg::puffin
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_reader.h"
21+
22+
#include <algorithm>
23+
#include <array>
24+
#include <cstdint>
25+
#include <cstring>
26+
#include <limits>
27+
#include <span>
28+
#include <string_view>
29+
30+
#include "iceberg/file_io.h"
31+
#include "iceberg/puffin/json_serde_internal.h"
32+
#include "iceberg/puffin/puffin_format.h"
33+
#include "iceberg/util/endian.h"
34+
#include "iceberg/util/macros.h"
35+
36+
namespace iceberg::puffin {
37+
38+
namespace {
39+
40+
// Footer properties produced by DecodeFooterInfo(): the payload length and the
// codec that ParseFileMetadata() must apply to the payload bytes.
struct FooterInfo {
41+
// Payload length in bytes as read from the footer struct; FooterPayloadSize()
// rejects negative values before this field is populated.
int32_t payload_size;
42+
// Codec chosen by FooterCompressionCodec() from the footer flags.
PuffinCompressionCodec compression;
43+
};
44+
45+
/// \brief Verify that the Puffin magic sequence starts at `offset` within `data`.
///
/// \param data Buffer expected to contain the magic bytes.
/// \param offset Position in `data` where the magic should begin (defaults to 0).
/// \return An empty Status on success. The check fails when the offset is negative,
/// when the buffer cannot hold the magic at that offset, or when the bytes differ
/// from PuffinFormat::kMagicV1.
Status CheckMagic(std::span<const std::byte> data, int64_t offset = 0) {
  ICEBERG_PRECHECK(offset >= 0, "Invalid file: magic offset {} is negative", offset);

  const auto start = static_cast<size_t>(offset);
  // Two bounds checks sharing one message: the first one guards the unsigned
  // subtraction in the second one against wrap-around.
  ICEBERG_PRECHECK(start <= data.size(),
                   "Invalid file: buffer too small for magic at offset {}", offset);
  ICEBERG_PRECHECK(data.size() - start >= PuffinFormat::kMagicLength,
                   "Invalid file: buffer too small for magic at offset {}", offset);

  const std::byte* raw = data.data() + start;
  const auto* actual = reinterpret_cast<const uint8_t*>(raw);
  const bool magic_matches = std::equal(PuffinFormat::kMagicV1.cbegin(),
                                        PuffinFormat::kMagicV1.cend(), actual);
  ICEBERG_PRECHECK(
      magic_matches,
      "Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, {:#04x}, "
      "{:#04x}]",
      offset, actual[0], actual[1], actual[2], actual[3]);
  return {};
}
59+
60+
/// \brief Reject footer flags that contain any bit this reader does not recognise.
///
/// Only the lowest bit of the first flag byte is treated as known; every other bit
/// in all four bytes must be zero.
Status CheckUnknownFlags(std::span<const uint8_t, 4> flags) {
  constexpr uint8_t kKnownBitsMask = 0x01;

  const bool first_byte_clean = (flags[0] & ~kKnownBitsMask) == 0;
  const bool trailing_bytes_clean =
      std::all_of(flags.begin() + 1, flags.end(),
                  [](uint8_t flag_byte) { return flag_byte == 0; });

  ICEBERG_PRECHECK(
      first_byte_clean && trailing_bytes_clean,
      "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]",
      flags[0], flags[1], flags[2], flags[3]);
  return {};
}
69+
70+
/// \brief Extract the footer payload length from the footer struct.
///
/// \param footer_struct Bytes of the footer struct; must hold at least
/// PuffinFormat::kFooterStructLength bytes.
/// \return The little-endian int32 stored at kFooterStructPayloadSizeOffset. The
/// check fails when the buffer is too short or the decoded value is negative.
Result<int32_t> FooterPayloadSize(std::span<const std::byte> footer_struct) {
  ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength,
                   "Invalid file: footer struct is too small");

  const std::byte* size_field =
      footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset;
  auto decoded_size = ReadLittleEndian<int32_t>(size_field);

  ICEBERG_PRECHECK(decoded_size >= 0, "Invalid file: negative payload size {}",
                   decoded_size);
  return decoded_size;
}
79+
80+
/// \brief Copy the four flag bytes out of the footer struct and validate them.
///
/// \param footer_struct Bytes of the footer struct; must hold at least
/// PuffinFormat::kFooterStructLength bytes.
/// \return The flag bytes found at kFooterStructFlagsOffset. Fails when the buffer
/// is too short or CheckUnknownFlags() rejects the flags.
Result<std::array<uint8_t, 4>> DecodeFlags(std::span<const std::byte> footer_struct) {
  ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength,
                   "Invalid file: footer struct is too small");

  std::array<uint8_t, 4> flags{};
  const std::byte* flag_bytes =
      footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset;
  // Byte-for-byte copy of the flag field into the uint8_t array.
  std::transform(flag_bytes, flag_bytes + flags.size(), flags.begin(),
                 [](std::byte raw) { return std::to_integer<uint8_t>(raw); });

  ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags));
  return flags;
}
89+
90+
/// \brief Map the footer flags to the codec used for the footer payload.
///
/// \return PuffinFormat::kDefaultFooterCompressionCodec when
/// PuffinFlag::kFooterPayloadCompressed is set, PuffinCompressionCodec::kNone
/// otherwise.
PuffinCompressionCodec FooterCompressionCodec(std::span<const uint8_t, 4> flags) {
  const bool payload_compressed =
      IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed);
  return payload_compressed ? PuffinFormat::kDefaultFooterCompressionCodec
                            : PuffinCompressionCodec::kNone;
}
96+
97+
/// \brief Check that `footer_size` equals start magic + payload + footer struct.
///
/// \param footer_size Total footer length being validated.
/// \param payload_size Payload length decoded from the footer struct.
/// \return An empty Status when the sizes agree; the check fails otherwise.
Status CheckFooterSize(int64_t footer_size, int32_t payload_size) {
  const auto payload_bytes = static_cast<int64_t>(payload_size);
  const auto required_size = PuffinFormat::kFooterStartMagicLength + payload_bytes +
                             PuffinFormat::kFooterStructLength;
  ICEBERG_PRECHECK(required_size == footer_size,
                   "Invalid file: footer size {} does not match payload size {}",
                   footer_size, payload_size);
  return {};
}
106+
107+
/// \brief Validate a complete footer buffer and decode its payload size and codec.
///
/// Checks run in this order: minimum footer size, footer size against the buffer
/// size, start magic, footer-struct magic, payload size, footer size against the
/// payload size, and finally the flags.
///
/// \param footer Buffer holding the footer, starting at the footer start magic.
/// \param footer_size Number of bytes of `footer` that make up the footer.
/// \return The decoded FooterInfo, or the first validation failure encountered.
Result<FooterInfo> DecodeFooterInfo(std::span<const std::byte> footer,
                                    int64_t footer_size) {
  ICEBERG_PRECHECK(footer_size >= PuffinFormat::kFooterStartMagicLength +
                                      PuffinFormat::kFooterStructLength,
                   "Invalid file: footer size {} is too small", footer_size);
  ICEBERG_PRECHECK(static_cast<uint64_t>(footer_size) <= footer.size(),
                   "Invalid file: footer size {} exceeds buffer size {}", footer_size,
                   footer.size());

  ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer, PuffinFormat::kFooterStartMagicOffset));

  // The fixed-size footer struct occupies the tail of the footer.
  const auto struct_start = footer_size - PuffinFormat::kFooterStructLength;
  const std::span<const std::byte> trailer = footer.subspan(
      static_cast<size_t>(struct_start), PuffinFormat::kFooterStructLength);
  ICEBERG_RETURN_UNEXPECTED(CheckMagic(trailer, PuffinFormat::kFooterStructMagicOffset));

  ICEBERG_ASSIGN_OR_RAISE(auto payload_size, FooterPayloadSize(trailer));
  ICEBERG_RETURN_UNEXPECTED(CheckFooterSize(footer_size, payload_size));

  ICEBERG_ASSIGN_OR_RAISE(auto flags, DecodeFlags(trailer));

  FooterInfo info{};
  info.payload_size = payload_size;
  info.compression = FooterCompressionCodec(flags);
  return info;
}
131+
132+
/// \brief Turn the footer payload bytes into a FileMetadata.
///
/// \param payload Raw payload bytes taken from the footer.
/// \param compression Codec the payload was written with; kNone means the bytes
/// are passed to the JSON parser as-is.
/// \return The result of FileMetadataFromJsonString(), or the Decompress() failure.
Result<FileMetadata> ParseFileMetadata(std::span<const std::byte> payload,
                                       PuffinCompressionCodec compression) {
  // View a byte range as text without copying it.
  const auto as_text = [](std::span<const std::byte> bytes) {
    return std::string_view(reinterpret_cast<const char*>(bytes.data()), bytes.size());
  };

  if (compression == PuffinCompressionCodec::kNone) {
    return FileMetadataFromJsonString(as_text(payload));
  }

  ICEBERG_ASSIGN_OR_RAISE(auto inflated, Decompress(compression, payload));
  return FileMetadataFromJsonString(as_text(inflated));
}
143+
144+
} // namespace
145+
146+
PuffinReader::PuffinReader(std::unique_ptr<SeekableInputStream> stream, int64_t file_size,
147+
std::optional<int64_t> known_footer_size)
148+
: stream_(std::move(stream)),
149+
file_size_(file_size),
150+
known_footer_size_(known_footer_size) {}
151+
152+
PuffinReader::~PuffinReader() = default;
153+
154+
/// \brief Validate the arguments, open `input_file` and build a reader over it.
///
/// \param input_file File to read; must not be null.
/// \param footer_size Optional footer size hint. When present it must be positive,
/// leave room for the leading magic, and fit in an int32.
/// \param file_size Optional file size. When absent, input_file->Size() is used;
/// when present it must not be negative.
/// \return The new reader, or the first validation / Size() / Open() failure.
Result<std::unique_ptr<PuffinReader>> PuffinReader::Make(
    std::unique_ptr<InputFile> input_file, std::optional<int64_t> footer_size,
    std::optional<int64_t> file_size) {
  ICEBERG_PRECHECK(input_file, "Input file must not be null");

  int64_t total_size = 0;
  if (!file_size.has_value()) {
    ICEBERG_ASSIGN_OR_RAISE(total_size, input_file->Size());
  } else {
    ICEBERG_PRECHECK(*file_size >= 0, "File size must not be negative: {}", *file_size);
    total_size = *file_size;
  }

  if (footer_size) {
    const int64_t hinted_size = *footer_size;
    ICEBERG_PRECHECK(hinted_size > 0, "Footer size must be positive: {}", hinted_size);
    ICEBERG_PRECHECK(hinted_size <= total_size - PuffinFormat::kMagicLength,
                     "Footer size {} exceeds file size {}", hinted_size, total_size);
    ICEBERG_PRECHECK(hinted_size <= std::numeric_limits<int32_t>::max(),
                     "Footer size {} is too large", hinted_size);
  }

  // The stream is opened only after every argument has been validated.
  ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open());
  return std::unique_ptr<PuffinReader>(
      new PuffinReader(std::move(stream), total_size, footer_size));
}
177+
178+
/// \brief Read exactly `length` bytes starting at `offset`.
///
/// \param offset Start position within the file; must lie in [0, file size].
/// \param length Number of bytes to read; must be non-negative and must not run
/// past the file size.
/// \return A buffer of `length` bytes filled by the stream's ReadFully(). Fails
/// when the reader is closed, when a range check fails, or when ReadFully() fails.
Result<std::vector<std::byte>> PuffinReader::ReadBytes(int64_t offset, int64_t length) {
  ICEBERG_PRECHECK(!closed_, "Reader already closed");
  ICEBERG_PRECHECK(offset >= 0, "Offset must not be negative: {}", offset);
  ICEBERG_PRECHECK(length >= 0, "Length must not be negative: {}", length);
  ICEBERG_PRECHECK(offset <= file_size_, "Offset {} exceeds file size {}", offset,
                   file_size_);

  // Compare against the remaining bytes rather than computing offset + length.
  const int64_t remaining = file_size_ - offset;
  ICEBERG_PRECHECK(length <= remaining, "Length {} exceeds file size {} at offset {}",
                   length, file_size_, offset);

  std::vector<std::byte> bytes(static_cast<size_t>(length));
  ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, bytes));
  return bytes;
}
191+
192+
/// \brief Return the total footer size, reading it from the file when not yet known.
///
/// When a footer size is already known (supplied at construction or computed by an
/// earlier call) it is returned without any I/O. Otherwise the trailing footer
/// struct is read, its magic is checked, and the footer size is derived as
/// start magic + payload + footer struct.
///
/// The derived size is checked against the file size with the same bound Make()
/// applies to a caller-supplied footer size, and is cached only after passing it.
/// A truncated or corrupt file is therefore reported here, not later as a negative
/// read offset.
///
/// \return The footer size in bytes, or the first validation / read failure.
Result<int64_t> PuffinReader::FooterSize() {
  if (known_footer_size_.has_value()) {
    return *known_footer_size_;
  }

  constexpr auto kStructLength = static_cast<int64_t>(PuffinFormat::kFooterStructLength);
  ICEBERG_PRECHECK(file_size_ >= kStructLength,
                   "Invalid file: file size {} is too small to contain a footer struct",
                   file_size_);

  ICEBERG_ASSIGN_OR_RAISE(auto footer_struct,
                          ReadBytes(file_size_ - kStructLength, kStructLength));
  ICEBERG_RETURN_UNEXPECTED(
      CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset));

  ICEBERG_ASSIGN_OR_RAISE(auto payload_size, FooterPayloadSize(footer_struct));
  const int64_t footer_size =
      static_cast<int64_t>(PuffinFormat::kFooterStartMagicLength) +
      static_cast<int64_t>(payload_size) + kStructLength;

  // The footer must leave room for the leading file magic (same bound as Make()).
  ICEBERG_PRECHECK(
      footer_size <= file_size_ - static_cast<int64_t>(PuffinFormat::kMagicLength),
      "Footer size {} exceeds file size {}", footer_size, file_size_);

  known_footer_size_ = footer_size;
  return footer_size;
}
209+
210+
/// \brief Read the last `footer_size` bytes of the file.
///
/// Range validation is delegated to ReadBytes().
Result<std::vector<std::byte>> PuffinReader::ReadFooter(int64_t footer_size) {
  const int64_t footer_start = file_size_ - footer_size;
  return ReadBytes(footer_start, footer_size);
}
213+
214+
/// \brief Read and parse the file metadata stored in the footer.
///
/// Steps: check the magic at the start of the file, determine the footer size,
/// read and validate the footer, then parse the payload that follows the footer
/// start magic.
///
/// \return The parsed FileMetadata, or the first failure from any step.
Result<FileMetadata> PuffinReader::ReadFileMetadata() {
  ICEBERG_ASSIGN_OR_RAISE(auto leading_magic, ReadBytes(0, PuffinFormat::kMagicLength));
  ICEBERG_RETURN_UNEXPECTED(CheckMagic(leading_magic));

  ICEBERG_ASSIGN_OR_RAISE(auto total_footer_size, FooterSize());
  ICEBERG_ASSIGN_OR_RAISE(auto footer_bytes, ReadFooter(total_footer_size));
  ICEBERG_ASSIGN_OR_RAISE(auto info, DecodeFooterInfo(footer_bytes, total_footer_size));

  // The payload sits between the footer start magic and the footer struct.
  const std::span<const std::byte> whole_footer(footer_bytes);
  const auto payload = whole_footer.subspan(PuffinFormat::kFooterStartMagicLength,
                                            static_cast<size_t>(info.payload_size));
  return ParseFileMetadata(payload, info.compression);
}
225+
226+
/// \brief Read one blob and return it, decompressed, together with its metadata.
///
/// The compression codec is resolved before any bytes are read, so a codec name
/// that cannot be resolved is reported without paying for the blob read. The
/// closed-reader check runs first so a closed reader is still reported as such.
///
/// \param blob_metadata Describes the blob: its offset, length and codec name.
/// \return A copy of `blob_metadata` paired with the blob bytes. The bytes are the
/// raw data when the codec is kNone, otherwise the Decompress() output. Fails when
/// the reader is closed, the codec cannot be resolved, the read fails, or
/// decompression fails.
Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
    const BlobMetadata& blob_metadata) {
  ICEBERG_PRECHECK(!closed_, "Reader already closed");

  ICEBERG_ASSIGN_OR_RAISE(
      auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));

  ICEBERG_ASSIGN_OR_RAISE(auto raw_data,
                          ReadBytes(blob_metadata.offset, blob_metadata.length));
  if (codec == PuffinCompressionCodec::kNone) {
    return std::pair{blob_metadata, std::move(raw_data)};
  }

  ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));
  return std::pair{blob_metadata, std::move(decompressed)};
}
241+
242+
/// \brief Read every blob in `blobs`, visiting them in ascending offset order.
///
/// \param blobs Metadata of the blobs to read.
/// \return One (metadata, bytes) pair per blob, ordered by offset rather than by
/// position in `blobs`. The first ReadBlob() failure aborts the whole call.
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
  // Order pointers instead of copies so the file is walked front to back.
  std::vector<const BlobMetadata*> by_offset(blobs.size());
  std::ranges::transform(blobs, by_offset.begin(),
                         [](const BlobMetadata& entry) { return &entry; });
  std::ranges::sort(by_offset, [](const auto* lhs, const auto* rhs) {
    return lhs->offset < rhs->offset;
  });

  std::vector<std::pair<BlobMetadata, std::vector<std::byte>>> loaded;
  loaded.reserve(by_offset.size());
  for (const BlobMetadata* entry : by_offset) {
    ICEBERG_ASSIGN_OR_RAISE(auto blob_with_data, ReadBlob(*entry));
    loaded.push_back(std::move(blob_with_data));
  }
  return loaded;
}
261+
262+
/// \brief Close the underlying stream; calling it again after success is a no-op.
///
/// The reader is marked closed only after the stream's Close() succeeds, so a
/// failed close leaves the reader open.
Status PuffinReader::Close() {
  if (!closed_) {
    ICEBERG_RETURN_UNEXPECTED(stream_->Close());
    closed_ = true;
  }
  return {};
}
270+
271+
} // namespace iceberg::puffin

0 commit comments

Comments
 (0)