From da1f9e6e160147b46fa0212b2743dc1962b556c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?=
Date: Sun, 19 Apr 2026 10:50:32 +0000
Subject: [PATCH 1/2] GH-3493: Optimize PlainValuesReader with direct
 ByteBuffer reads

Replace the LittleEndianDataInputStream wrapper with direct ByteBuffer
access using LITTLE_ENDIAN byte order in PlainValuesReader. Each
read{Integer,Long,Float,Double}() previously dispatched through four
virtual in.read() calls per value and assembled the result with manual
bit shifts; each call now compiles down to a single ByteBuffer get*()
JVM intrinsic.

In initFromPage, the page data is obtained as a single contiguous
ByteBuffer via ByteBufferInputStream.slice(available). slice() handles
both the single-buffer (zero-copy view) and multi-buffer (copy into a
contiguous buffer) cases transparently; in practice page data is almost
always a single contiguous buffer.
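
For illustration, the per-value hot path changes roughly as follows
(simplified sketch, not the literal patched code):

    // Before: four virtual in.read() calls plus manual reassembly per value.
    int readIntViaStream(InputStream in) throws IOException {
      int b0 = in.read();
      int b1 = in.read();
      int b2 = in.read();
      int b3 = in.read();
      if ((b0 | b1 | b2 | b3) < 0) {
        throw new EOFException();
      }
      return b0 | (b1 << 8) | (b2 << 16) | (b3 << 24);
    }

    // After: one bounds check and one intrinsified load per value.
    int readIntViaBuffer(ByteBuffer buffer) { // buffer is LITTLE_ENDIAN
      return buffer.getInt();
    }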

Benchmark (IntEncodingBenchmark.decodePlain, 100,000 INT32 values per
invocation):

  Pattern            Before (ops/s)    After (ops/s)   Speedup
  SEQUENTIAL             92,918,297    1,143,149,235     12.3x
  RANDOM                 92,126,888    1,147,547,093     12.5x
  LOW_CARDINALITY        93,005,451    1,142,666,760     12.3x
  HIGH_CARDINALITY       93,312,596    1,144,681,876     12.3x

The improvement is consistent regardless of data distribution because
the bottleneck was entirely in the dispatch overhead. All four numeric
plain reader types (int, long, float, double) benefit equally.

All 573 parquet-column tests pass.
---
 .../values/plain/PlainValuesReader.java       | 74 ++++++-------
 pom.xml                                       |  2 +
 2 files changed, 25 insertions(+), 51 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index a0c7af7394..cab438a4b0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -19,25 +19,36 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Plain encoding for float, double, int, long
+ * Plain encoding for float, double, int, long.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
+ * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
+ * {@link ByteBufferInputStream#slice(int)}.
  */
 public abstract class PlainValuesReader extends ValuesReader {
   private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);
 
-  protected LittleEndianDataInputStream in;
+  ByteBuffer buffer;
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
-    this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    int available = stream.available();
+    if (available > 0) {
+      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+    } else {
+      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
@@ -45,31 +56,16 @@ public void skip() {
     skip(1);
   }
 
-  void skipBytesFully(int n) throws IOException {
-    int skipped = 0;
-    while (skipped < n) {
-      skipped += in.skipBytes(n - skipped);
-    }
-  }
-
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " double values", e);
-      }
+      buffer.position(buffer.position() + n * 8);
     }
 
     @Override
     public double readDouble() {
-      try {
-        return in.readDouble();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read double", e);
-      }
+      return buffer.getDouble();
     }
   }
 
@@ -77,20 +73,12 @@ public static class FloatPlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 4);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " floats", e);
-      }
+      buffer.position(buffer.position() + n * 4);
     }
 
     @Override
     public float readFloat() {
-      try {
-        return in.readFloat();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read float", e);
-      }
+      return buffer.getFloat();
     }
   }
 
@@ -98,20 +86,12 @@ public static class IntegerPlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        in.skipBytes(n * 4);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " ints", e);
-      }
+      buffer.position(buffer.position() + n * 4);
     }
 
     @Override
     public int readInteger() {
-      try {
-        return in.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read int", e);
-      }
+      return buffer.getInt();
     }
   }
 
@@ -119,20 +99,12 @@ public static class LongPlainValuesReader extends PlainValuesReader {
 
     @Override
    public void skip(int n) {
-      try {
-        in.skipBytes(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " longs", e);
-      }
+      buffer.position(buffer.position() + n * 8);
     }
 
     @Override
     public long readLong() {
-      try {
-        return in.readLong();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read long", e);
-      }
+      return buffer.getLong();
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index 3b87ba5f12..e0b04801e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,8 @@
             <exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
             <exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
+
+            <exclude>org.apache.parquet.column.values.plain.PlainValuesReader#in</exclude>
             <exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>

From 9c06a9467654c2839a5ab7ba6b24d237677c23ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?=
Date: Sun, 19 Apr 2026 11:21:47 +0000
Subject: [PATCH 2/2] GH-3493: Deprecate LittleEndianDataInputStream and
 migrate last test usage

After GH-3493 replaced the only production usage of
LittleEndianDataInputStream in PlainValuesReader with direct ByteBuffer
reads, the class has no remaining production callers. Mark it
@Deprecated and document the faster alternative.

Migrate the only remaining usage in
TestColumnChunkPageWriteStore.intValue() to ByteBuffer.getInt() with
LITTLE_ENDIAN order, reading directly from BytesInput.toByteBuffer()
instead of round-tripping through a ByteArrayOutputStream +
ByteArrayInputStream + LittleEndianDataInputStream.
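
The two paths produce identical values for the same bytes; as a quick
illustrative check (a sketch, not part of the patch):

    // 0x12345678 stored little-endian
    byte[] bytes = {0x78, 0x56, 0x34, 0x12};

    // What the old readInt() did: four reads reassembled with shifts.
    int manual = (bytes[0] & 0xFF)
        | ((bytes[1] & 0xFF) << 8)
        | ((bytes[2] & 0xFF) << 16)
        | ((bytes[3] & 0xFF) << 24);

    // What the new code does: one intrinsified load.
    int direct = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();

    assert manual == 0x12345678 && direct == manual;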

Per-call, readInt() on the deprecated class performs four virtual
in.read() dispatches and manually reassembles the value with bit
shifts, while the ByteBuffer.getInt() replacement is a HotSpot
intrinsic that compiles to a single unaligned load on x86/ARM.

The class itself is left in place (only marked @Deprecated) for
source/binary compatibility with any downstream code that may still
reference it. It can be removed in a future major release.
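
Downstream callers holding a ByteBufferInputStream can migrate with a
pattern like the following sketch (variable names are illustrative):

    // Before (deprecated): per-byte virtual dispatch through the wrapper.
    LittleEndianDataInputStream in =
        new LittleEndianDataInputStream(stream.remainingStream());
    int value = in.readInt();

    // After: one contiguous little-endian view of the remaining bytes.
    ByteBuffer buf = stream.slice(stream.available()).order(ByteOrder.LITTLE_ENDIAN);
    int value2 = buf.getInt();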

All 308 parquet-common tests, all 573 parquet-column tests, and the
TestColumnChunkPageWriteStore column-order tests pass. (The two
pre-existing JDK-related Hadoop getSubject failures in
TestColumnChunkPageWriteStore are unrelated to this change.)
---
 .../bytes/LittleEndianDataInputStream.java    | 16 +++++++++++++++-
 .../hadoop/TestColumnChunkPageWriteStore.java | 11 ++---------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
index 723971f70a..fe002324e5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
@@ -21,10 +21,24 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 /**
- * Based on DataInputStream but little endian and without the String/char methods
+ * Based on DataInputStream but little endian and without the String/char methods.
+ *
+ * @deprecated This class has no remaining production usages and is
+ *     significantly slower than reading directly from a {@link ByteBuffer} configured with
+ *     {@link ByteOrder#LITTLE_ENDIAN}. Each {@link #readInt()} performs four virtual
+ *     {@code in.read()} calls and reassembles the value with bit shifts, while
+ *     {@link ByteBuffer#getInt()} on a little-endian buffer is a HotSpot intrinsic that
+ *     compiles to a single unaligned load on x86/ARM. For new code, prefer
+ *     {@link ByteBufferInputStream#slice(int)} followed by
+ *     {@code buffer.order(ByteOrder.LITTLE_ENDIAN)} and {@link ByteBuffer#getInt()} /
+ *     {@link ByteBuffer#getLong()} / {@link ByteBuffer#getFloat()} /
+ *     {@link ByteBuffer#getDouble()}. This class will be removed in a future release.
  */
+@Deprecated
 public final class LittleEndianDataInputStream extends InputStream {
 
   private final InputStream in;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a17cf678f5..8d735b55ab 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -38,9 +38,8 @@
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.inOrder;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteOrder;
 import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +48,6 @@
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -249,12 +247,7 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
   }
 
   private int intValue(BytesInput in) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    in.writeAllTo(baos);
-    LittleEndianDataInputStream os = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    int i = os.readInt();
-    os.close();
-    return i;
+    return in.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN).getInt();
   }
 
   @Test