Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
Expand Down Expand Up @@ -390,6 +391,31 @@ public InputStream open(URI uri)
}
}

@Override
public InputStream openForRead(URI uri, long offset, long length)
throws IOException {
if (offset < 0 || length < 0) {
throw new IllegalArgumentException(
"offset and length must be non-negative, got offset=" + offset + ", length=" + length);
}
try {
Blob blob = getBlob(new GcsUri(uri));
// ReadChannel.limit is the absolute, exclusive end position; seek positions the start. This issues a
// ranged GET so only [offset, offset + length) is transferred (truncated at end-of-file).
ReadChannel reader = blob.reader();
reader.seek(offset);
reader.limit(offset + length);
return Channels.newInputStream(reader);
} catch (StorageException e) {
throw new IOException(e);
}
}

@Override
public boolean supportsRangedRead() {
return true;
}

private Bucket getBucket(GcsUri gcsUri) {
return _storage.get(gcsUri.getBucketName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.io.Closer;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
Expand Down Expand Up @@ -170,6 +171,36 @@ private Stream<GcsUri> listFilesToStream(GcsUri gcsUri)
return Arrays.asList(_pinotFS.listFiles(gcsUri.getUri(), true)).stream().map(URI::create).map(GcsUri::new);
}

@Test
public void testOpenForRead()
throws Exception {
skipIfNotConfigured();
assertTrue(_pinotFS.supportsRangedRead());

byte[] data = new byte[256];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
Path localFile = _localTmpDir.resolve("rangeFile");
Files.write(localFile, data);
GcsUri gcsUri = _dataDir.resolve("rangeFile");
_pinotFS.copyFromLocalFile(localFile.toFile(), gcsUri.getUri());
URI uri = gcsUri.getUri();

// Mid-file range [10, 30)
try (InputStream in = _pinotFS.openForRead(uri, 10, 20)) {
assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 10, 30));
}
// Length beyond EOF is truncated at end-of-file
try (InputStream in = _pinotFS.openForRead(uri, 250, 100)) {
assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 250, 256));
}
// Whole file
try (InputStream in = _pinotFS.openForRead(uri, 0, 256)) {
assertEquals(in.readAllBytes(), data);
}
}

@Test
public void testGcs()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -195,6 +198,30 @@ public InputStream open(URI uri)
return new BufferedInputStream(new FileInputStream(toFile(uri)));
}

@Override
public InputStream openForRead(URI uri, long offset, long length)
throws IOException {
if (offset < 0 || length < 0) {
throw new IllegalArgumentException(
"offset and length must be non-negative, got offset=" + offset + ", length=" + length);
}
RandomAccessFile randomAccessFile = new RandomAccessFile(toFile(uri), "r");
try {
randomAccessFile.seek(offset);
// Closing the stream returned by Channels.newInputStream closes the channel, which closes the
// RandomAccessFile, so the caller closing the returned stream releases the file handle.
return new RangeInputStream(Channels.newInputStream(randomAccessFile.getChannel()), length);
} catch (IOException | RuntimeException e) {
randomAccessFile.close();
throw e;
}
}

@Override
public boolean supportsRangedRead() {
return true;
}

private static File toFile(URI uri) {
// NOTE: Do not use new File(uri) because scheme might not exist and it does not decode '+' to ' '
// Do not use uri.getPath() because it does not decode '+' to ' '
Expand Down Expand Up @@ -248,4 +275,49 @@ public static class LocalPinotFSException extends IOException {
super(e);
}
}

/**
* Wraps an InputStream to expose at most {@code limit} bytes. Closing this stream propagates to the
* delegate (which releases the underlying file handle).
*/
private static class RangeInputStream extends FilterInputStream {
private long _remaining;

RangeInputStream(InputStream in, long limit) {
super(in);
_remaining = limit;
}

@Override
public int read()
throws IOException {
if (_remaining <= 0) {
return -1;
}
int b = in.read();
if (b >= 0) {
_remaining--;
}
return b;
}

@Override
public int read(byte[] b, int off, int len)
throws IOException {
if (_remaining <= 0) {
return -1;
}
int read = in.read(b, off, (int) Math.min(len, _remaining));
if (read > 0) {
_remaining -= read;
}
return read;
}

@Override
public int available()
throws IOException {
return (int) Math.min(in.available(), _remaining);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,32 @@ boolean touch(URI uri)
InputStream open(URI uri)
throws IOException;

/**
* Opens a byte range {@code [offset, offset + length)} of a file and returns an InputStream over just that range,
* without reading (or downloading) the whole file. This is intended for filesystems backed by remote object stores
* that support range requests (e.g. S3/GCS), enabling targeted reads of file regions (such as Parquet footers and
* column chunks) instead of fetching entire objects.
* <p>The default implementation throws {@link UnsupportedOperationException}; implementations that support ranged
* reads must override both this method and {@link #supportsRangedRead()}. Callers should gate usage on
* {@link #supportsRangedRead()} rather than catching the exception.
* @param uri location of the file to open
* @param offset starting byte offset (inclusive, {@code >= 0})
* @param length number of bytes to read ({@code >= 0}); reads are truncated at end-of-file
* @return a new InputStream over the requested byte range; the caller is responsible for closing it
* @throws IOException on any IO error - missing file, not a file etc
*/
default InputStream openForRead(URI uri, long offset, long length)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support ranged reads");
}

/**
* @return true if this filesystem supports {@link #openForRead(URI, long, long)}. Default is false.
*/
default boolean supportsRangedRead() {
return false;
}

/**
* For certain filesystems, we may need to close the filesystem and do relevant operations to prevent leaks.
* By default, this method does nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -310,4 +311,45 @@ public void testListFilesWithMetadata()
expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
fileMetadata.toString());
}

@Test
public void testOpenForRead()
throws IOException {
LocalPinotFS localPinotFS = new LocalPinotFS();
Assert.assertTrue(localPinotFS.supportsRangedRead());

File rangeFile = new File(_absoluteTmpDirPath, "rangeFile");
byte[] data = new byte[256];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
FileUtils.writeByteArrayToFile(rangeFile, data);
URI uri = rangeFile.toURI();

// Mid-file range [10, 30)
try (InputStream in = localPinotFS.openForRead(uri, 10, 20)) {
Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 10, 30));
}
// From the start
try (InputStream in = localPinotFS.openForRead(uri, 0, 5)) {
Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 0, 5));
}
// Length beyond EOF is truncated at end-of-file
try (InputStream in = localPinotFS.openForRead(uri, 250, 100)) {
Assert.assertEquals(in.readAllBytes(), Arrays.copyOfRange(data, 250, 256));
}
// Zero length yields an empty stream
try (InputStream in = localPinotFS.openForRead(uri, 30, 0)) {
Assert.assertEquals(in.readAllBytes().length, 0);
}
// Whole file
try (InputStream in = localPinotFS.openForRead(uri, 0, 256)) {
Assert.assertEquals(in.readAllBytes(), data);
}
// Negative arguments are rejected
Assert.assertThrows(IllegalArgumentException.class, () -> localPinotFS.openForRead(uri, -1, 10));
Assert.assertThrows(IllegalArgumentException.class, () -> localPinotFS.openForRead(uri, 0, -1));

Assert.assertTrue(rangeFile.delete());
}
}
Loading