Skip to content

Commit 949b99b

Browse files
committed
GH-3628: prevent NPE & unclosed releaser
1 parent 65884b5 commit 949b99b

2 files changed

Lines changed: 157 additions & 3 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,14 @@ void setReleaser(ByteBufferReleaser releaser) {
410410

411411
/**
 * Closes this store: asks every reader in {@code readers} to release its buffers and then
 * closes the store-level {@code releaser}.
 *
 * <p>This span is a diff hunk: code under a gutter marker ending in {@code -} is the removed
 * version, code under a marker ending in {@code +} is its replacement.
 */
@Override
412412
public void close() {
413-
for (ColumnChunkPageReader reader : readers.values()) {
414-
reader.releaseBuffers();
413+
// Wrap the per-reader release so the finally block below still runs if a reader throws.
// NOTE(review): the loop stops at the first reader whose releaseBuffers() throws, so any
// readers after it are not released.
try {
414+
for (ColumnChunkPageReader reader : readers.values()) {
415+
reader.releaseBuffers();
416+
}
417+
} finally {
418+
// Presumably releaser is still null when setReleaser() was never called; the guard
// avoids a NullPointerException in that case.
if (releaser != null) {
419+
releaser.close();
420+
}
415421
}
416-
releaser.close();
417422
}
418423
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop;
20+
21+
import static org.apache.parquet.column.Encoding.PLAIN;
22+
import static org.apache.parquet.column.Encoding.RLE;
23+
import static org.junit.Assert.assertEquals;
24+
25+
import java.nio.ByteBuffer;
26+
import java.util.Collections;
27+
import org.apache.parquet.ParquetReadOptions;
28+
import org.apache.parquet.bytes.ByteBufferAllocator;
29+
import org.apache.parquet.bytes.ByteBufferReleaser;
30+
import org.apache.parquet.bytes.BytesInput;
31+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
32+
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
33+
import org.apache.parquet.column.ColumnDescriptor;
34+
import org.apache.parquet.column.page.DataPage;
35+
import org.apache.parquet.column.page.DataPageV1;
36+
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
37+
import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
38+
import org.apache.parquet.schema.PrimitiveType;
39+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
40+
import org.apache.parquet.schema.Type.Repetition;
41+
import org.junit.Test;
42+
43+
/**
 * Tests for {@link ColumnChunkPageReadStore#close()}: closing a store on which no releaser was
 * set, and closing a store whose reader fails while releasing its buffers.
 */
public class TestColumnChunkPageReadStore {
44+
45+
// Descriptor of a single OPTIONAL INT32 column named "x"; shared by every test in this class.
private static final ColumnDescriptor COLUMN = new ColumnDescriptor(
46+
new String[] {"x"}, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "x"), 0, 0);
47+
48+
// Decompressor stub: the BytesInput variant returns its input unchanged, while the ByteBuffer
// variant and release() do nothing.
private static final BytesInputDecompressor NOOP_DECOMPRESSOR = new BytesInputDecompressor() {
49+
@Override
50+
public BytesInput decompress(BytesInput bytes, int decompressedSize) {
51+
return bytes;
52+
}
53+
54+
@Override
55+
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {}
56+
57+
@Override
58+
public void release() {}
59+
};
60+
/**
 * Verifies that {@code close()} completes without throwing when {@code setReleaser()} was never
 * called on the store.
 */
61+
@Test
62+
public void closeWithoutSetReleaserDoesNotThrow() {
63+
try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
64+
ParquetReadOptions options =
65+
ParquetReadOptions.builder().withAllocator(allocator).build();
66+
67+
ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(0L);
68+
store.addColumn(COLUMN, newReaderWithoutPages(options));
69+
70+
// setReleaser() is intentionally NOT called here.
71+
store.close();
72+
}
73+
}
74+
/**
 * Verifies that {@code close()} propagates the exception raised while a reader releases its
 * buffers, using a store that also has a store-level releaser holding one buffer.
 *
 * <p>NOTE(review): presumably closing {@code storeAllocator} at the end of the
 * try-with-resources fails if its 8-byte buffer is still outstanding, which is what would prove
 * the store-level releaser was closed -- confirm against TrackingByteBufferAllocator.
 */
75+
@Test
76+
public void closeReleasesReleaserEvenWhenReaderThrows() throws Exception {
77+
RuntimeException releaseFailure = new RuntimeException("release boom");
78+
79+
// Allocator whose release() always throws releaseFailure; allocate() hands out direct buffers.
ByteBufferAllocator throwingAllocator = new ByteBufferAllocator() {
80+
@Override
81+
public ByteBuffer allocate(int size) {
82+
return ByteBuffer.allocateDirect(size);
83+
}
84+
85+
@Override
86+
public void release(ByteBuffer b) {
87+
throw releaseFailure;
88+
}
89+
90+
@Override
91+
public boolean isDirect() {
92+
return true;
93+
}
94+
};
95+
96+
try (TrackingByteBufferAllocator storeAllocator =
97+
TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
98+
ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L);
99+
store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator));
100+
101+
// The store-level releaser holds a tracked buffer that must be released by close()'s finally block.
102+
ByteBufferReleaser storeReleaser = new ByteBufferReleaser(storeAllocator);
103+
storeReleaser.releaseLater(storeAllocator.allocate(8));
104+
store.setReleaser(storeReleaser);
105+
106+
try {
107+
store.close();
108+
// AssertionError is an Error, not a RuntimeException, so the catch below cannot swallow it.
throw new AssertionError("Expected close() to propagate the reader failure");
109+
} catch (RuntimeException e) {
110+
assertEquals(releaseFailure, e);
111+
}
112+
}
113+
}
114+
/**
 * Builds a reader over an empty page list, using the no-op decompressor and the given options.
 *
 * @param options read options passed through to the reader
 * @return the newly constructed reader
 */
115+
private static ColumnChunkPageReader newReaderWithoutPages(ParquetReadOptions options) {
116+
return new ColumnChunkPageReader(
117+
NOOP_DECOMPRESSOR, Collections.<DataPage>emptyList(), null, null, 0L, null, null, 0, 0, options);
118+
}
119+
/**
 * Builds a reader over a single {@link DataPageV1} backed by a 4-byte direct buffer and reads
 * that page once before returning the reader.
 *
 * @param allocator allocator installed in the reader's read options
 * @return a reader whose only page has already been read
 * @throws IllegalStateException if {@code readPage()} returns {@code null}
 */
120+
private static ColumnChunkPageReader newReaderWithQueuedBuffer(ByteBufferAllocator allocator) {
121+
ParquetReadOptions options = ParquetReadOptions.builder()
122+
.withAllocator(allocator)
123+
.useOffHeapDecryptBuffer(true)
124+
.build();
125+
126+
ByteBuffer pageBytes = ByteBuffer.allocateDirect(4);
127+
// Absolute put: writes 42 at index 0 without moving the buffer's position.
pageBytes.putInt(0, 42);
128+
DataPageV1 page = new DataPageV1(BytesInput.from(pageBytes), 1, 4, null, RLE, RLE, PLAIN);
129+
130+
ColumnChunkPageReader reader = new ColumnChunkPageReader(
131+
NOOP_DECOMPRESSOR,
132+
Collections.<DataPage>singletonList(page),
133+
null,
134+
null,
135+
1L,
136+
null,
137+
null,
138+
0,
139+
0,
140+
options);
141+
142+
// Reading the page through the off-heap path queues a buffer into the reader's internal releaser, so
143+
// releaseBuffers() will later invoke the throwing allocator's release().
144+
if (reader.readPage() == null) {
145+
throw new IllegalStateException("Expected a page to be read");
146+
}
147+
return reader;
148+
}
149+
}

0 commit comments

Comments
 (0)