From 19eb9a1105cfa7d96092b253498c3ee99c9a9560 Mon Sep 17 00:00:00 2001
From: David Cromberge
Date: Fri, 26 Jun 2026 10:00:42 +0100
Subject: [PATCH] Add ranged-read support to PinotFS (openForRead)

Add additive default methods openForRead(uri, offset, length) and
supportsRangedRead() to the PinotFS SPI. Defaults throw / return false so
existing implementations are unaffected.

Implement for LocalPinotFS (via RandomAccessFile) and GcsPinotFS (via
ReadChannel seek/limit), enabling targeted reads of byte ranges (e.g.
Parquet footers and column chunks) without downloading whole objects.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 .../pinot/plugin/filesystem/GcsPinotFS.java  | 35 ++++++++
 .../plugin/filesystem/GcsPinotFSTest.java    | 31 +++++++
 .../pinot/spi/filesystem/LocalPinotFS.java   | 87 +++++++++++++++++++
 .../apache/pinot/spi/filesystem/PinotFS.java | 26 ++++++
 .../spi/filesystem/LocalPinotFSTest.java     | 42 +++++++++
 5 files changed, 221 insertions(+)

diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
index 3bddede54024..e1a2d7e66ff4 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
@@ -22,6 +22,7 @@
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.auth.Credentials;
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
@@ -390,6 +391,40 @@ public InputStream open(URI uri)
     }
   }
 
+  @Override
+  public InputStream openForRead(URI uri, long offset, long length)
+      throws IOException {
+    if (offset < 0 || length < 0) {
+      throw new IllegalArgumentException(
+          "offset and length must be non-negative, got offset=" + offset + ", length=" + length);
+    }
+    // Saturate rather than overflow: a huge length (e.g. Long.MAX_VALUE to read to end-of-file) must not wrap
+    // offset + length around to a negative limit.
+    long end = length > Long.MAX_VALUE - offset ? Long.MAX_VALUE : offset + length;
+    try {
+      Blob blob = getBlob(new GcsUri(uri));
+      // ReadChannel.limit is the absolute, exclusive end position; seek positions the start. This issues a
+      // ranged GET so only [offset, offset + length) is transferred (truncated at end-of-file).
+      ReadChannel reader = blob.reader();
+      try {
+        reader.seek(offset);
+        reader.limit(end);
+        return Channels.newInputStream(reader);
+      } catch (IOException | RuntimeException e) {
+        // Release the channel if positioning fails before the stream is handed to the caller.
+        reader.close();
+        throw e;
+      }
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean supportsRangedRead() {
+    return true;
+  }
+
   private Bucket getBucket(GcsUri gcsUri) {
     return _storage.get(gcsUri.getBucketName());
   }
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
index 2c552e0c7e7d..6399a4a1fb92 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
@@ -21,6 +21,7 @@
 import com.google.common.io.Closer;
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.file.Files;
@@ -170,6 +171,36 @@ private Stream listFilesToStream(GcsUri gcsUri)
     return Arrays.asList(_pinotFS.listFiles(gcsUri.getUri(), true)).stream().map(URI::create).map(GcsUri::new);
   }
 
+  @Test
+  public void testOpenForRead()
+      throws Exception {
+    skipIfNotConfigured();
+    assertTrue(_pinotFS.supportsRangedRead());
+
+    byte[] data = new byte[256];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) i;
+    }
+    Path localFile = _localTmpDir.resolve("rangeFile");
+    Files.write(localFile, data);
+    GcsUri gcsUri = _dataDir.resolve("rangeFile");
+    _pinotFS.copyFromLocalFile(localFile.toFile(), gcsUri.getUri());
+    URI uri = gcsUri.getUri();
+
+    // Mid-file range [10, 30)
+    try (InputStream in = _pinotFS.openForRead(uri, 10, 20)) {
+      assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 10, 30));
+    }
+    // Length beyond EOF is truncated at end-of-file
+    try (InputStream in = _pinotFS.openForRead(uri, 250, 100)) {
+      assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 250, 256));
+    }
+    // Whole file
+    try (InputStream in = _pinotFS.openForRead(uri, 0, 256)) {
+      assertEquals(in.readAllBytes(), data);
+    }
+  }
+
   @Test
   public void testGcs()
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index 68ae7d013f83..1e71b8420404 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -21,10 +21,13 @@
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.net.URLDecoder;
+import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -195,6 +198,30 @@ public InputStream open(URI uri)
     return new BufferedInputStream(new FileInputStream(toFile(uri)));
   }
 
+  @Override
+  public InputStream openForRead(URI uri, long offset, long length)
+      throws IOException {
+    if (offset < 0 || length < 0) {
+      throw new IllegalArgumentException(
+          "offset and length must be non-negative, got offset=" + offset + ", length=" + length);
+    }
+    RandomAccessFile randomAccessFile = new RandomAccessFile(toFile(uri), "r");
+    try {
+      randomAccessFile.seek(offset);
+      // Closing the stream returned by Channels.newInputStream closes the channel, which closes the
+      // RandomAccessFile, so the caller closing the returned stream releases the file handle.
+      return new RangeInputStream(Channels.newInputStream(randomAccessFile.getChannel()), length);
+    } catch (IOException | RuntimeException e) {
+      randomAccessFile.close();
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean supportsRangedRead() {
+    return true;
+  }
+
   private static File toFile(URI uri) {
     // NOTE: Do not use new File(uri) because scheme might not exist and it does not decode '+' to ' '
     // Do not use uri.getPath() because it does not decode '+' to ' '
@@ -248,4 +275,64 @@ public static class LocalPinotFSException extends IOException {
       super(e);
     }
   }
+
+  /**
+   * Wraps an InputStream to expose at most {@code limit} bytes. Closing this stream propagates to the
+   * delegate (which releases the underlying file handle). Bytes consumed via skip count towards the limit.
+   */
+  private static class RangeInputStream extends FilterInputStream {
+    private long _remaining;
+
+    RangeInputStream(InputStream in, long limit) {
+      super(in);
+      _remaining = limit;
+    }
+
+    @Override
+    public int read()
+        throws IOException {
+      if (_remaining <= 0) {
+        return -1;
+      }
+      int b = in.read();
+      if (b >= 0) {
+        _remaining--;
+      }
+      return b;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len)
+        throws IOException {
+      if (_remaining <= 0) {
+        return -1;
+      }
+      int read = in.read(b, off, (int) Math.min(len, _remaining));
+      if (read > 0) {
+        _remaining -= read;
+      }
+      return read;
+    }
+
+    @Override
+    public long skip(long n)
+        throws IOException {
+      // FilterInputStream.skip forwards straight to the delegate, which would move past the range without
+      // updating _remaining; clamp the request to the range and account for the bytes actually skipped.
+      if (n <= 0 || _remaining <= 0) {
+        return 0;
+      }
+      long skipped = in.skip(Math.min(n, _remaining));
+      if (skipped > 0) {
+        _remaining -= skipped;
+      }
+      return skipped;
+    }
+
+    @Override
+    public int available()
+        throws IOException {
+      return (int) Math.min(in.available(), _remaining);
+    }
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 2d26e5d20357..f6ece14ffc02 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -285,6 +285,32 @@ boolean touch(URI uri)
   InputStream open(URI uri)
       throws IOException;
 
+  /**
+   * Opens a byte range {@code [offset, offset + length)} of a file and returns an InputStream over just that range,
+   * without reading (or downloading) the whole file. This is intended for filesystems backed by remote object stores
+   * that support range requests (e.g. S3/GCS), enabling targeted reads of file regions (such as Parquet footers and
+   * column chunks) instead of fetching entire objects.
+   * <p>The default implementation throws {@link UnsupportedOperationException}; implementations that support ranged
+   * reads must override both this method and {@link #supportsRangedRead()}. Callers should gate usage on
+   * {@link #supportsRangedRead()} rather than catching the exception.
+   * @param uri location of the file to open
+   * @param offset starting byte offset (inclusive, {@code >= 0})
+   * @param length number of bytes to read ({@code >= 0}); reads are truncated at end-of-file
+   * @return a new InputStream over the requested byte range; the caller is responsible for closing it
+   * @throws IOException on any IO error - missing file, not a file etc
+   */
+  default InputStream openForRead(URI uri, long offset, long length)
+      throws IOException {
+    throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support ranged reads");
+  }
+
+  /**
+   * @return true if this filesystem supports {@link #openForRead(URI, long, long)}. Default is false.
+   */
+  default boolean supportsRangedRead() {
+    return false;
+  }
+
   /**
    * For certain filesystems, we may need to close the filesystem and do relevant operations to prevent leaks.
    * By default, this method does nothing.
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
index 7b3338bd1112..f0ca5e660eae 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -310,4 +311,45 @@ public void testListFilesWithMetadata()
         expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
         fileMetadata.toString());
   }
+
+  @Test
+  public void testOpenForRead()
+      throws IOException {
+    LocalPinotFS localPinotFS = new LocalPinotFS();
+    Assert.assertTrue(localPinotFS.supportsRangedRead());
+
+    File rangeFile = new File(_absoluteTmpDirPath, "rangeFile");
+    byte[] data = new byte[256];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) i;
+    }
+    FileUtils.writeByteArrayToFile(rangeFile, data);
+    URI uri = rangeFile.toURI();
+
+    // Mid-file range [10, 30): offset 10, length 20
+    try (InputStream in = localPinotFS.openForRead(uri, 10, 20)) {
+      Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 10, 30));
+    }
+    // Range anchored at the start of the file: [0, 5)
+    try (InputStream in = localPinotFS.openForRead(uri, 0, 5)) {
+      Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 0, 5));
+    }
+    // Length running past EOF is truncated at end-of-file: only bytes [250, 256) come back
+    try (InputStream in = localPinotFS.openForRead(uri, 250, 100)) {
+      Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 250, 256));
+    }
+    // Zero length yields an empty stream, even at a valid offset
+    try (InputStream in = localPinotFS.openForRead(uri, 30, 0)) {
+      Assert.assertEquals(in.readAllBytes().length, 0);
+    }
+    // Whole file: [0, 256)
+    try (InputStream in = localPinotFS.openForRead(uri, 0, 256)) {
+      Assert.assertEquals(in.readAllBytes(), data);
+    }
+    // A negative offset or a negative length is rejected with IllegalArgumentException
+    Assert.assertThrows(IllegalArgumentException.class, () -> localPinotFS.openForRead(uri, -1, 10));
+    Assert.assertThrows(IllegalArgumentException.class, () -> localPinotFS.openForRead(uri, 0, -1));
+
+    Assert.assertTrue(rangeFile.delete());
+  }
 }