Skip to content

Commit dc0e426

Browse files
peter-toth, mbutrovich
authored and committed
GH-3598: Expose getRowRanges(int) and add getCompressedBytesForRowRanges
### Rationale for this change

Opening up APIs needed by a later materialization feature in Spark. External readers (e.g. a Spark-side scanner) need (a) the column-index-derived row ranges that may pass the configured filter for a row group, and (b) a metadata-only estimate of the on-disk compressed bytes those ranges correspond to for the currently requested columns, so they can plan I/O without reading column data.

### What changes are included in this PR?

- `getRowRanges(int blockIndex)`: made public; returns row ranges that may pass the configured filter. With no filter, shortcuts to all rows of the row group.
- `getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges)`: metadata-only sum of compressed page sizes for the reader's currently requested columns whose pages overlap the given row ranges. Dictionary pages are not represented in OffsetIndex and are therefore excluded.

### Are these changes tested?

Yes. `TestParquetFileReaderRowRanges` covers: no-filter row ranges cover all rows, empty ranges short-circuit to 0, full ranges equal the per-page OffsetIndex sum and are strictly less than the column-chunk total (proving dictionary-page exclusion), and partial ranges fall between 0 and the full total.

### Are there any user-facing changes?

No.

Closes #3598

Co-authored-by: Matt Butrovich <mbutrovich@gmail.com>
1 parent 9d9ddca commit dc0e426

2 files changed

Lines changed: 216 additions & 3 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 57 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1489,9 +1489,23 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) {
14891489
return ciStore;
14901490
}
14911491

1492-
private RowRanges getRowRanges(int blockIndex) {
1493-
assert FilterCompat.isFilteringRequired(options.getRecordFilter())
1494-
: "Should not be invoked if filter is null or NOOP";
1492+
/**
1493+
* Computes the {@link RowRanges} within the given row group that may pass the configured filter
1494+
* (set via {@link ParquetReadOptions} or {@link ParquetInputFormat#setFilterPredicate}). If no
1495+
* filter is configured, returns a {@link RowRanges} covering all rows in the row group.
1496+
*
1497+
* <p>This computation is metadata-only: it consults each filter-referenced column's column
1498+
* index from the file footer; no column data is read from disk. The result can be passed to
1499+
* {@link #readFilteredRowGroup(int, RowRanges)} (intersected with any caller-supplied row
1500+
* ranges if desired) to read only the matching pages.
1501+
*
1502+
* @param blockIndex the row group (block) index
1503+
* @return row ranges within the block that may pass the configured filter
1504+
*/
1505+
public RowRanges getRowRanges(int blockIndex) {
1506+
if (!FilterCompat.isFilteringRequired(options.getRecordFilter())) {
1507+
return RowRanges.createSingle(blocks.get(blockIndex).getRowCount());
1508+
}
14951509
RowRanges rowRanges = blockRowRanges.get(blockIndex);
14961510
if (rowRanges == null) {
14971511
rowRanges = ColumnIndexFilter.calculateRowRanges(
@@ -1504,6 +1518,46 @@ private RowRanges getRowRanges(int blockIndex) {
15041518
return rowRanges;
15051519
}
15061520

1521+
/**
1522+
* Returns the total compressed byte count of this reader's requested columns' pages whose
1523+
* row ranges intersect {@code rowRanges} within the given row group. The set of columns is
1524+
* taken from the reader's currently configured requested schema (see
1525+
* {@link #setRequestedSchema}). Metadata-only: consults each column's {@link OffsetIndex}
1526+
* from the file footer; no column data is read.
1527+
*
1528+
* <p>Page size here is {@link OffsetIndex#getCompressedPageSize} (includes page header).
1529+
* Dictionary pages are not represented in {@link OffsetIndex} and are therefore excluded
1530+
* from the sum.
1531+
*
1532+
* @param blockIndex row group index
1533+
* @param rowRanges row ranges to intersect against pages
1534+
* @return sum of compressed page sizes across requested columns for pages overlapping
1535+
* {@code rowRanges}
1536+
* @throws ColumnIndexStore.MissingOffsetIndexException if any requested column lacks an
1537+
* offset index
1538+
*/
1539+
public long getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges) {
1540+
if (rowRanges.rowCount() == 0 || paths.isEmpty()) {
1541+
return 0L;
1542+
}
1543+
BlockMetaData block = blocks.get(blockIndex);
1544+
long blockRowCount = block.getRowCount();
1545+
ColumnIndexStore ciStore = getColumnIndexStore(blockIndex);
1546+
long total = 0L;
1547+
for (ColumnPath path : paths.keySet()) {
1548+
OffsetIndex offsetIndex = ciStore.getOffsetIndex(path);
1549+
int pageCount = offsetIndex.getPageCount();
1550+
for (int i = 0; i < pageCount; i++) {
1551+
long from = offsetIndex.getFirstRowIndex(i);
1552+
long to = offsetIndex.getLastRowIndex(i, blockRowCount);
1553+
if (rowRanges.isOverlapping(from, to)) {
1554+
total += offsetIndex.getCompressedPageSize(i);
1555+
}
1556+
}
1557+
}
1558+
return total;
1559+
}
1560+
15071561
public boolean skipNextRowGroup() {
15081562
return advanceToNextBlock();
15091563
}
Lines changed: 159 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop;
20+
21+
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertTrue;
24+
25+
import java.io.File;
26+
import java.io.IOException;
27+
import java.util.PrimitiveIterator;
28+
import java.util.stream.IntStream;
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.Path;
31+
import org.apache.parquet.HadoopReadOptions;
32+
import org.apache.parquet.ParquetReadOptions;
33+
import org.apache.parquet.example.data.Group;
34+
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
35+
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
36+
import org.apache.parquet.hadoop.metadata.BlockMetaData;
37+
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
38+
import org.apache.parquet.hadoop.util.HadoopInputFile;
39+
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
40+
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
41+
import org.apache.parquet.schema.MessageType;
42+
import org.apache.parquet.schema.MessageTypeParser;
43+
import org.junit.Before;
44+
import org.junit.Rule;
45+
import org.junit.Test;
46+
import org.junit.rules.TemporaryFolder;
47+
48+
/**
49+
* Tests {@link ParquetFileReader#getRowRanges(int)} and
50+
* {@link ParquetFileReader#getCompressedBytesForRowRanges(int, RowRanges)}.
51+
*/
52+
public class TestParquetFileReaderRowRanges {
53+
54+
private static final int ROW_COUNT = 10_000;
55+
private static final MessageType SCHEMA =
56+
MessageTypeParser.parseMessageType("message test { required int64 id; required int64 grp; }");
57+
58+
@Rule
59+
public final TemporaryFolder temp = new TemporaryFolder();
60+
61+
private Path file;
62+
63+
@Before
64+
public void writeFile() throws IOException {
65+
File f = temp.newFile();
66+
f.delete();
67+
file = new Path(f.toURI());
68+
69+
// Small page size produces many pages per column chunk; low-cardinality `grp`
70+
// ensures dictionary encoding kicks in so we can verify dictionary-page exclusion.
71+
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
72+
.withType(SCHEMA)
73+
.withWriteMode(OVERWRITE)
74+
.withRowGroupSize(64L * 1024 * 1024)
75+
.withPageSize(4 * 1024)
76+
.withDictionaryEncoding(true)
77+
.build()) {
78+
SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
79+
for (int i = 0; i < ROW_COUNT; i++) {
80+
writer.write(factory.newGroup().append("id", (long) i).append("grp", (long) (i % 8)));
81+
}
82+
}
83+
}
84+
85+
private ParquetFileReader openReader() throws IOException {
86+
Configuration conf = new Configuration();
87+
ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
88+
return ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options);
89+
}
90+
91+
@Test
92+
public void getRowRangesWithoutFilterCoversAllRows() throws IOException {
93+
try (ParquetFileReader reader = openReader()) {
94+
assertEquals(1, reader.getRowGroups().size());
95+
BlockMetaData block = reader.getRowGroups().get(0);
96+
97+
RowRanges ranges = reader.getRowRanges(0);
98+
99+
assertEquals(block.getRowCount(), ranges.rowCount());
100+
assertTrue(ranges.isOverlapping(0L, block.getRowCount() - 1));
101+
}
102+
}
103+
104+
@Test
105+
public void getCompressedBytesForEmptyRangesIsZero() throws IOException {
106+
try (ParquetFileReader reader = openReader()) {
107+
assertEquals(0L, reader.getCompressedBytesForRowRanges(0, RowRanges.EMPTY));
108+
}
109+
}
110+
111+
@Test
112+
public void getCompressedBytesForFullRangesEqualsOffsetIndexSum() throws IOException {
113+
try (ParquetFileReader reader = openReader()) {
114+
BlockMetaData block = reader.getRowGroups().get(0);
115+
RowRanges full = reader.getRowRanges(0);
116+
117+
long expected = 0L;
118+
long columnChunkTotal = 0L;
119+
for (ColumnChunkMetaData col : block.getColumns()) {
120+
OffsetIndex oi = reader.readOffsetIndex(col);
121+
for (int p = 0; p < oi.getPageCount(); p++) {
122+
expected += oi.getCompressedPageSize(p);
123+
}
124+
columnChunkTotal += col.getTotalSize();
125+
}
126+
127+
assertEquals(expected, reader.getCompressedBytesForRowRanges(0, full));
128+
129+
// Dictionary pages aren't represented in OffsetIndex, so the per-page sum
130+
// must be strictly smaller than the column-chunk totals (which include them).
131+
assertTrue(
132+
"expected dictionary-page exclusion: " + expected + " < " + columnChunkTotal,
133+
expected < columnChunkTotal);
134+
}
135+
}
136+
137+
@Test
138+
public void getCompressedBytesForPartialRangesIsBetweenZeroAndFull() throws IOException {
139+
try (ParquetFileReader reader = openReader()) {
140+
BlockMetaData block = reader.getRowGroups().get(0);
141+
RowRanges full = reader.getRowRanges(0);
142+
long fullBytes = reader.getCompressedBytesForRowRanges(0, full);
143+
144+
// Build a partial RowRanges from the first half of the pages of an arbitrary column;
145+
// since all columns share row counts, the resulting range applies to every column.
146+
OffsetIndex anyOi = reader.readOffsetIndex(block.getColumns().get(0));
147+
int halfPageCount = Math.max(1, anyOi.getPageCount() / 2);
148+
PrimitiveIterator.OfInt pages = IntStream.range(0, halfPageCount).iterator();
149+
RowRanges partial = RowRanges.create(block.getRowCount(), pages, anyOi);
150+
151+
long partialBytes = reader.getCompressedBytesForRowRanges(0, partial);
152+
153+
assertTrue("partial bytes should be > 0: " + partialBytes, partialBytes > 0);
154+
assertTrue(
155+
"partial bytes should be < full bytes: " + partialBytes + " < " + fullBytes,
156+
partialBytes < fullBytes);
157+
}
158+
}
159+
}

0 commit comments

Comments
 (0)