Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package file_test

import (
"bytes"
"encoding/binary"
"fmt"
"math"
"reflect"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/endian"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/compress"
Expand Down Expand Up @@ -498,9 +500,11 @@ type errCloseWriter struct {
// Write forwards p to the underlying sink, reporting how many bytes were
// written and any error from the sink itself.
func (c *errCloseWriter) Write(p []byte) (n int, err error) {
	n, err = c.sink.Write(p)
	return n, err
}

// Close always fails, allowing tests to exercise error handling during
// writer shutdown.
func (c *errCloseWriter) Close() error {
	err := fmt.Errorf("error during close")
	return err
}

// Bytes exposes the raw contents accumulated in the underlying sink.
func (c *errCloseWriter) Bytes() []byte {
	data := c.sink.Bytes()
	return data
}
Expand Down Expand Up @@ -669,6 +673,13 @@ func NewColumnIndexObject(colIdx metadata.ColumnIndex) (ret ColumnIndexObject) {
}

func simpleEncode[T int32 | int64 | float32 | float64](val T) []byte {
if endian.IsBigEndian {
buf := bytes.NewBuffer(nil)
if err := binary.Write(buf, binary.LittleEndian, val); err != nil {
panic(err)
}
return buf.Bytes()
}
Comment on lines +676 to +682
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a ToLE function in internal/utils/ that might be better for this. For example:

val = utils.ToLE(val)
return unsafe.Slice((*byte)(unsafe.Pointer(&val)), unsafe.Sizeof(val))

This would work because on LittleEndian systems, ToLE(val) is just defined as return val.

return unsafe.Slice((*byte)(unsafe.Pointer(&val)), unsafe.Sizeof(val))
}

Expand Down Expand Up @@ -987,10 +998,16 @@ func (t *PageIndexRoundTripSuite) TestMultiplePages() {
t.Equal(t.columnIndexes, []ColumnIndexObject{
{
nullPages: []bool{false, false, false, true},
minValues: [][]byte{simpleEncode(int64(1)), simpleEncode(int64(3)),
simpleEncode(int64(6)), {}},
maxValues: [][]byte{simpleEncode(int64(2)), simpleEncode(int64(4)),
simpleEncode(int64(6)), {}},
minValues: [][]byte{
simpleEncode(int64(1)), simpleEncode(int64(3)),
simpleEncode(int64(6)),
{},
},
maxValues: [][]byte{
simpleEncode(int64(2)), simpleEncode(int64(4)),
simpleEncode(int64(6)),
{},
},
boundaryOrder: metadata.Ascending, nullCounts: []int64{0, 0, 1, 2},
},
{
Expand Down
39 changes: 5 additions & 34 deletions parquet/internal/encoding/byte_stream_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,38 +88,6 @@ func encodeByteStreamSplitWidth8(data []byte, in []byte) {
}
}

// decodeByteStreamSplitBatchWidth4 decodes a batch of nValues 4-byte values held by
// 'data' in BYTE_STREAM_SPLIT form (byte plane k of every value begins at offset
// k*stride), interleaving the planes back into contiguous values in 'out'.
// 'out' must have space for at least nValues*4 bytes.
func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) {
	const width = 4
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
	for element := 0; element < nValues; element++ {
		// Gather byte plane k (k = 0..3) for this element; planes are stride apart.
		out[width*element] = data[element]
		out[width*element+1] = data[stride+element]
		out[width*element+2] = data[2*stride+element]
		out[width*element+3] = data[3*stride+element]
	}
}

// decodeByteStreamSplitBatchWidth8 decodes a batch of nValues 8-byte values held by
// 'data' in BYTE_STREAM_SPLIT form (byte plane k of every value begins at offset
// k*stride), interleaving the planes back into contiguous values in 'out'.
// 'out' must have space for at least nValues*8 bytes.
func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) {
	const width = 8
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
	for element := 0; element < nValues; element++ {
		// Gather byte plane k (k = 0..7) for this element; planes are stride apart.
		out[width*element] = data[element]
		out[width*element+1] = data[stride+element]
		out[width*element+2] = data[2*stride+element]
		out[width*element+3] = data[3*stride+element]
		out[width*element+4] = data[4*stride+element]
		out[width*element+5] = data[5*stride+element]
		out[width*element+6] = data[6*stride+element]
		out[width*element+7] = data[7*stride+element]
	}
}

// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
Expand Down Expand Up @@ -303,12 +271,15 @@ func (dec *ByteStreamSplitDecoder[T]) Decode(out []T) (int, error) {
return 0, xerrors.New("parquet: eof exception")
}

// reinterpret the output slice as bytes so that we can decode directly into it without an intermediate copy
// however, the byte stream split encoding is defined in little-endian order, so we need to decode the bytes
// into the output slice in the correct order based on the machine's endianness
outBytes := arrow.GetBytes(out)
switch typeLen {
case 4:
decodeByteStreamSplitBatchWidth4(dec.data, toRead, dec.stride, outBytes)
decodeByteStreamSplitBatchWidth4InByteOrder(dec.data, toRead, dec.stride, outBytes)
case 8:
decodeByteStreamSplitBatchWidth8(dec.data, toRead, dec.stride, outBytes)
decodeByteStreamSplitBatchWidth8InByteOrder(dec.data, toRead, dec.stride, outBytes)
default:
return 0, fmt.Errorf("encoding ByteStreamSplit is only defined for numeric type of width 4 or 8, found: %d", typeLen)
}
Expand Down
61 changes: 61 additions & 0 deletions parquet/internal/encoding/byte_stream_split_big_endian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build s390x

package encoding

import (
"fmt"

"github.com/apache/arrow-go/v18/parquet/internal/debug"
)

// decodeByteStreamSplitBatchWidth4InByteOrder decodes the batch of nValues raw bytes representing a 4-byte datatype provided
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
// byte order and are decoded into the 'out' array in the machine's native (big-endian, s390x build) endianness.
// 'out' must have space for at least nValues*4 bytes.
func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) {
	const width = 4
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
	for element := 0; element < nValues; element++ {
		// Big Endian: most significant byte first, so the little-endian
		// byte planes are written into 'out' in reverse order.
		out[width*element+0] = data[3*stride+element]
		out[width*element+1] = data[2*stride+element]
		out[width*element+2] = data[stride+element]
		out[width*element+3] = data[element]
	}
}

// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing a 8-byte datatype provided
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
// byte order and are decoded into the 'out' array in the machine's native (big-endian, s390x build) endianness.
// 'out' must have space for at least nValues*8 bytes.
func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) {
	const width = 8
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
	for element := 0; element < nValues; element++ {
		// Big Endian: most significant byte first, so the little-endian
		// byte planes are written into 'out' in reverse order.
		out[width*element+0] = data[7*stride+element]
		out[width*element+1] = data[6*stride+element]
		out[width*element+2] = data[5*stride+element]
		out[width*element+3] = data[4*stride+element]
		out[width*element+4] = data[3*stride+element]
		out[width*element+5] = data[2*stride+element]
		out[width*element+6] = data[stride+element]
		out[width*element+7] = data[element]
	}
}
61 changes: 61 additions & 0 deletions parquet/internal/encoding/byte_stream_split_little_endian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !s390x

package encoding

import (
"fmt"

"github.com/apache/arrow-go/v18/parquet/internal/debug"
)

// decodeByteStreamSplitBatchWidth4InByteOrder decodes a batch of nValues 4-byte values
// stored by 'data' in BYTE_STREAM_SPLIT (planar, little-endian) form into 'out'. On a
// little-endian host (this build) the byte planes interleave in their stored order.
// 'out' must have space for at least nValues*4 bytes.
func decodeByteStreamSplitBatchWidth4InByteOrder(data []byte, nValues, stride int, out []byte) {
	const width = 4
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
	for idx := 0; idx < nValues; idx++ {
		base := width * idx
		// Little Endian: least significant byte first
		out[base] = data[idx]
		out[base+1] = data[stride+idx]
		out[base+2] = data[2*stride+idx]
		out[base+3] = data[3*stride+idx]
	}
}

// decodeByteStreamSplitBatchWidth8InByteOrder decodes the batch of nValues raw bytes representing a 8-byte datatype provided
// by 'data', into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. The values are expected to be in little-endian
// byte order and are be decoded into the 'out' array in machine's native endianness.
// 'out' must have space for at least len(data) bytes.
func decodeByteStreamSplitBatchWidth8InByteOrder(data []byte, nValues, stride int, out []byte) {
const width = 8
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
for element := 0; element < nValues; element++ {
// Little Endian: least significant byte first
out[width*element+0] = data[element]
out[width*element+1] = data[stride+element]
out[width*element+2] = data[2*stride+element]
out[width*element+3] = data[3*stride+element]
out[width*element+4] = data[4*stride+element]
out[width*element+5] = data[5*stride+element]
out[width*element+6] = data[6*stride+element]
out[width*element+7] = data[7*stride+element]
}
}
8 changes: 6 additions & 2 deletions parquet/internal/encoding/encoding_utils_big_endian.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func writeLE[T fixedLenTypes](enc *encoder, in []T) {
case parquet.Int96:
enc.append(getBytes(in))
default:
binary.Write(enc.sink, binary.LittleEndian, in)
if err := binary.Write(enc.sink, binary.LittleEndian, in); err != nil {
panic(err)
}
}
}

Expand All @@ -42,6 +44,8 @@ func copyFrom[T fixedLenTypes](dst []T, src []byte) {
copy(dst, fromBytes[T](src))
default:
r := bytes.NewReader(src)
binary.Read(r, binary.LittleEndian, dst)
if err := binary.Read(r, binary.LittleEndian, dst); err != nil {
panic(err)
}
}
}
24 changes: 13 additions & 11 deletions parquet/internal/encoding/plain_encoding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (dec *PlainDecoder[T]) Decode(out []T) (int, error) {
dec.Type(), max, nbytes, len(dec.data))
}

copyFrom(out, dec.data[:nbytes])
copyFrom(out[:max], dec.data[:nbytes])
dec.data = dec.data[nbytes:]
dec.nvals -= max
return max, nil
Expand Down Expand Up @@ -130,13 +130,15 @@ func (dec *PlainDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byt
return nvalues, nil
}

type PlainInt32Encoder = PlainEncoder[int32]
type PlainInt32Decoder = PlainDecoder[int32]
type PlainInt64Encoder = PlainEncoder[int64]
type PlainInt64Decoder = PlainDecoder[int64]
type PlainFloat32Encoder = PlainEncoder[float32]
type PlainFloat32Decoder = PlainDecoder[float32]
type PlainFloat64Encoder = PlainEncoder[float64]
type PlainFloat64Decoder = PlainDecoder[float64]
type PlainInt96Encoder = PlainEncoder[parquet.Int96]
type PlainInt96Decoder = PlainDecoder[parquet.Int96]
// Concrete aliases binding the generic plain encoder/decoder to each
// fixed-width parquet physical type.
type (
	PlainInt32Encoder   = PlainEncoder[int32]
	PlainInt32Decoder   = PlainDecoder[int32]
	PlainInt64Encoder   = PlainEncoder[int64]
	PlainInt64Decoder   = PlainDecoder[int64]
	PlainFloat32Encoder = PlainEncoder[float32]
	PlainFloat32Decoder = PlainDecoder[float32]
	PlainFloat64Encoder = PlainEncoder[float64]
	PlainFloat64Decoder = PlainDecoder[float64]
	PlainInt96Encoder   = PlainEncoder[parquet.Int96]
	PlainInt96Decoder   = PlainDecoder[parquet.Int96]
)
Loading