From bc7e748c720e4718300bdfdbdf8bb30e72a4b5ec Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Thu, 12 Feb 2026 17:41:25 +0100 Subject: [PATCH 1/2] parquet/encoding: add endianness-aware BYTE_STREAM_SPLIT decoding - Add platform-specific decodeByteStreamSplitBatchWidth{4,8}InByteOrder for little-endian and s390x big-endian architectures. - Update ByteStreamSplitDecoder to use new endianness-aware decoding functions for correct behavior on all platforms. --- .../internal/encoding/byte_stream_split.go | 39 ++---------- .../encoding/byte_stream_split_big_endian.go | 61 +++++++++++++++++++ .../byte_stream_split_little_endian.go | 61 +++++++++++++++++++ 3 files changed, 127 insertions(+), 34 deletions(-) create mode 100644 parquet/internal/encoding/byte_stream_split_big_endian.go create mode 100644 parquet/internal/encoding/byte_stream_split_little_endian.go diff --git a/parquet/internal/encoding/byte_stream_split.go b/parquet/internal/encoding/byte_stream_split.go index fab61365..73e32634 100644 --- a/parquet/internal/encoding/byte_stream_split.go +++ b/parquet/internal/encoding/byte_stream_split.go @@ -88,38 +88,6 @@ func encodeByteStreamSplitWidth8(data []byte, in []byte) { } } -// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data', -// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least len(data) bytes. 
-func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) { - const width = 4 - debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) - for element := 0; element < nValues; element++ { - out[width*element] = data[element] - out[width*element+1] = data[stride+element] - out[width*element+2] = data[2*stride+element] - out[width*element+3] = data[3*stride+element] - } -} - -// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data', -// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least len(data) bytes. -func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) { - const width = 8 - debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) - for element := 0; element < nValues; element++ { - out[width*element] = data[element] - out[width*element+1] = data[stride+element] - out[width*element+2] = data[2*stride+element] - out[width*element+3] = data[3*stride+element] - out[width*element+4] = data[4*stride+element] - out[width*element+5] = data[5*stride+element] - out[width*element+6] = data[6*stride+element] - out[width*element+7] = data[7*stride+element] - } -} - // decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data', // into the output slice 'out' using BYTE_STREAM_SPLIT encoding. // 'out' must have space for at least nValues slices. 
@@ -303,12 +271,15 @@ func (dec *ByteStreamSplitDecoder[T]) Decode(out []T) (int, error) { return 0, xerrors.New("parquet: eof exception") } + // reinterpret the output slice as bytes so that we can decode directly into it without an intermediate copy + // however, the byte stream split encoding is defined in little-endian order, so we need to decode the bytes + // into the output slice in the correct order based on the machine's endianness outBytes := arrow.GetBytes(out) switch typeLen { case 4: - decodeByteStreamSplitBatchWidth4(dec.data, toRead, dec.stride, outBytes) + decodeByteStreamSplitBatchWidth4InByteOrder(dec.data, toRead, dec.stride, outBytes) case 8: - decodeByteStreamSplitBatchWidth8(dec.data, toRead, dec.stride, outBytes) + decodeByteStreamSplitBatchWidth8InByteOrder(dec.data, toRead, dec.stride, outBytes) default: return 0, fmt.Errorf("encoding ByteStreamSplit is only defined for numeric type of width 4 or 8, found: %d", typeLen) } diff --git a/parquet/internal/encoding/byte_stream_split_big_endian.go b/parquet/internal/encoding/byte_stream_split_big_endian.go new file mode 100644 index 00000000..5920d9a6 --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_big_endian.go @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build s390x + +package encoding + +import ( + "fmt" + + "github.com/apache/arrow-go/v18/parquet/internal/debug" +) + +// decodeByteStreamSplitBatchWidth4InByteOrder decodes the batch of nValues raw bytes representing a 4-byte datatype provided +// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian +// byte order and are decoded into the 'out' array in the machine's native endianness. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) { + const width = 4 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + for element := 0; element < nValues; element++ { + // Big Endian: most significant byte first + out[width*element+0] = data[3*stride+element] + out[width*element+1] = data[2*stride+element] + out[width*element+2] = data[stride+element] + out[width*element+3] = data[element] + } +} + +// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing an 8-byte datatype provided +// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian +// byte order and are decoded into the 'out' array in the machine's native endianness. +// 'out' must have space for at least len(data) bytes. 
+func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) { + const width = 8 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + for element := 0; element < nValues; element++ { + // Big Endian: most significant byte first + out[width*element+0] = data[7*stride+element] + out[width*element+1] = data[6*stride+element] + out[width*element+2] = data[5*stride+element] + out[width*element+3] = data[4*stride+element] + out[width*element+4] = data[3*stride+element] + out[width*element+5] = data[2*stride+element] + out[width*element+6] = data[stride+element] + out[width*element+7] = data[element] + } +} diff --git a/parquet/internal/encoding/byte_stream_split_little_endian.go b/parquet/internal/encoding/byte_stream_split_little_endian.go new file mode 100644 index 00000000..35f47014 --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_little_endian.go @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build !s390x + +package encoding + +import ( + "fmt" + + "github.com/apache/arrow-go/v18/parquet/internal/debug" +) + +// decodeByteStreamSplitBatchWidth4InByteOrder decodes the batch of nValues raw bytes representing a 4-byte datatype provided +// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian +// byte order and are decoded into the 'out' array in the machine's native endianness. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) { + const width = 4 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + for element := 0; element < nValues; element++ { + // Little Endian: least significant byte first + out[width*element+0] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + } +} + +// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing an 8-byte datatype provided +// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian +// byte order and are decoded into the 'out' array in the machine's native endianness. +// 'out' must have space for at least len(data) bytes. 
+func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) { + const width = 8 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + for element := 0; element < nValues; element++ { + // Little Endian: least significant byte first + out[width*element+0] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + out[width*element+4] = data[4*stride+element] + out[width*element+5] = data[5*stride+element] + out[width*element+6] = data[6*stride+element] + out[width*element+7] = data[7*stride+element] + } +} From aa3a4727f708c4b688d3d68d6d0727308e00df00 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Fri, 13 Feb 2026 19:06:28 +0100 Subject: [PATCH 2/2] fix: correct some tests in the parquet package to pass on big-endian systems Fix TestPageIndexRoundTripSuite and TestEncoding tests on big-endian systems --- parquet/file/file_writer_test.go | 25 ++++++++++++++++--- .../encoding/encoding_utils_big_endian.go | 8 ++++-- .../internal/encoding/plain_encoding_types.go | 24 ++++++++++-------- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go index 5997b107..98434d58 100644 --- a/parquet/file/file_writer_test.go +++ b/parquet/file/file_writer_test.go @@ -18,6 +18,7 @@ package file_test import ( "bytes" + "encoding/binary" "fmt" "math" "reflect" @@ -27,6 +28,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/endian" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/parquet" "github.com/apache/arrow-go/v18/parquet/compress" @@ -498,9 +500,11 @@ type errCloseWriter struct { func (c *errCloseWriter) Write(p []byte) (n int, err error) { return 
c.sink.Write(p) } + func (c *errCloseWriter) Close() error { return fmt.Errorf("error during close") } + func (c *errCloseWriter) Bytes() []byte { return c.sink.Bytes() } @@ -669,6 +673,13 @@ func NewColumnIndexObject(colIdx metadata.ColumnIndex) (ret ColumnIndexObject) { } func simpleEncode[T int32 | int64 | float32 | float64](val T) []byte { + if endian.IsBigEndian { + buf := bytes.NewBuffer(nil) + if err := binary.Write(buf, binary.LittleEndian, val); err != nil { + panic(err) + } + return buf.Bytes() + } return unsafe.Slice((*byte)(unsafe.Pointer(&val)), unsafe.Sizeof(val)) } @@ -987,10 +998,16 @@ func (t *PageIndexRoundTripSuite) TestMultiplePages() { t.Equal(t.columnIndexes, []ColumnIndexObject{ { nullPages: []bool{false, false, false, true}, - minValues: [][]byte{simpleEncode(int64(1)), simpleEncode(int64(3)), - simpleEncode(int64(6)), {}}, - maxValues: [][]byte{simpleEncode(int64(2)), simpleEncode(int64(4)), - simpleEncode(int64(6)), {}}, + minValues: [][]byte{ + simpleEncode(int64(1)), simpleEncode(int64(3)), + simpleEncode(int64(6)), + {}, + }, + maxValues: [][]byte{ + simpleEncode(int64(2)), simpleEncode(int64(4)), + simpleEncode(int64(6)), + {}, + }, boundaryOrder: metadata.Ascending, nullCounts: []int64{0, 0, 1, 2}, }, { diff --git a/parquet/internal/encoding/encoding_utils_big_endian.go b/parquet/internal/encoding/encoding_utils_big_endian.go index 96783cc9..02acdd66 100644 --- a/parquet/internal/encoding/encoding_utils_big_endian.go +++ b/parquet/internal/encoding/encoding_utils_big_endian.go @@ -31,7 +31,9 @@ func writeLE[T fixedLenTypes](enc *encoder, in []T) { case parquet.Int96: enc.append(getBytes(in)) default: - binary.Write(enc.sink, binary.LittleEndian, in) + if err := binary.Write(enc.sink, binary.LittleEndian, in); err != nil { + panic(err) + } } } @@ -42,6 +44,8 @@ func copyFrom[T fixedLenTypes](dst []T, src []byte) { copy(dst, fromBytes[T](src)) default: r := bytes.NewReader(src) - binary.Read(r, binary.LittleEndian, dst) + if err := 
binary.Read(r, binary.LittleEndian, dst); err != nil { + panic(err) + } } } diff --git a/parquet/internal/encoding/plain_encoding_types.go b/parquet/internal/encoding/plain_encoding_types.go index e378fed5..f026e7ed 100644 --- a/parquet/internal/encoding/plain_encoding_types.go +++ b/parquet/internal/encoding/plain_encoding_types.go @@ -89,7 +89,7 @@ func (dec *PlainDecoder[T]) Decode(out []T) (int, error) { dec.Type(), max, nbytes, len(dec.data)) } - copyFrom(out, dec.data[:nbytes]) + copyFrom(out[:max], dec.data[:nbytes]) dec.data = dec.data[nbytes:] dec.nvals -= max return max, nil @@ -130,13 +130,15 @@ func (dec *PlainDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byt return nvalues, nil } -type PlainInt32Encoder = PlainEncoder[int32] -type PlainInt32Decoder = PlainDecoder[int32] -type PlainInt64Encoder = PlainEncoder[int64] -type PlainInt64Decoder = PlainDecoder[int64] -type PlainFloat32Encoder = PlainEncoder[float32] -type PlainFloat32Decoder = PlainDecoder[float32] -type PlainFloat64Encoder = PlainEncoder[float64] -type PlainFloat64Decoder = PlainDecoder[float64] -type PlainInt96Encoder = PlainEncoder[parquet.Int96] -type PlainInt96Decoder = PlainDecoder[parquet.Int96] +type ( + PlainInt32Encoder = PlainEncoder[int32] + PlainInt32Decoder = PlainDecoder[int32] + PlainInt64Encoder = PlainEncoder[int64] + PlainInt64Decoder = PlainDecoder[int64] + PlainFloat32Encoder = PlainEncoder[float32] + PlainFloat32Decoder = PlainDecoder[float32] + PlainFloat64Encoder = PlainEncoder[float64] + PlainFloat64Decoder = PlainDecoder[float64] + PlainInt96Encoder = PlainEncoder[parquet.Int96] + PlainInt96Decoder = PlainDecoder[parquet.Int96] +)