diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java
index 3f13754c000..7e29d63295f 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java
@@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
import io.aiven.inkless.TimeUtils;
@@ -32,32 +31,43 @@
import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;
-public class CacheStoreJob implements Runnable {
+/**
+ * Handles caching of uploaded file data to the object cache.
+ *
+ *
Implements {@link Consumer} for use with {@code CompletableFuture.thenAcceptAsync()}.
+ * When the upload completes successfully, the data is stored to cache.
+ *
+ *
Thread Safety: The {@link #accept} method is safe to call from any thread.
+ * The caller controls the execution context via {@code thenAcceptAsync(..., executor)}.
+ */
+class CacheStoreJob implements Consumer {
private final Time time;
private final ObjectCache cache;
private final KeyAlignmentStrategy keyAlignmentStrategy;
private final byte[] data;
- private final Future uploadFuture;
private final Consumer cacheStoreDurationCallback;
- public CacheStoreJob(Time time, ObjectCache cache, KeyAlignmentStrategy keyAlignmentStrategy, byte[] data, Future uploadFuture, Consumer cacheStoreDurationCallback) {
+ public CacheStoreJob(Time time,
+ ObjectCache cache,
+ KeyAlignmentStrategy keyAlignmentStrategy,
+ byte[] data,
+ Consumer cacheStoreDurationCallback) {
this.time = time;
this.cache = cache;
this.keyAlignmentStrategy = keyAlignmentStrategy;
this.data = data;
- this.uploadFuture = uploadFuture;
this.cacheStoreDurationCallback = cacheStoreDurationCallback;
}
+ /**
+ * Stores the uploaded file data to cache.
+ *
+ * This method is only called when upload succeeds (via thenAcceptAsync).
+ */
@Override
- public void run() {
- try {
- ObjectKey objectKey = uploadFuture.get();
- storeToCache(objectKey);
- } catch (final Throwable e) {
- // If the upload failed there's nothing to cache and we succeed vacuously.
- }
+ public void accept(ObjectKey objectKey) {
+ storeToCache(objectKey);
}
private void storeToCache(ObjectKey objectKey) {
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java
index a8aa1cb904c..48de7bafc24 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java
@@ -23,10 +23,8 @@
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
import io.aiven.inkless.TimeUtils;
import io.aiven.inkless.common.ObjectFormat;
@@ -40,24 +38,27 @@
/**
* The job of committing the already uploaded file to the control plane.
*
- *
If the file was uploaded successfully, commit to the control plane happens. Otherwise, it doesn't.
+ *
This class implements {@link Function} for use with {@code CompletableFuture.thenApplyAsync()}.
+ * When the upload completes successfully, {@link #apply} is invoked to perform the actual commit.
+ * This eliminates blocking wait on upload completion, allowing the commit executor to
+ * only do actual commit work instead of waiting for S3 latency.
+ *
+ *
Upload failures are handled separately in the future chain and don't reach this job.
*/
-class FileCommitJob implements Supplier> {
+class FileCommitJob implements Function> {
private static final Logger LOGGER = LoggerFactory.getLogger(FileCommitJob.class);
private final int brokerId;
private final ClosedFile file;
- private final Future uploadFuture;
private final Time time;
private final ControlPlane controlPlane;
private final ObjectDeleter objectDeleter;
- private final long commitSubmitTimeMs;
+ private volatile long commitSubmitTimeMs;
private final Consumer durationCallback;
private final Consumer commitWaitDurationCallback;
FileCommitJob(final int brokerId,
final ClosedFile file,
- final Future uploadFuture,
final Time time,
final ControlPlane controlPlane,
final ObjectDeleter objectDeleter,
@@ -65,7 +66,6 @@ class FileCommitJob implements Supplier> {
final Consumer commitWaitDurationCallback) {
this.brokerId = brokerId;
this.file = file;
- this.uploadFuture = uploadFuture;
this.controlPlane = controlPlane;
this.time = time;
this.objectDeleter = objectDeleter;
@@ -75,48 +75,44 @@ class FileCommitJob implements Supplier> {
this.commitWaitDurationCallback = commitWaitDurationCallback;
}
+ /**
+ * Resets the submit time to now. Call this when the commit is actually ready to be executed
+ * (e.g., after upload completes in async mode) to measure only the executor queue wait time,
+ * not the time waiting for previous commits in the chain.
+ */
+ void markReadyToCommit() {
+ this.commitSubmitTimeMs = time.milliseconds();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Commits the uploaded file to the control plane.
+ * This method is only called when upload succeeds (via thenApplyAsync).
+ */
@Override
- public List get() {
- // The wait for upload is already measured, and should take the upload time or less if it was already completed.
- final UploadResult uploadResult = waitForUpload();
- // Measure the duration from the commit job submission to the moment we start committing.
- // and should account for the wait time to execute the commit job on a single-threaded executor.
+ public List apply(ObjectKey objectKey) {
+ // Measure the duration from markReadyToCommit() to the moment we start committing.
+ // markReadyToCommit() is called after upload completion and chain wait,
+ // so this measures only the executor queue wait time.
commitWaitDurationCallback.accept(time.milliseconds() - commitSubmitTimeMs);
- return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(uploadResult), durationCallback);
- }
- private UploadResult waitForUpload() {
- try {
- final ObjectKey objectKey = uploadFuture.get();
- return new UploadResult(objectKey, null);
- } catch (final ExecutionException e) {
- LOGGER.error("Failed upload", e);
- return new UploadResult(null, e.getCause());
- } catch (final InterruptedException e) {
- // This is not expected as we try to shut down the executor gracefully.
- LOGGER.error("Interrupted", e);
- throw new RuntimeException(e);
- }
+ return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(objectKey), durationCallback);
}
- private List doCommit(final UploadResult result) {
- if (result.objectKey != null) {
- LOGGER.debug("Uploaded {} successfully, committing", result.objectKey);
- try {
- final var commitBatchResponses = controlPlane.commitFile(result.objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests());
- LOGGER.debug("Committed successfully");
- return commitBatchResponses;
- } catch (final Exception e) {
- LOGGER.error("Commit failed", e);
- if (e instanceof ControlPlaneException) {
- // only attempt to remove the uploaded file if it is a control plane error
- tryDeleteFile(result.objectKey(), e);
- }
- throw e;
+ private List doCommit(final ObjectKey objectKey) {
+ LOGGER.debug("Uploaded {} successfully, committing", objectKey);
+ try {
+ final var commitBatchResponses = controlPlane.commitFile(objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests());
+ LOGGER.debug("Committed successfully");
+ return commitBatchResponses;
+ } catch (final Exception e) {
+ LOGGER.error("Commit failed", e);
+ if (e instanceof ControlPlaneException) {
+ // only attempt to remove the uploaded file if it is a control plane error
+ tryDeleteFile(objectKey, e);
}
- } else {
- // no need to log here, it was already logged in waitForUpload
- throw new FileUploadException(result.uploadError);
+ throw e;
}
}
@@ -141,6 +137,4 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) {
}
}
- private record UploadResult(ObjectKey objectKey, Throwable uploadError) {
- }
}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
index dff48286e97..0c8b6c4c56b 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
@@ -35,7 +35,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -99,6 +98,10 @@ class FileCommitter implements Closeable {
private final AtomicInteger totalFilesInProgress = new AtomicInteger(0);
private final AtomicInteger totalBytesInProgress = new AtomicInteger(0);
+ // Tracks the previous commit to ensure commits happen in submission order, not upload completion order.
+ // Each new commit waits for both its upload AND the previous commit to complete before starting.
+ private CompletableFuture> previousCommitFuture = CompletableFuture.completedFuture(null);
+
@DoNotMutate
FileCommitter(final int brokerId,
final ControlPlane controlPlane,
@@ -175,83 +178,144 @@ void commit(final ClosedFile file) throws InterruptedException {
lock.lock();
try {
final Instant uploadAndCommitStart = TimeUtils.durationMeasurementNow(time);
- CompletableFuture> commitFuture;
+ final CompletableFuture> commitFuture;
+
if (file.isEmpty()) {
- // If the file is empty, skip uploading and committing, but proceed with the later steps.
commitFuture = CompletableFuture.completedFuture(Collections.emptyList());
} else {
- metrics.fileAdded(file.size());
- metrics.batchesAdded(file.commitBatchRequests().size());
- totalFilesInProgress.addAndGet(1);
- totalBytesInProgress.addAndGet(file.size());
-
- // Start uploading and add to the commit queue (as Runnable).
- // This ensures files are uploaded in concurrently, but committed to the control plane sequentially,
- // because `executorServiceCommit` is single-threaded.
- final FileUploadJob uploadJob = FileUploadJob.createFromByteArray(
- objectKeyCreator,
- storage,
- time,
- maxFileUploadAttempts,
- fileUploadRetryBackoff,
- file.data(),
- metrics::fileUploadFinished
- );
- final Future uploadFuture = executorServiceUpload.submit(uploadJob);
-
- final FileCommitJob commitJob = new FileCommitJob(
- brokerId,
- file,
- uploadFuture,
- time,
- controlPlane,
- storage,
- metrics::fileCommitFinished,
- metrics::fileCommitWaitFinished
- );
- commitFuture = CompletableFuture.supplyAsync(commitJob, executorServiceCommit)
- .whenComplete((result, error) -> {
- totalFilesInProgress.addAndGet(-1);
- totalBytesInProgress.addAndGet(-file.size());
- if (error != null) {
- // at this point the commit has failed and need to check whether it failed on upload or commit
- LOGGER.error("Failed to commit diskless file {}", file, error);
- if (error.getCause() instanceof FileUploadException) {
- metrics.fileUploadFailed();
- } else {
- metrics.fileCommitFailed();
- }
- } else {
- // only mark as finished if everything succeeded
- metrics.fileFinished(file.start(), uploadAndCommitStart);
- }
- });
-
- final CacheStoreJob cacheStoreJob = new CacheStoreJob(
- time,
- objectCache,
- keyAlignmentStrategy,
- file.data(),
- uploadFuture,
- metrics::cacheStoreFinished
- );
- executorServiceCacheStore.submit(cacheStoreJob);
+ commitFuture = commitNonEmptyFile(file, uploadAndCommitStart);
}
- commitFuture.whenComplete((commitBatchResponses, throwable) -> {
- final AppendCompleter completerJob = new AppendCompleter(file, batchCoordinateCache);
- if (commitBatchResponses != null) {
- completerJob.finishCommitSuccessfully(commitBatchResponses);
- metrics.writeCompleted();
- } else {
- completerJob.finishCommitWithError();
- metrics.writeFailed();
- }
- });
+
+ attachCompletionHandler(file, commitFuture);
} finally {
lock.unlock();
}
}
+ private CompletableFuture> commitNonEmptyFile(
+ final ClosedFile file,
+ final Instant uploadAndCommitStart) {
+
+ updateMetricsOnStart(file);
+
+ final CompletableFuture uploadFuture = startUpload(file);
+ final FileCommitJob commitJob = createCommitJob(file);
+ final CacheStoreJob cacheStoreJob = createCacheStoreJob(file);
+
+ return commitWithPipeline(file, uploadFuture, commitJob, cacheStoreJob, uploadAndCommitStart);
+ }
+
+ private void updateMetricsOnStart(final ClosedFile file) {
+ metrics.fileAdded(file.size());
+ metrics.batchesAdded(file.commitBatchRequests().size());
+ totalFilesInProgress.addAndGet(1);
+ totalBytesInProgress.addAndGet(file.size());
+ }
+
+ private CompletableFuture startUpload(final ClosedFile file) {
+ final FileUploadJob uploadJob = FileUploadJob.createFromByteArray(
+ objectKeyCreator, storage, time,
+ maxFileUploadAttempts, fileUploadRetryBackoff,
+ file.data(), metrics::fileUploadFinished
+ );
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return uploadJob.call();
+ } catch (final Exception e) {
+ throw new FileUploadException(e);
+ }
+ },
+ executorServiceUpload
+ );
+ }
+
+ private FileCommitJob createCommitJob(final ClosedFile file) {
+ return new FileCommitJob(
+ brokerId, file, time, controlPlane, storage,
+ metrics::fileCommitFinished, metrics::fileCommitWaitFinished
+ );
+ }
+
+ private CacheStoreJob createCacheStoreJob(final ClosedFile file) {
+ return new CacheStoreJob(
+ time, objectCache, keyAlignmentStrategy,
+ file.data(), metrics::cacheStoreFinished
+ );
+ }
+
+ /**
+ * Pipelined commit: callbacks triggered when upload completes, no thread blocking.
+ * Commits are chained to ensure submission order is preserved.
+ * Cache store is fire-and-forget (runs only on success).
+ */
+ private CompletableFuture> commitWithPipeline(
+ final ClosedFile file,
+ final CompletableFuture uploadFuture,
+ final FileCommitJob commitJob,
+ final CacheStoreJob cacheStoreJob,
+ final Instant uploadAndCommitStart) {
+
+ // Wait for previous commit to complete (success OR failure) before starting this one.
+ // Using handle() ensures we don't propagate previous failures - each commit is independent.
+ // We only use previousCommitFuture for ordering, not for error propagation.
+ final CompletableFuture> prevCommitBarrier = previousCommitFuture.handle((result, error) -> null);
+
+ // Chain commit to run after upload completes AND previous commit completes
+ final CompletableFuture> commitFuture = uploadFuture
+ .thenCombine(prevCommitBarrier, (objectKey, ignored) -> objectKey)
+ .thenApplyAsync(objectKey -> {
+ // Reset the submit time now that we're ready to commit.
+ // This ensures FileCommitWaitTime measures only the executor queue wait,
+ // not the time waiting for upload completion or previous commits in the chain.
+ commitJob.markReadyToCommit();
+ return commitJob.apply(objectKey);
+ }, executorServiceCommit)
+ .whenComplete((result, error) -> handleCommitResult(file, uploadAndCommitStart, error));
+
+ // Cache store is fire-and-forget, runs only on successful upload
+ uploadFuture.thenAcceptAsync(cacheStoreJob, executorServiceCacheStore);
+
+ // Update chain for next commit
+ previousCommitFuture = commitFuture;
+
+ return commitFuture;
+ }
+
+ private void handleCommitResult(final ClosedFile file, final Instant uploadAndCommitStart, final Throwable error) {
+ totalFilesInProgress.addAndGet(-1);
+ totalBytesInProgress.addAndGet(-file.size());
+ if (error != null) {
+ LOGGER.error("Failed to commit diskless file {}", file, error);
+ // Unwrap CompletionException if present to check for the actual cause
+ final Throwable cause = (error instanceof java.util.concurrent.CompletionException && error.getCause() != null)
+ ? error.getCause()
+ : error;
+ if (cause instanceof FileUploadException) {
+ metrics.fileUploadFailed();
+ } else {
+ metrics.fileCommitFailed();
+ }
+ } else {
+ metrics.fileFinished(file.start(), uploadAndCommitStart);
+ }
+ }
+
+ private void attachCompletionHandler(
+ final ClosedFile file,
+ final CompletableFuture> commitFuture) {
+ commitFuture.whenComplete((commitBatchResponses, throwable) -> {
+ final AppendCompleter completerJob = new AppendCompleter(file, batchCoordinateCache);
+ if (commitBatchResponses != null) {
+ completerJob.finishCommitSuccessfully(commitBatchResponses);
+ metrics.writeCompleted();
+ } else {
+ completerJob.finishCommitWithError();
+ metrics.writeFailed();
+ }
+ });
+ }
+
int totalFilesInProgress() {
return totalFilesInProgress.get();
}
@@ -260,8 +324,30 @@ int totalBytesInProgress() {
return totalBytesInProgress.get();
}
+ private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;
+
@Override
public void close() throws IOException {
+ // First, await any pending async work to ensure callbacks are scheduled before shutdown.
+ // This prevents RejectedExecutionException for in-flight uploads.
+ final CompletableFuture> pendingWork;
+ lock.lock();
+ try {
+ pendingWork = previousCommitFuture;
+ } finally {
+ lock.unlock();
+ }
+
+ try {
+ // Wait for pending work with a timeout to avoid indefinite blocking
+ pendingWork
+ .orTimeout(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .handle((result, error) -> null) // Ignore errors, we just want to wait
+ .join();
+ } catch (final Exception e) {
+ LOGGER.warn("Timeout waiting for pending commits during shutdown", e);
+ }
+
// Reject new upload work immediately.
executorServiceUpload.shutdown();
// Cache is best-effort; cancel immediately rather than waiting.
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java
new file mode 100644
index 00000000000..7317b6be5bb
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java
@@ -0,0 +1,170 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package io.aiven.inkless.produce;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.function.Consumer;
+
+import io.aiven.inkless.cache.FixedBlockAlignment;
+import io.aiven.inkless.cache.KeyAlignmentStrategy;
+import io.aiven.inkless.cache.ObjectCache;
+import io.aiven.inkless.common.ObjectKey;
+import io.aiven.inkless.common.PlainObjectKey;
+import io.aiven.inkless.generated.CacheKey;
+import io.aiven.inkless.generated.FileExtent;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+class CacheStoreJobTest {
+
+ static final ObjectKey OBJECT_KEY = PlainObjectKey.create("prefix", "test-object");
+ static final byte[] DATA = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+ @Mock
+ ObjectCache objectCache;
+ @Mock
+ Consumer cacheStoreDurationCallback;
+ @Captor
+ ArgumentCaptor cacheKeyCaptor;
+ @Captor
+ ArgumentCaptor fileExtentCaptor;
+
+ Time time;
+ KeyAlignmentStrategy keyAlignmentStrategy;
+
+ @BeforeEach
+ void setUp() {
+ time = new MockTime();
+ // Use a block size larger than data to get a single cache key
+ keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE);
+ }
+
+ @Test
+ void acceptStoresDataToCache() {
+ final CacheStoreJob job = new CacheStoreJob(
+ time,
+ objectCache,
+ keyAlignmentStrategy,
+ DATA,
+ cacheStoreDurationCallback
+ );
+
+ // Simulate successful upload completion (thenAcceptAsync only calls on success)
+ job.accept(OBJECT_KEY);
+
+ // Verify cache.put was called
+ verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture());
+
+ // Verify the cache key
+ final CacheKey capturedKey = cacheKeyCaptor.getValue();
+ assertThat(capturedKey.object()).isEqualTo(OBJECT_KEY.value());
+ assertThat(capturedKey.range().offset()).isZero();
+ assertThat(capturedKey.range().length()).isEqualTo(Integer.MAX_VALUE);
+
+ // Verify the file extent contains the data
+ final FileExtent capturedExtent = fileExtentCaptor.getValue();
+ assertThat(capturedExtent.object()).isEqualTo(OBJECT_KEY.value());
+ assertThat(capturedExtent.data()).isEqualTo(DATA);
+
+ // Verify duration callback was invoked
+ verify(cacheStoreDurationCallback).accept(any());
+ }
+
+ @Test
+ void acceptWithMultipleBlocksStoresAllBlocks() {
+ // Use a smaller block size to create multiple cache entries
+ final int blockSize = 4;
+ keyAlignmentStrategy = new FixedBlockAlignment(blockSize);
+
+ final CacheStoreJob job = new CacheStoreJob(
+ time,
+ objectCache,
+ keyAlignmentStrategy,
+ DATA, // 10 bytes = 3 blocks (0-3, 4-7, 8-11)
+ cacheStoreDurationCallback
+ );
+
+ // Simulate successful upload completion
+ job.accept(OBJECT_KEY);
+
+ // Verify cache.put was called 3 times (for each block)
+ verify(objectCache, times(3)).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture());
+
+ // Verify duration callback was invoked 3 times
+ verify(cacheStoreDurationCallback, times(3)).accept(any());
+ }
+
+ @Test
+ void acceptStoresCorrectDataRange() {
+ final CacheStoreJob job = new CacheStoreJob(
+ time,
+ objectCache,
+ keyAlignmentStrategy,
+ DATA,
+ cacheStoreDurationCallback
+ );
+
+ job.accept(OBJECT_KEY);
+
+ verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture());
+
+ final FileExtent extent = fileExtentCaptor.getValue();
+ assertThat(extent.range().offset()).isZero();
+ assertThat(extent.range().length()).isEqualTo(DATA.length);
+ assertThat(extent.data()).hasSize(DATA.length);
+ }
+
+ @Test
+ void acceptWithEmptyDataStoresEmptyExtent() {
+ final byte[] emptyData = new byte[0];
+
+ final CacheStoreJob job = new CacheStoreJob(
+ time,
+ objectCache,
+ keyAlignmentStrategy,
+ emptyData,
+ cacheStoreDurationCallback
+ );
+
+ job.accept(OBJECT_KEY);
+
+ // With empty data and MAX_VALUE block size, alignment still produces one block
+ // but the extent will have empty data
+ verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture());
+
+ final FileExtent extent = fileExtentCaptor.getValue();
+ assertThat(extent.data()).isEmpty();
+ }
+}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java
index 9e0104a3112..347db1969ef 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java
@@ -51,7 +51,6 @@
import io.aiven.inkless.control_plane.ControlPlaneException;
import io.aiven.inkless.control_plane.InMemoryControlPlane;
import io.aiven.inkless.storage_backend.common.ObjectDeleter;
-import io.aiven.inkless.storage_backend.common.StorageBackendException;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
@@ -127,10 +126,10 @@ void commitFinishedSuccessfully() {
when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L);
final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA);
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
+ final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
- job.get();
+ // Simulate successful upload completion via callback
+ job.apply(OBJECT_KEY);
verify(commitWaitTimeDurationCallback).accept(eq(200L));
verify(commitTimeDurationCallback).accept(eq(10L));
@@ -153,72 +152,85 @@ void commitFinishedSuccessfullyZeroBatches() {
when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L);
final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA);
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
+ final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
- job.get();
+ // Simulate successful upload completion via callback
+ job.apply(OBJECT_KEY);
verify(commitWaitTimeDurationCallback).accept(eq(200L));
verify(commitTimeDurationCallback).accept(eq(10L));
}
- @Test
- void commitFinishedWithError() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void deleteObjectWhenFailureOnCommitIsFromControlPlane(boolean isSafeToDelete) throws Exception {
final Map>> awaitingFuturesByRequest = Map.of(
0, new CompletableFuture<>(),
1, new CompletableFuture<>()
);
- when(time.milliseconds()).thenReturn(10_000L, 10_200L);
- when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L);
+ when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS)))
+ .thenThrow(new ControlPlaneException("test"));
+ when(controlPlane.isSafeToDeleteFile(eq(OBJECT_KEY_MAIN_PART))).thenReturn(isSafeToDelete);
final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA);
- final CompletableFuture uploadFuture = CompletableFuture.failedFuture(new StorageBackendException("test"));
- final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
+ final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
- Assert.assertThrows(RuntimeException.class, job::get);
+ // Simulate successful upload but control plane commit failure
+ Assert.assertThrows(ControlPlaneException.class, () -> job.apply(OBJECT_KEY));
- verify(commitWaitTimeDurationCallback).accept(eq(200L));
- verify(commitTimeDurationCallback).accept(eq(10L));
+ verify(objectDeleter, times(isSafeToDelete ? 1 : 0)).delete(eq(OBJECT_KEY));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- void deleteObjectWhenFailureOnCommitIsFromControlPlane(boolean isSafeToDelete) throws Exception {
+ @Test
+ void doNotDeleteObjectWhenFailureOnCommitIsNotFromControlPlane() throws Exception {
final Map>> awaitingFuturesByRequest = Map.of(
0, new CompletableFuture<>(),
1, new CompletableFuture<>()
);
when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS)))
- .thenThrow(new ControlPlaneException("test"));
- when(controlPlane.isSafeToDeleteFile(eq(OBJECT_KEY_MAIN_PART))).thenReturn(isSafeToDelete);
+ .thenThrow(new RuntimeException("test"));
final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA);
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
+ final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
- Assert.assertThrows(RuntimeException.class, job::get);
+ // Simulate successful upload but commit failure (non-control-plane error)
+ Assert.assertThrows(RuntimeException.class, () -> job.apply(OBJECT_KEY));
- verify(objectDeleter, times(isSafeToDelete ? 1 : 0)).delete(eq(OBJECT_KEY));
+ verify(objectDeleter, never()).delete(eq(OBJECT_KEY));
}
@Test
- void doNotDeleteObjectWhenFailureOnCommitIsNotFromControlPlane() throws Exception {
+ void markReadyToCommitResetsWaitTime() {
final Map>> awaitingFuturesByRequest = Map.of(
0, new CompletableFuture<>(),
1, new CompletableFuture<>()
);
+ final List commitBatchResponses = List.of(
+ CommitBatchResponse.success(0, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(0)),
+ CommitBatchResponse.of(Errors.INVALID_TOPIC_EXCEPTION, -1, -1, -1),
+ CommitBatchResponse.success(20, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(2)),
+ CommitBatchResponse.success(30, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(3))
+ );
+
when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS)))
- .thenThrow(new RuntimeException("test"));
+ .thenReturn(commitBatchResponses);
+ // Job created at T=10000, markReadyToCommit at T=15000, onUploadComplete at T=15050
+ when(time.milliseconds()).thenReturn(10_000L, 15_000L, 15_050L);
+ when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L);
final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA);
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
+ final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback);
- Assert.assertThrows(RuntimeException.class, job::get);
+ // Simulate async pipeline: reset submit time when actually ready to commit
+ job.markReadyToCommit();
- verify(objectDeleter, never()).delete(eq(OBJECT_KEY));
+ // Simulate successful upload completion via callback
+ job.apply(OBJECT_KEY);
+
+ // Wait time should be 50ms (from markReadyToCommit to apply), not 5050ms (from constructor)
+ verify(commitWaitTimeDurationCallback).accept(eq(50L));
}
}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java
index 567e9968acc..bd3612aa26c 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java
@@ -23,10 +23,10 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -37,11 +37,13 @@
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.aiven.inkless.cache.BatchCoordinateCache;
@@ -56,16 +58,22 @@
import io.aiven.inkless.control_plane.CommitBatchRequest;
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.ControlPlaneException;
+import io.aiven.inkless.generated.CacheKey;
+import io.aiven.inkless.generated.FileExtent;
import io.aiven.inkless.storage_backend.common.StorageBackend;
import io.aiven.inkless.storage_backend.common.StorageBackendException;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -108,30 +116,38 @@ public ObjectKey create(String value) {
@Mock
Time time;
@Mock
- ExecutorService executorServiceUpload;
- @Mock
- ExecutorService executorServiceCommit;
- @Mock
- ExecutorService executorServiceCacheStore;
- @Mock
FileCommitterMetrics metrics;
- @Captor
- ArgumentCaptor> uploadCallableCaptor;
- @Captor
- ArgumentCaptor commitRunnableCaptor;
+ // Use real executors that run tasks immediately for testing async flow
+ private ExecutorService executorServiceUpload;
+ private ExecutorService executorServiceCommit;
+ private ExecutorService executorServiceCacheStore;
+
+ @BeforeEach
+ void setUp() {
+ // Single-threaded real executors for deterministic ordering in tests.
+ // Tasks still execute asynchronously on background threads, but one at a time per executor.
+ executorServiceUpload = Executors.newSingleThreadExecutor();
+ executorServiceCommit = Executors.newSingleThreadExecutor();
+ executorServiceCacheStore = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterEach
+ void tearDown() {
+ executorServiceUpload.shutdownNow();
+ executorServiceCommit.shutdownNow();
+ executorServiceCacheStore.shutdownNow();
+ }
@Test
- @SuppressWarnings("unchecked")
void success() throws Exception {
doNothing()
.when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length));
+ when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
+ .thenReturn(List.of());
when(time.nanoseconds()).thenReturn(10_000_000L);
-
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- when(executorServiceUpload.submit(any(Callable.class)))
- .thenReturn(uploadFuture);
+ when(time.milliseconds()).thenReturn(10L);
final FileCommitter committer = new FileCommitter(
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
@@ -148,39 +164,24 @@ void success() throws Exception {
committer.commit(FILE);
- assertThat(committer.totalFilesInProgress()).isOne();
- assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length);
-
- verify(executorServiceUpload).submit(uploadCallableCaptor.capture());
- final Callable uploadCallable = uploadCallableCaptor.getValue();
-
- uploadCallable.call();
-
- verify(executorServiceCommit).execute(commitRunnableCaptor.capture());
- final Runnable commitRunnable = commitRunnableCaptor.getValue();
-
- commitRunnable.run();
-
- assertThat(committer.totalFilesInProgress()).isZero();
- assertThat(committer.totalBytesInProgress()).isZero();
-
- verify(metrics).fileAdded(eq(FILE.size()));
- verify(metrics).fileUploadFinished(eq(0L));
- verify(metrics).fileCommitFinished(eq(0L));
- verify(metrics).fileFinished(eq(Instant.EPOCH), eq(Instant.ofEpochMilli(10L)));
+ // Wait for async operations to complete - writeCompleted is the final callback
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ verify(metrics).fileAdded(eq(FILE.size()));
+ verify(metrics).fileUploadFinished(anyLong());
+ verify(metrics).fileCommitFinished(anyLong());
+ verify(metrics).fileCommitWaitFinished(anyLong());
+ verify(metrics).fileFinished(eq(Instant.EPOCH), any());
+ verify(metrics).writeCompleted();
+ });
}
@Test
- @SuppressWarnings("unchecked")
void commitFailed() throws Exception {
doNothing()
.when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length));
when(time.nanoseconds()).thenReturn(10_000_000L);
-
- final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY);
- when(executorServiceUpload.submit(any(Callable.class)))
- .thenReturn(uploadFuture);
+ when(time.milliseconds()).thenReturn(10L);
when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
.thenThrow(new ControlPlaneException("error"));
@@ -197,46 +198,29 @@ void commitFailed() throws Exception {
committer.commit(FILE);
- assertThat(committer.totalFilesInProgress()).isOne();
- assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length);
-
- verify(executorServiceUpload).submit(uploadCallableCaptor.capture());
- final Callable uploadCallable = uploadCallableCaptor.getValue();
-
- uploadCallable.call();
-
- verify(executorServiceCommit).execute(commitRunnableCaptor.capture());
- final Runnable commitRunnable = commitRunnableCaptor.getValue();
-
- commitRunnable.run();
-
- assertThat(committer.totalFilesInProgress()).isZero();
- assertThat(committer.totalBytesInProgress()).isZero();
-
- verify(metrics).fileAdded(eq(FILE.size()));
- verify(metrics).fileUploadFinished(eq(0L));
- verify(metrics).fileCommitFinished(eq(0L));
- verify(metrics, times(0)).fileFinished(any(), any());
- verify(metrics).fileCommitFailed();
- verify(metrics).writeFailed();
+ // Wait for async operations to complete - writeFailed is the final callback
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ verify(metrics).fileAdded(eq(FILE.size()));
+ verify(metrics).fileUploadFinished(anyLong());
+ verify(metrics).fileCommitFinished(anyLong());
+ verify(metrics, times(0)).fileFinished(any(), any());
+ verify(metrics).fileCommitFailed();
+ verify(metrics).writeFailed();
+ });
}
@Test
- @SuppressWarnings("unchecked")
void uploadFailed() throws Exception {
- doNothing()
+ doThrow(new StorageBackendException("test"))
.when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length));
when(time.nanoseconds()).thenReturn(10_000_000L);
-
- final CompletableFuture uploadFuture = CompletableFuture.failedFuture(new StorageBackendException("test"));
- when(executorServiceUpload.submit(any(Callable.class)))
- .thenReturn(uploadFuture);
+ when(time.milliseconds()).thenReturn(10L);
final FileCommitter committer = new FileCommitter(
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
- 3, Duration.ofMillis(100),
+ 1, Duration.ofMillis(100),
executorServiceUpload, executorServiceCommit, executorServiceCacheStore,
metrics);
@@ -245,59 +229,57 @@ void uploadFailed() throws Exception {
committer.commit(FILE);
- assertThat(committer.totalFilesInProgress()).isOne();
- assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length);
-
- verify(executorServiceUpload).submit(uploadCallableCaptor.capture());
- final Callable uploadCallable = uploadCallableCaptor.getValue();
-
- uploadCallable.call();
-
- verify(executorServiceCommit).execute(commitRunnableCaptor.capture());
- final Runnable commitRunnable = commitRunnableCaptor.getValue();
-
- commitRunnable.run();
-
- assertThat(committer.totalFilesInProgress()).isZero();
- assertThat(committer.totalBytesInProgress()).isZero();
-
- verify(metrics).fileAdded(eq(FILE.size()));
- verify(metrics).fileUploadFinished(eq(0L));
- verify(metrics).fileCommitFinished(eq(0L));
- verify(metrics, times(0)).fileFinished(any(), any());
- verify(metrics).fileUploadFailed();
- verify(metrics).writeFailed();
+ // Wait for async operations to complete - writeFailed is the final callback
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ verify(metrics).fileAdded(eq(FILE.size()));
+ verify(metrics).fileUploadFinished(anyLong());
+ // fileCommitFinished is NOT called when upload fails because thenApplyAsync
+ // doesn't run the commit job on failure
+ verify(metrics, times(0)).fileCommitFinished(anyLong());
+ verify(metrics, times(0)).fileFinished(any(), any());
+ verify(metrics).fileUploadFailed();
+ verify(metrics).writeFailed();
+ });
}
@Test
void closeGraceful() throws IOException, InterruptedException {
- when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS)).thenReturn(true);
+ // Use mock executors for this test since we're testing shutdown behavior
+ ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class);
+
+ when(mockCommit.awaitTermination(30, TimeUnit.SECONDS)).thenReturn(true);
final FileCommitter committer = new FileCommitter(
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
3, Duration.ofMillis(100),
- executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics);
+ mockUpload, mockCommit, mockCacheStore, metrics);
committer.close();
// Upload pool rejects new work immediately.
- verify(executorServiceUpload).shutdown();
+ verify(mockUpload).shutdown();
// Cache is best-effort, cancelled immediately.
- verify(executorServiceCacheStore).shutdownNow();
+ verify(mockCacheStore).shutdownNow();
// Commits are awaited (they internally wait for their paired uploads).
- verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS);
+ verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS);
// Graceful termination: no force-shutdown needed on commit pool.
- verify(executorServiceCommit, never()).shutdownNow();
+ verify(mockCommit, never()).shutdownNow();
// Remaining uploads with no queued commit are force-stopped.
- verify(executorServiceUpload).shutdownNow();
+ verify(mockUpload).shutdownNow();
verify(metrics).close();
}
@Test
void closeCommitPoolTimesOutThenTerminates() throws IOException, InterruptedException {
+ ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class);
+
// First await returns false (timeout), after shutdownNow second await succeeds.
- when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS))
+ when(mockCommit.awaitTermination(30, TimeUnit.SECONDS))
.thenReturn(false)
.thenReturn(true);
@@ -305,25 +287,29 @@ void closeCommitPoolTimesOutThenTerminates() throws IOException, InterruptedExce
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
3, Duration.ofMillis(100),
- executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics);
+ mockUpload, mockCommit, mockCacheStore, metrics);
committer.close();
- final InOrder commitOrder = inOrder(executorServiceCommit);
+ final InOrder commitOrder = inOrder(mockCommit);
// Commit pool: shutdown → await → timeout → shutdownNow → await again.
- commitOrder.verify(executorServiceCommit).shutdown();
- commitOrder.verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS);
- commitOrder.verify(executorServiceCommit).shutdownNow();
- commitOrder.verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS);
+ commitOrder.verify(mockCommit).shutdown();
+ commitOrder.verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS);
+ commitOrder.verify(mockCommit).shutdownNow();
+ commitOrder.verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS);
- verify(executorServiceUpload).shutdownNow();
+ verify(mockUpload).shutdownNow();
verify(metrics).close();
}
@Test
void closeCommitPoolNeverTerminates() throws IOException, InterruptedException {
+ ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class);
+
// Both awaits return false — pool never terminates.
- when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS))
+ when(mockCommit.awaitTermination(30, TimeUnit.SECONDS))
.thenReturn(false)
.thenReturn(false);
@@ -331,33 +317,37 @@ void closeCommitPoolNeverTerminates() throws IOException, InterruptedException {
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
3, Duration.ofMillis(100),
- executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics);
+ mockUpload, mockCommit, mockCacheStore, metrics);
committer.close();
- verify(executorServiceCommit).shutdownNow();
+ verify(mockCommit).shutdownNow();
// Cleanup still completes despite commit pool not terminating.
- verify(executorServiceUpload).shutdownNow();
+ verify(mockUpload).shutdownNow();
verify(metrics).close();
}
@Test
void closeInterruptedDuringAwait() throws IOException, InterruptedException {
- when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS))
+ ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class);
+ ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class);
+
+ when(mockCommit.awaitTermination(30, TimeUnit.SECONDS))
.thenThrow(new InterruptedException("shutdown interrupted"));
final FileCommitter committer = new FileCommitter(
BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
3, Duration.ofMillis(100),
- executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics);
+ mockUpload, mockCommit, mockCacheStore, metrics);
committer.close();
// On interrupt: commit pool is force-shutdown.
- verify(executorServiceCommit).shutdownNow();
+ verify(mockCommit).shutdownNow();
// Rest of cleanup still runs.
- verify(executorServiceUpload).shutdownNow();
+ verify(mockUpload).shutdownNow();
verify(metrics).close();
// Interrupt flag is preserved.
assertThat(Thread.interrupted()).isTrue();
@@ -479,4 +469,305 @@ void commitNull() {
.isInstanceOf(NullPointerException.class)
.hasMessage("file cannot be null");
}
+
+ @Test
+ void commitsInSubmissionOrder() throws Exception {
+ // Commits are guaranteed to happen in submission order even if upload2 completes before upload1.
+ // This is critical for correct offset assignment.
+
+ final CountDownLatch upload1Started = new CountDownLatch(1);
+ final CountDownLatch upload2Completed = new CountDownLatch(1);
+ final List commitOrder = new ArrayList<>();
+
+ // File 1 upload will wait until file 2 upload completes
+ doAnswer(invocation -> {
+ upload1Started.countDown();
+ upload2Completed.await();
+ Thread.sleep(50); // Ensure upload2 definitely completes first
+ return null;
+ }).doAnswer(invocation -> {
+ upload2Completed.countDown();
+ return null;
+ }).when(storage).upload(any(), any(InputStream.class), anyLong());
+
+ // Track commit order by capturing the requestId from the CommitBatchRequest
+ when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
+ .thenAnswer(invocation -> {
+ @SuppressWarnings("unchecked")
+ List requests = (List) invocation.getArgument(4);
+ synchronized (commitOrder) {
+ // Record which file committed (by its requestId)
+ commitOrder.add(requests.get(0).requestId());
+ }
+ return List.of();
+ });
+
+ when(time.nanoseconds()).thenReturn(10_000_000L);
+ when(time.milliseconds()).thenReturn(10L);
+
+ ExecutorService multiThreadUpload = Executors.newFixedThreadPool(2);
+
+ try {
+ final FileCommitter committer = new FileCommitter(
+ BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
+ KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
+ 1, Duration.ofMillis(100),
+ multiThreadUpload, executorServiceCommit, executorServiceCacheStore,
+ metrics);
+
+ final ClosedFile file1 = new ClosedFile(
+ Instant.EPOCH,
+ Map.of(1, Map.of(TID0P0, MemoryRecords.EMPTY)),
+ Map.of(1, new CompletableFuture<>()),
+ List.of(CommitBatchRequest.of(1, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)),
+ Map.of(),
+ new byte[10]);
+
+ final ClosedFile file2 = new ClosedFile(
+ Instant.EPOCH,
+ Map.of(2, Map.of(TID0P0, MemoryRecords.EMPTY)),
+ Map.of(2, new CompletableFuture<>()),
+ List.of(CommitBatchRequest.of(2, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)),
+ Map.of(),
+ new byte[10]);
+
+ committer.commit(file1);
+ upload1Started.await();
+ committer.commit(file2);
+
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ synchronized (commitOrder) {
+ assertThat(commitOrder).hasSize(2);
+ }
+ });
+
+ // Verify submission order is maintained
+ synchronized (commitOrder) {
+ assertThat(commitOrder).containsExactly(1, 2);
+ }
+ } finally {
+ multiThreadUpload.shutdownNow();
+ }
+ }
+
+ @Test
+ void failedCommitDoesNotBlockSubsequentCommits() throws Exception {
+ // Verify that if the first commit fails (e.g., control plane error),
+ // the second commit can still succeed. This ensures error isolation.
+
+ final CountDownLatch firstCommitFailed = new CountDownLatch(1);
+ final CountDownLatch secondCommitStarted = new CountDownLatch(1);
+
+ doNothing().when(storage).upload(any(), any(InputStream.class), anyLong());
+
+ // First commit fails with control plane exception, second succeeds
+ when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
+ .thenAnswer(invocation -> {
+ firstCommitFailed.countDown();
+ throw new ControlPlaneException("first commit error");
+ })
+ .thenAnswer(invocation -> {
+ secondCommitStarted.countDown();
+ return List.of();
+ });
+
+ when(time.nanoseconds()).thenReturn(10_000_000L);
+ when(time.milliseconds()).thenReturn(10L);
+
+ final FileCommitter committer = new FileCommitter(
+ BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
+ KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
+ 1, Duration.ofMillis(100),
+ executorServiceUpload, executorServiceCommit, executorServiceCacheStore,
+ metrics);
+
+ final ClosedFile file1 = new ClosedFile(
+ Instant.EPOCH,
+ Map.of(1, Map.of(TID0P0, MemoryRecords.EMPTY)),
+ Map.of(1, new CompletableFuture<>()),
+ List.of(CommitBatchRequest.of(1, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)),
+ Map.of(),
+ new byte[10]);
+
+ final ClosedFile file2 = new ClosedFile(
+ Instant.EPOCH,
+ Map.of(2, Map.of(TID0P0, MemoryRecords.EMPTY)),
+ Map.of(2, new CompletableFuture<>()),
+ List.of(CommitBatchRequest.of(2, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)),
+ Map.of(),
+ new byte[10]);
+
+ committer.commit(file1);
+ committer.commit(file2);
+
+ // Wait for both to complete
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ verify(metrics).writeFailed(); // first commit failed
+ verify(metrics).writeCompleted(); // second commit succeeded
+ });
+
+ // Verify the second commit was actually attempted
+ assertThat(secondCommitStarted.await(1, SECONDS)).isTrue();
+ }
+
+ @Test
+ void asyncModeCloseWhileUploadInFlightStillCommits() throws Exception {
+ // Verify that in async mode, calling close() while an upload is still in-flight
+ // does not cause the commit to be rejected - the close() method awaits pending work.
+
+ final CountDownLatch uploadStarted = new CountDownLatch(1);
+ final CountDownLatch closeStarted = new CountDownLatch(1);
+ final CountDownLatch commitCompleted = new CountDownLatch(1);
+
+ // Upload will block until we signal it to proceed after close() starts
+ doAnswer(invocation -> {
+ uploadStarted.countDown();
+ // Wait for close() to be called
+ closeStarted.await();
+ // Give close() a moment to start waiting for pending futures
+ Thread.sleep(100);
+ return null;
+ }).when(storage).upload(any(), any(InputStream.class), anyLong());
+
+ when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
+ .thenAnswer(invocation -> {
+ commitCompleted.countDown();
+ return List.of();
+ });
+
+ when(time.nanoseconds()).thenReturn(10_000_000L);
+ when(time.milliseconds()).thenReturn(10L);
+
+ final FileCommitter committer = new FileCommitter(
+ BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
+ KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time,
+ 1, Duration.ofMillis(100),
+ executorServiceUpload, executorServiceCommit, executorServiceCacheStore,
+ metrics); // async mode
+
+ // Submit a file for commit
+ committer.commit(FILE);
+
+ // Wait for upload to start
+ uploadStarted.await();
+
+ // Start close() on a separate thread - it should wait for the pending commit
+ CompletableFuture closeFuture = CompletableFuture.runAsync(() -> {
+ closeStarted.countDown();
+ try {
+ committer.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for close to complete - it should wait for the commit
+ closeFuture.get(10, SECONDS);
+
+ // Verify the commit actually happened despite close() being called during upload
+ assertThat(commitCompleted.await(1, SECONDS)).isTrue();
+ verify(controlPlane).commitFile(any(), any(), anyInt(), anyLong(), any());
+ verify(metrics).writeCompleted();
+ }
+
+ @Test
+ void cacheStoreRunsAfterUploadCompletesNotAfterCommit() throws Exception {
+ // Verify that cache store is triggered by upload completion, not commit completion.
+ // This ensures cache store starts as soon as upload is done, without waiting for
+ // previous commits to finish.
+
+ final CountDownLatch upload1Completed = new CountDownLatch(1);
+ final CountDownLatch cacheStore1Started = new CountDownLatch(1);
+ final CountDownLatch commit1Started = new CountDownLatch(1);
+ final CountDownLatch allowCommit1ToComplete = new CountDownLatch(1);
+
+ // Track whether cache store started before commit completed
+ final java.util.concurrent.atomic.AtomicBoolean cacheStoreStartedBeforeCommitCompleted =
+ new java.util.concurrent.atomic.AtomicBoolean(false);
+
+ doAnswer(invocation -> {
+ upload1Completed.countDown();
+ return null;
+ }).when(storage).upload(any(), any(InputStream.class), anyLong());
+
+ when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any()))
+ .thenAnswer(invocation -> {
+ commit1Started.countDown();
+ // Block the commit
+ allowCommit1ToComplete.await();
+ return List.of();
+ });
+
+ when(time.nanoseconds()).thenReturn(10_000_000L);
+ when(time.milliseconds()).thenReturn(10L);
+
+ // Use a custom ObjectCache that tracks when put is called
+ final ObjectCache trackingCache = new ObjectCache() {
+ @Override
+ public FileExtent get(CacheKey key) {
+ return null;
+ }
+
+ @Override
+ public void put(CacheKey key, FileExtent value) {
+ // Check if commit has completed yet
+ if (allowCommit1ToComplete.getCount() > 0) {
+ // Commit hasn't been signaled to complete, so cache store started before commit finished
+ cacheStoreStartedBeforeCommitCompleted.set(true);
+ }
+ cacheStore1Started.countDown();
+ }
+
+ @Override
+ public CompletableFuture computeIfAbsent(
+ CacheKey key, java.util.function.Function load, java.util.concurrent.Executor loadExecutor) {
+ return CompletableFuture.supplyAsync(() -> load.apply(key), loadExecutor);
+ }
+
+ @Override
+ public boolean remove(CacheKey key) {
+ return false;
+ }
+
+ @Override
+ public long size() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+
+ final FileCommitter committer = new FileCommitter(
+ BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage,
+ KEY_ALIGNMENT_STRATEGY, trackingCache, BATCH_COORDINATE_CACHE, time,
+ 1, Duration.ofMillis(100),
+ executorServiceUpload, executorServiceCommit, executorServiceCacheStore,
+ metrics);
+
+ committer.commit(FILE);
+
+ // Wait for upload to complete
+ assertThat(upload1Completed.await(5, SECONDS)).isTrue();
+
+ // Cache store should start after upload, even while commit is blocked
+ assertThat(cacheStore1Started.await(5, SECONDS))
+ .as("Cache store should start after upload completes")
+ .isTrue();
+
+ // Verify cache store started while commit was still blocked
+ assertThat(cacheStoreStartedBeforeCommitCompleted.get())
+ .as("Cache store should run in parallel with commit, not after it")
+ .isTrue();
+
+ // Allow commit to complete
+ allowCommit1ToComplete.countDown();
+
+ // Wait for completion
+ await().atMost(5, SECONDS).untilAsserted(() -> {
+ verify(metrics).writeCompleted();
+ });
+ }
+
}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
index 40ea5af8ae8..a296c934a25 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
@@ -203,28 +203,14 @@ void test(final int requestCount,
Arbitraries.longs().between(uploadDurationAvg - 5, uploadDurationAvg + 5))
);
final CommitterHandler committerHandler = new CommitterHandler(
- uploaderHandler,
- new MockExecutorServiceWithFutureSupport(),
- new Timer("commit",
- time,
- Instant.ofEpochMilli(time.milliseconds()),
- Arbitraries.longs().between(commitDurationAvg - 2, commitDurationAvg + 2))
+ new MockExecutorServiceWithFutureSupport()
);
final CompleterHandler completerHandler = new CompleterHandler(
committerHandler,
- new MockExecutorServiceWithFutureSupport(),
- new Timer("complete",
- time,
- Instant.ofEpochMilli(time.milliseconds()),
- Arbitraries.longs().between(completeDurationAvg - 2, completeDurationAvg + 2))
+ new MockExecutorService()
);
final CacheStoreHandler cacheStoreHandler = new CacheStoreHandler(
- uploaderHandler,
- new MockExecutorServiceWithFutureSupport(),
- new Timer("cacheStore",
- time,
- Instant.ofEpochMilli(time.milliseconds()),
- Arbitraries.longs().between(cacheStoreDurationAvg - 2, cacheStoreDurationAvg + 2))
+ new MockExecutorService()
);
try(final FileCommitter fileCommitter = new FileCommitter(
11,
@@ -281,9 +267,17 @@ void test(final int requestCount,
requester.handleFinishedRequests();
commitTicker.maybeTick();
uploaderHandler.maybeRunNext();
+ // In async mode, upload callbacks add commit/cache tasks.
+ // Process them immediately without waiting for timer.
committerHandler.maybeRunNext();
+ while (!committerHandler.executorService.queue.isEmpty()) {
+ committerHandler.executorService.runNextIfExists();
+ }
completerHandler.maybeRunNext();
cacheStoreHandler.maybeRunNext();
+ while (!cacheStoreHandler.executorService.queue.isEmpty()) {
+ cacheStoreHandler.executorService.runNextIfExists();
+ }
time.sleep(1);
}
assertThat(finished).withFailMessage(String.format("Not finished in %d virtual ms", maxTime)).isTrue();
@@ -574,10 +568,11 @@ public Future submit(final Callable task) {
@Override
boolean runNextIfExists() throws InterruptedException {
- assertThat(returnedFutures.size()).isEqualTo(queue.size());
+ // In async mode, callbacks can add tasks during execution, making sizes unbalanced
final boolean result = super.runNextIfExists();
if (result) {
- assert returnedFutures.take().isDone();
+ final Future> completed = returnedFutures.take();
+ assertThat(completed.isDone()).isTrue();
}
return result;
}
@@ -607,32 +602,20 @@ boolean oldestFutureIsDone() {
}
private static class CommitterHandler {
- private final UploaderHandler uploaderHandler;
private final MockExecutorServiceWithFutureSupport executorService;
- private final Timer timer;
- private CommitterHandler(final UploaderHandler uploaderHandler,
- final MockExecutorServiceWithFutureSupport executorService,
- final Timer timer) {
- this.uploaderHandler = uploaderHandler;
+ private CommitterHandler(final MockExecutorServiceWithFutureSupport executorService) {
this.executorService = executorService;
- this.timer = timer;
}
void maybeRunNext() throws InterruptedException {
- if (!timer.happensNow()) {
- return;
- }
- if (!uploaderHandler.oldestFutureIsDone()) {
- // Otherwise it'd block indefinitely.
- return;
- }
+ // In async mode, run immediately - no timer delay since tasks are
+ // only queued via callbacks when uploads complete
executorService.runNextIfExists();
}
boolean oldestFutureIsDone() {
- return uploaderHandler.oldestFutureIsDone()
- && Optional.ofNullable(executorService.returnedFutures.peek())
+ return Optional.ofNullable(executorService.returnedFutures.peek())
.map(Future::isDone)
.orElse(true);
}
@@ -641,22 +624,15 @@ boolean oldestFutureIsDone() {
private static class CompleterHandler {
private final CommitterHandler committerHandler;
private final MockExecutorService executorService;
- private final Timer timer;
private CompleterHandler(final CommitterHandler committerHandler,
- final MockExecutorService executorService,
- final Timer timer) {
+ final MockExecutorService executorService) {
this.committerHandler = committerHandler;
this.executorService = executorService;
- this.timer = timer;
}
void maybeRunNext() throws InterruptedException {
- if (!timer.happensNow()) {
- return;
- }
if (!committerHandler.oldestFutureIsDone()) {
- // Otherwise it'd block indefinitely.
return;
}
executorService.runNextIfExists();
@@ -664,26 +640,14 @@ void maybeRunNext() throws InterruptedException {
}
private static class CacheStoreHandler {
- private final UploaderHandler uploaderHandler;
private final MockExecutorService executorService;
- private final Timer timer;
- private CacheStoreHandler(final UploaderHandler uploaderHandler,
- final MockExecutorService executorService,
- final Timer timer) {
- this.uploaderHandler = uploaderHandler;
+ private CacheStoreHandler(final MockExecutorService executorService) {
this.executorService = executorService;
- this.timer = timer;
}
void maybeRunNext() throws InterruptedException {
- if (!timer.happensNow()) {
- return;
- }
- if (!uploaderHandler.oldestFutureIsDone()) {
- // Otherwise it'd block indefinitely.
- return;
- }
+ // In async mode, run immediately - no timer delay
executorService.runNextIfExists();
}
}