From 7bbcc95b5e336f54f725d3c8d630d05724f66f98 Mon Sep 17 00:00:00 2001 From: Titouan Chary Date: Fri, 13 Mar 2026 08:56:36 +0100 Subject: [PATCH] perf(produce): implement async commit pipeline to reduce FileCommitWaitTime Replace blocking commit pattern with pipelined CompletableFuture chain: - Upload starts immediately when file is closed - Commits are chained via previousCommitFuture to preserve submission order - Each commit waits for its upload AND previous commit before starting - Cache store is fire-and-forget (thenAcceptAsync on upload success) Key changes: - FileCommitJob now implements Function<ObjectKey, List<CommitBatchResponse>> instead of Supplier<List<CommitBatchResponse>>, only handling the success path - CacheStoreJob now implements Consumer<ObjectKey>, invoked only on successful uploads via thenAcceptAsync - Upload failures are detected via CompletionException unwrapping in handleCommitResult() - markReadyToCommit() resets wait time measurement to exclude upload and previous commit wait times This reduces FileCommitWaitTime by removing thread blocking while waiting for uploads to complete. The commit thread now only runs the actual commit operation. 
Co-Authored-By: Claude Opus 4.5 --- .../aiven/inkless/produce/CacheStoreJob.java | 34 +- .../aiven/inkless/produce/FileCommitJob.java | 88 ++- .../aiven/inkless/produce/FileCommitter.java | 224 +++++--- .../inkless/produce/CacheStoreJobTest.java | 170 ++++++ .../inkless/produce/FileCommitJobTest.java | 74 +-- .../inkless/produce/FileCommitterTest.java | 525 ++++++++++++++---- .../inkless/produce/WriterPropertyTest.java | 78 +-- 7 files changed, 860 insertions(+), 333 deletions(-) create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java index 3f13754c000..7e29d63295f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.Set; -import java.util.concurrent.Future; import java.util.function.Consumer; import io.aiven.inkless.TimeUtils; @@ -32,32 +31,43 @@ import io.aiven.inkless.generated.CacheKey; import io.aiven.inkless.generated.FileExtent; -public class CacheStoreJob implements Runnable { +/** + * Handles caching of uploaded file data to the object cache. + * + *

Implements {@link Consumer} for use with {@code CompletableFuture.thenAcceptAsync()}. + * When the upload completes successfully, the data is stored to cache. + * + *

Thread Safety: The {@link #accept} method is safe to call from any thread. + * The caller controls the execution context via {@code thenAcceptAsync(..., executor)}. + */ +class CacheStoreJob implements Consumer { private final Time time; private final ObjectCache cache; private final KeyAlignmentStrategy keyAlignmentStrategy; private final byte[] data; - private final Future uploadFuture; private final Consumer cacheStoreDurationCallback; - public CacheStoreJob(Time time, ObjectCache cache, KeyAlignmentStrategy keyAlignmentStrategy, byte[] data, Future uploadFuture, Consumer cacheStoreDurationCallback) { + public CacheStoreJob(Time time, + ObjectCache cache, + KeyAlignmentStrategy keyAlignmentStrategy, + byte[] data, + Consumer cacheStoreDurationCallback) { this.time = time; this.cache = cache; this.keyAlignmentStrategy = keyAlignmentStrategy; this.data = data; - this.uploadFuture = uploadFuture; this.cacheStoreDurationCallback = cacheStoreDurationCallback; } + /** + * Stores the uploaded file data to cache. + * + *

This method is only called when upload succeeds (via thenAcceptAsync). + */ @Override - public void run() { - try { - ObjectKey objectKey = uploadFuture.get(); - storeToCache(objectKey); - } catch (final Throwable e) { - // If the upload failed there's nothing to cache and we succeed vacuously. - } + public void accept(ObjectKey objectKey) { + storeToCache(objectKey); } private void storeToCache(ObjectKey objectKey) { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java index a8aa1cb904c..48de7bafc24 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java @@ -23,10 +23,8 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.function.Function; import io.aiven.inkless.TimeUtils; import io.aiven.inkless.common.ObjectFormat; @@ -40,24 +38,27 @@ /** * The job of committing the already uploaded file to the control plane. * - *

If the file was uploaded successfully, commit to the control plane happens. Otherwise, it doesn't. + *

This class implements {@link Function} for use with {@code CompletableFuture.thenApplyAsync()}. + * When the upload completes successfully, {@link #apply} is invoked to perform the actual commit. + * This eliminates blocking wait on upload completion, allowing the commit executor to + * only do actual commit work instead of waiting for S3 latency. + * + *

Upload failures are handled separately in the future chain and don't reach this job. */ -class FileCommitJob implements Supplier> { +class FileCommitJob implements Function> { private static final Logger LOGGER = LoggerFactory.getLogger(FileCommitJob.class); private final int brokerId; private final ClosedFile file; - private final Future uploadFuture; private final Time time; private final ControlPlane controlPlane; private final ObjectDeleter objectDeleter; - private final long commitSubmitTimeMs; + private volatile long commitSubmitTimeMs; private final Consumer durationCallback; private final Consumer commitWaitDurationCallback; FileCommitJob(final int brokerId, final ClosedFile file, - final Future uploadFuture, final Time time, final ControlPlane controlPlane, final ObjectDeleter objectDeleter, @@ -65,7 +66,6 @@ class FileCommitJob implements Supplier> { final Consumer commitWaitDurationCallback) { this.brokerId = brokerId; this.file = file; - this.uploadFuture = uploadFuture; this.controlPlane = controlPlane; this.time = time; this.objectDeleter = objectDeleter; @@ -75,48 +75,44 @@ class FileCommitJob implements Supplier> { this.commitWaitDurationCallback = commitWaitDurationCallback; } + /** + * Resets the submit time to now. Call this when the commit is actually ready to be executed + * (e.g., after upload completes in async mode) to measure only the executor queue wait time, + * not the time waiting for previous commits in the chain. + */ + void markReadyToCommit() { + this.commitSubmitTimeMs = time.milliseconds(); + } + + /** + * {@inheritDoc} + * + *

Commits the uploaded file to the control plane. + * This method is only called when upload succeeds (via thenApplyAsync). + */ @Override - public List get() { - // The wait for upload is already measured, and should take the upload time or less if it was already completed. - final UploadResult uploadResult = waitForUpload(); - // Measure the duration from the commit job submission to the moment we start committing. - // and should account for the wait time to execute the commit job on a single-threaded executor. + public List apply(ObjectKey objectKey) { + // Measure the duration from markReadyToCommit() to the moment we start committing. + // markReadyToCommit() is called after upload completion and chain wait, + // so this measures only the executor queue wait time. commitWaitDurationCallback.accept(time.milliseconds() - commitSubmitTimeMs); - return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(uploadResult), durationCallback); - } - private UploadResult waitForUpload() { - try { - final ObjectKey objectKey = uploadFuture.get(); - return new UploadResult(objectKey, null); - } catch (final ExecutionException e) { - LOGGER.error("Failed upload", e); - return new UploadResult(null, e.getCause()); - } catch (final InterruptedException e) { - // This is not expected as we try to shut down the executor gracefully. 
- LOGGER.error("Interrupted", e); - throw new RuntimeException(e); - } + return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(objectKey), durationCallback); } - private List doCommit(final UploadResult result) { - if (result.objectKey != null) { - LOGGER.debug("Uploaded {} successfully, committing", result.objectKey); - try { - final var commitBatchResponses = controlPlane.commitFile(result.objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests()); - LOGGER.debug("Committed successfully"); - return commitBatchResponses; - } catch (final Exception e) { - LOGGER.error("Commit failed", e); - if (e instanceof ControlPlaneException) { - // only attempt to remove the uploaded file if it is a control plane error - tryDeleteFile(result.objectKey(), e); - } - throw e; + private List doCommit(final ObjectKey objectKey) { + LOGGER.debug("Uploaded {} successfully, committing", objectKey); + try { + final var commitBatchResponses = controlPlane.commitFile(objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests()); + LOGGER.debug("Committed successfully"); + return commitBatchResponses; + } catch (final Exception e) { + LOGGER.error("Commit failed", e); + if (e instanceof ControlPlaneException) { + // only attempt to remove the uploaded file if it is a control plane error + tryDeleteFile(objectKey, e); } - } else { - // no need to log here, it was already logged in waitForUpload - throw new FileUploadException(result.uploadError); + throw e; } } @@ -141,6 +137,4 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) { } } - private record UploadResult(ObjectKey objectKey, Throwable uploadError) { - } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java index dff48286e97..0c8b6c4c56b 100644 --- 
a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java @@ -35,7 +35,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -99,6 +98,10 @@ class FileCommitter implements Closeable { private final AtomicInteger totalFilesInProgress = new AtomicInteger(0); private final AtomicInteger totalBytesInProgress = new AtomicInteger(0); + // Tracks the previous commit to ensure commits happen in submission order, not upload completion order. + // Each new commit waits for both its upload AND the previous commit to complete before starting. + private CompletableFuture previousCommitFuture = CompletableFuture.completedFuture(null); + @DoNotMutate FileCommitter(final int brokerId, final ControlPlane controlPlane, @@ -175,83 +178,144 @@ void commit(final ClosedFile file) throws InterruptedException { lock.lock(); try { final Instant uploadAndCommitStart = TimeUtils.durationMeasurementNow(time); - CompletableFuture> commitFuture; + final CompletableFuture> commitFuture; + if (file.isEmpty()) { - // If the file is empty, skip uploading and committing, but proceed with the later steps. commitFuture = CompletableFuture.completedFuture(Collections.emptyList()); } else { - metrics.fileAdded(file.size()); - metrics.batchesAdded(file.commitBatchRequests().size()); - totalFilesInProgress.addAndGet(1); - totalBytesInProgress.addAndGet(file.size()); - - // Start uploading and add to the commit queue (as Runnable). - // This ensures files are uploaded in concurrently, but committed to the control plane sequentially, - // because `executorServiceCommit` is single-threaded. 
- final FileUploadJob uploadJob = FileUploadJob.createFromByteArray( - objectKeyCreator, - storage, - time, - maxFileUploadAttempts, - fileUploadRetryBackoff, - file.data(), - metrics::fileUploadFinished - ); - final Future uploadFuture = executorServiceUpload.submit(uploadJob); - - final FileCommitJob commitJob = new FileCommitJob( - brokerId, - file, - uploadFuture, - time, - controlPlane, - storage, - metrics::fileCommitFinished, - metrics::fileCommitWaitFinished - ); - commitFuture = CompletableFuture.supplyAsync(commitJob, executorServiceCommit) - .whenComplete((result, error) -> { - totalFilesInProgress.addAndGet(-1); - totalBytesInProgress.addAndGet(-file.size()); - if (error != null) { - // at this point the commit has failed and need to check whether it failed on upload or commit - LOGGER.error("Failed to commit diskless file {}", file, error); - if (error.getCause() instanceof FileUploadException) { - metrics.fileUploadFailed(); - } else { - metrics.fileCommitFailed(); - } - } else { - // only mark as finished if everything succeeded - metrics.fileFinished(file.start(), uploadAndCommitStart); - } - }); - - final CacheStoreJob cacheStoreJob = new CacheStoreJob( - time, - objectCache, - keyAlignmentStrategy, - file.data(), - uploadFuture, - metrics::cacheStoreFinished - ); - executorServiceCacheStore.submit(cacheStoreJob); + commitFuture = commitNonEmptyFile(file, uploadAndCommitStart); } - commitFuture.whenComplete((commitBatchResponses, throwable) -> { - final AppendCompleter completerJob = new AppendCompleter(file, batchCoordinateCache); - if (commitBatchResponses != null) { - completerJob.finishCommitSuccessfully(commitBatchResponses); - metrics.writeCompleted(); - } else { - completerJob.finishCommitWithError(); - metrics.writeFailed(); - } - }); + + attachCompletionHandler(file, commitFuture); } finally { lock.unlock(); } } + private CompletableFuture> commitNonEmptyFile( + final ClosedFile file, + final Instant uploadAndCommitStart) { + + 
updateMetricsOnStart(file); + + final CompletableFuture uploadFuture = startUpload(file); + final FileCommitJob commitJob = createCommitJob(file); + final CacheStoreJob cacheStoreJob = createCacheStoreJob(file); + + return commitWithPipeline(file, uploadFuture, commitJob, cacheStoreJob, uploadAndCommitStart); + } + + private void updateMetricsOnStart(final ClosedFile file) { + metrics.fileAdded(file.size()); + metrics.batchesAdded(file.commitBatchRequests().size()); + totalFilesInProgress.addAndGet(1); + totalBytesInProgress.addAndGet(file.size()); + } + + private CompletableFuture startUpload(final ClosedFile file) { + final FileUploadJob uploadJob = FileUploadJob.createFromByteArray( + objectKeyCreator, storage, time, + maxFileUploadAttempts, fileUploadRetryBackoff, + file.data(), metrics::fileUploadFinished + ); + return CompletableFuture.supplyAsync( + () -> { + try { + return uploadJob.call(); + } catch (final Exception e) { + throw new FileUploadException(e); + } + }, + executorServiceUpload + ); + } + + private FileCommitJob createCommitJob(final ClosedFile file) { + return new FileCommitJob( + brokerId, file, time, controlPlane, storage, + metrics::fileCommitFinished, metrics::fileCommitWaitFinished + ); + } + + private CacheStoreJob createCacheStoreJob(final ClosedFile file) { + return new CacheStoreJob( + time, objectCache, keyAlignmentStrategy, + file.data(), metrics::cacheStoreFinished + ); + } + + /** + * Pipelined commit: callbacks triggered when upload completes, no thread blocking. + * Commits are chained to ensure submission order is preserved. + * Cache store is fire-and-forget (runs only on success). + */ + private CompletableFuture> commitWithPipeline( + final ClosedFile file, + final CompletableFuture uploadFuture, + final FileCommitJob commitJob, + final CacheStoreJob cacheStoreJob, + final Instant uploadAndCommitStart) { + + // Wait for previous commit to complete (success OR failure) before starting this one. 
+ // Using handle() ensures we don't propagate previous failures - each commit is independent. + // We only use previousCommitFuture for ordering, not for error propagation. + final CompletableFuture prevCommitBarrier = previousCommitFuture.handle((result, error) -> null); + + // Chain commit to run after upload completes AND previous commit completes + final CompletableFuture> commitFuture = uploadFuture + .thenCombine(prevCommitBarrier, (objectKey, ignored) -> objectKey) + .thenApplyAsync(objectKey -> { + // Reset the submit time now that we're ready to commit. + // This ensures FileCommitWaitTime measures only the executor queue wait, + // not the time waiting for upload completion or previous commits in the chain. + commitJob.markReadyToCommit(); + return commitJob.apply(objectKey); + }, executorServiceCommit) + .whenComplete((result, error) -> handleCommitResult(file, uploadAndCommitStart, error)); + + // Cache store is fire-and-forget, runs only on successful upload + uploadFuture.thenAcceptAsync(cacheStoreJob, executorServiceCacheStore); + + // Update chain for next commit + previousCommitFuture = commitFuture; + + return commitFuture; + } + + private void handleCommitResult(final ClosedFile file, final Instant uploadAndCommitStart, final Throwable error) { + totalFilesInProgress.addAndGet(-1); + totalBytesInProgress.addAndGet(-file.size()); + if (error != null) { + LOGGER.error("Failed to commit diskless file {}", file, error); + // Unwrap CompletionException if present to check for the actual cause + final Throwable cause = (error instanceof java.util.concurrent.CompletionException && error.getCause() != null) + ? 
error.getCause() + : error; + if (cause instanceof FileUploadException) { + metrics.fileUploadFailed(); + } else { + metrics.fileCommitFailed(); + } + } else { + metrics.fileFinished(file.start(), uploadAndCommitStart); + } + } + + private void attachCompletionHandler( + final ClosedFile file, + final CompletableFuture> commitFuture) { + commitFuture.whenComplete((commitBatchResponses, throwable) -> { + final AppendCompleter completerJob = new AppendCompleter(file, batchCoordinateCache); + if (commitBatchResponses != null) { + completerJob.finishCommitSuccessfully(commitBatchResponses); + metrics.writeCompleted(); + } else { + completerJob.finishCommitWithError(); + metrics.writeFailed(); + } + }); + } + int totalFilesInProgress() { return totalFilesInProgress.get(); } @@ -260,8 +324,30 @@ int totalBytesInProgress() { return totalBytesInProgress.get(); } + private static final long SHUTDOWN_TIMEOUT_SECONDS = 30; + @Override public void close() throws IOException { + // First, await any pending async work to ensure callbacks are scheduled before shutdown. + // This prevents RejectedExecutionException for in-flight uploads. + final CompletableFuture pendingWork; + lock.lock(); + try { + pendingWork = previousCommitFuture; + } finally { + lock.unlock(); + } + + try { + // Wait for pending work with a timeout to avoid indefinite blocking + pendingWork + .orTimeout(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .handle((result, error) -> null) // Ignore errors, we just want to wait + .join(); + } catch (final Exception e) { + LOGGER.warn("Timeout waiting for pending commits during shutdown", e); + } + // Reject new upload work immediately. executorServiceUpload.shutdown(); // Cache is best-effort; cancel immediately rather than waiting. 
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java new file mode 100644 index 00000000000..7317b6be5bb --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java @@ -0,0 +1,170 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.function.Consumer; + +import io.aiven.inkless.cache.FixedBlockAlignment; +import io.aiven.inkless.cache.KeyAlignmentStrategy; +import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.ObjectKey; +import io.aiven.inkless.common.PlainObjectKey; +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class CacheStoreJobTest { + + static final ObjectKey OBJECT_KEY = PlainObjectKey.create("prefix", "test-object"); + static final byte[] DATA = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + + @Mock + ObjectCache objectCache; + @Mock + Consumer cacheStoreDurationCallback; + @Captor + ArgumentCaptor cacheKeyCaptor; + @Captor + ArgumentCaptor fileExtentCaptor; + + Time time; + KeyAlignmentStrategy keyAlignmentStrategy; + + @BeforeEach + void setUp() { + time = new MockTime(); + // Use a block size larger than data to get a single cache key + keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE); + } + + @Test + void acceptStoresDataToCache() { + final CacheStoreJob job = new CacheStoreJob( + time, + objectCache, + keyAlignmentStrategy, + DATA, + cacheStoreDurationCallback + ); + + // Simulate successful upload 
completion (thenAcceptAsync only calls on success) + job.accept(OBJECT_KEY); + + // Verify cache.put was called + verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture()); + + // Verify the cache key + final CacheKey capturedKey = cacheKeyCaptor.getValue(); + assertThat(capturedKey.object()).isEqualTo(OBJECT_KEY.value()); + assertThat(capturedKey.range().offset()).isZero(); + assertThat(capturedKey.range().length()).isEqualTo(Integer.MAX_VALUE); + + // Verify the file extent contains the data + final FileExtent capturedExtent = fileExtentCaptor.getValue(); + assertThat(capturedExtent.object()).isEqualTo(OBJECT_KEY.value()); + assertThat(capturedExtent.data()).isEqualTo(DATA); + + // Verify duration callback was invoked + verify(cacheStoreDurationCallback).accept(any()); + } + + @Test + void acceptWithMultipleBlocksStoresAllBlocks() { + // Use a smaller block size to create multiple cache entries + final int blockSize = 4; + keyAlignmentStrategy = new FixedBlockAlignment(blockSize); + + final CacheStoreJob job = new CacheStoreJob( + time, + objectCache, + keyAlignmentStrategy, + DATA, // 10 bytes = 3 blocks (0-3, 4-7, 8-11) + cacheStoreDurationCallback + ); + + // Simulate successful upload completion + job.accept(OBJECT_KEY); + + // Verify cache.put was called 3 times (for each block) + verify(objectCache, times(3)).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture()); + + // Verify duration callback was invoked 3 times + verify(cacheStoreDurationCallback, times(3)).accept(any()); + } + + @Test + void acceptStoresCorrectDataRange() { + final CacheStoreJob job = new CacheStoreJob( + time, + objectCache, + keyAlignmentStrategy, + DATA, + cacheStoreDurationCallback + ); + + job.accept(OBJECT_KEY); + + verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture()); + + final FileExtent extent = fileExtentCaptor.getValue(); + assertThat(extent.range().offset()).isZero(); + 
assertThat(extent.range().length()).isEqualTo(DATA.length); + assertThat(extent.data()).hasSize(DATA.length); + } + + @Test + void acceptWithEmptyDataStoresEmptyExtent() { + final byte[] emptyData = new byte[0]; + + final CacheStoreJob job = new CacheStoreJob( + time, + objectCache, + keyAlignmentStrategy, + emptyData, + cacheStoreDurationCallback + ); + + job.accept(OBJECT_KEY); + + // With empty data and MAX_VALUE block size, alignment still produces one block + // but the extent will have empty data + verify(objectCache).put(cacheKeyCaptor.capture(), fileExtentCaptor.capture()); + + final FileExtent extent = fileExtentCaptor.getValue(); + assertThat(extent.data()).isEmpty(); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java index 9e0104a3112..347db1969ef 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java @@ -51,7 +51,6 @@ import io.aiven.inkless.control_plane.ControlPlaneException; import io.aiven.inkless.control_plane.InMemoryControlPlane; import io.aiven.inkless.storage_backend.common.ObjectDeleter; -import io.aiven.inkless.storage_backend.common.StorageBackendException; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; @@ -127,10 +126,10 @@ void commitFinishedSuccessfully() { when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L); final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA); - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); + final FileCommitJob job = new 
FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); - job.get(); + // Simulate successful upload completion via callback + job.apply(OBJECT_KEY); verify(commitWaitTimeDurationCallback).accept(eq(200L)); verify(commitTimeDurationCallback).accept(eq(10L)); @@ -153,72 +152,85 @@ void commitFinishedSuccessfullyZeroBatches() { when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L); final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA); - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); + final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); - job.get(); + // Simulate successful upload completion via callback + job.apply(OBJECT_KEY); verify(commitWaitTimeDurationCallback).accept(eq(200L)); verify(commitTimeDurationCallback).accept(eq(10L)); } - @Test - void commitFinishedWithError() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void deleteObjectWhenFailureOnCommitIsFromControlPlane(boolean isSafeToDelete) throws Exception { final Map>> awaitingFuturesByRequest = Map.of( 0, new CompletableFuture<>(), 1, new CompletableFuture<>() ); - when(time.milliseconds()).thenReturn(10_000L, 10_200L); - when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L); + when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS))) + .thenThrow(new ControlPlaneException("test")); + when(controlPlane.isSafeToDeleteFile(eq(OBJECT_KEY_MAIN_PART))).thenReturn(isSafeToDelete); final ClosedFile file = new 
ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA); - final CompletableFuture uploadFuture = CompletableFuture.failedFuture(new StorageBackendException("test")); - final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); + final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); - Assert.assertThrows(RuntimeException.class, job::get); + // Simulate successful upload but control plane commit failure + Assert.assertThrows(ControlPlaneException.class, () -> job.apply(OBJECT_KEY)); - verify(commitWaitTimeDurationCallback).accept(eq(200L)); - verify(commitTimeDurationCallback).accept(eq(10L)); + verify(objectDeleter, times(isSafeToDelete ? 1 : 0)).delete(eq(OBJECT_KEY)); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void deleteObjectWhenFailureOnCommitIsFromControlPlane(boolean isSafeToDelete) throws Exception { + @Test + void doNotDeleteObjectWhenFailureOnCommitIsNotFromControlPlane() throws Exception { final Map>> awaitingFuturesByRequest = Map.of( 0, new CompletableFuture<>(), 1, new CompletableFuture<>() ); when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS))) - .thenThrow(new ControlPlaneException("test")); - when(controlPlane.isSafeToDeleteFile(eq(OBJECT_KEY_MAIN_PART))).thenReturn(isSafeToDelete); + .thenThrow(new RuntimeException("test")); final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA); - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, 
commitTimeDurationCallback, commitWaitTimeDurationCallback); + final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); - Assert.assertThrows(RuntimeException.class, job::get); + // Simulate successful upload but commit failure (non-control-plane error) + Assert.assertThrows(RuntimeException.class, () -> job.apply(OBJECT_KEY)); - verify(objectDeleter, times(isSafeToDelete ? 1 : 0)).delete(eq(OBJECT_KEY)); + verify(objectDeleter, never()).delete(eq(OBJECT_KEY)); } @Test - void doNotDeleteObjectWhenFailureOnCommitIsNotFromControlPlane() throws Exception { + void markReadyToCommitResetsWaitTime() { final Map>> awaitingFuturesByRequest = Map.of( 0, new CompletableFuture<>(), 1, new CompletableFuture<>() ); + final List commitBatchResponses = List.of( + CommitBatchResponse.success(0, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(0)), + CommitBatchResponse.of(Errors.INVALID_TOPIC_EXCEPTION, -1, -1, -1), + CommitBatchResponse.success(20, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(2)), + CommitBatchResponse.success(30, 10, 0, "objectKey", COMMIT_BATCH_REQUESTS.get(3)) + ); + when(controlPlane.commitFile(eq(OBJECT_KEY_MAIN_PART), eq(ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT), eq(BROKER_ID), eq(FILE_SIZE), eq(COMMIT_BATCH_REQUESTS))) - .thenThrow(new RuntimeException("test")); + .thenReturn(commitBatchResponses); + // Job created at T=10000, markReadyToCommit at T=15000, onUploadComplete at T=15050 + when(time.milliseconds()).thenReturn(10_000L, 15_000L, 15_050L); + when(time.nanoseconds()).thenReturn(10_000_000L, 20_000_000L); final ClosedFile file = new ClosedFile(Instant.EPOCH, REQUESTS, awaitingFuturesByRequest, COMMIT_BATCH_REQUESTS, Map.of(), DATA); - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - final FileCommitJob job = new FileCommitJob(BROKER_ID, file, uploadFuture, time, controlPlane, objectDeleter, 
commitTimeDurationCallback, commitWaitTimeDurationCallback); + final FileCommitJob job = new FileCommitJob(BROKER_ID, file, time, controlPlane, objectDeleter, commitTimeDurationCallback, commitWaitTimeDurationCallback); - Assert.assertThrows(RuntimeException.class, job::get); + // Simulate async pipeline: reset submit time when actually ready to commit + job.markReadyToCommit(); - verify(objectDeleter, never()).delete(eq(OBJECT_KEY)); + // Simulate successful upload completion via callback + job.apply(OBJECT_KEY); + + // Wait time should be 50ms (from markReadyToCommit to apply), not 5050ms (from constructor) + verify(commitWaitTimeDurationCallback).accept(eq(50L)); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java index 567e9968acc..bd3612aa26c 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java @@ -23,10 +23,10 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -37,11 +37,13 @@ import java.io.InputStream; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.aiven.inkless.cache.BatchCoordinateCache; @@ -56,16 
+58,22 @@ import io.aiven.inkless.control_plane.CommitBatchRequest; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.ControlPlaneException; +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -108,30 +116,38 @@ public ObjectKey create(String value) { @Mock Time time; @Mock - ExecutorService executorServiceUpload; - @Mock - ExecutorService executorServiceCommit; - @Mock - ExecutorService executorServiceCacheStore; - @Mock FileCommitterMetrics metrics; - @Captor - ArgumentCaptor> uploadCallableCaptor; - @Captor - ArgumentCaptor commitRunnableCaptor; + // Use real executors that run tasks immediately for testing async flow + private ExecutorService executorServiceUpload; + private ExecutorService executorServiceCommit; + private ExecutorService executorServiceCacheStore; + + @BeforeEach + void setUp() { + // Single-threaded real executors for deterministic ordering in tests. + // Tasks still execute asynchronously on background threads, but one at a time per executor. 
+ executorServiceUpload = Executors.newSingleThreadExecutor(); + executorServiceCommit = Executors.newSingleThreadExecutor(); + executorServiceCacheStore = Executors.newSingleThreadExecutor(); + } + + @AfterEach + void tearDown() { + executorServiceUpload.shutdownNow(); + executorServiceCommit.shutdownNow(); + executorServiceCacheStore.shutdownNow(); + } @Test - @SuppressWarnings("unchecked") void success() throws Exception { doNothing() .when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length)); + when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) + .thenReturn(List.of()); when(time.nanoseconds()).thenReturn(10_000_000L); - - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - when(executorServiceUpload.submit(any(Callable.class))) - .thenReturn(uploadFuture); + when(time.milliseconds()).thenReturn(10L); final FileCommitter committer = new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, @@ -148,39 +164,24 @@ void success() throws Exception { committer.commit(FILE); - assertThat(committer.totalFilesInProgress()).isOne(); - assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length); - - verify(executorServiceUpload).submit(uploadCallableCaptor.capture()); - final Callable uploadCallable = uploadCallableCaptor.getValue(); - - uploadCallable.call(); - - verify(executorServiceCommit).execute(commitRunnableCaptor.capture()); - final Runnable commitRunnable = commitRunnableCaptor.getValue(); - - commitRunnable.run(); - - assertThat(committer.totalFilesInProgress()).isZero(); - assertThat(committer.totalBytesInProgress()).isZero(); - - verify(metrics).fileAdded(eq(FILE.size())); - verify(metrics).fileUploadFinished(eq(0L)); - verify(metrics).fileCommitFinished(eq(0L)); - verify(metrics).fileFinished(eq(Instant.EPOCH), eq(Instant.ofEpochMilli(10L))); + // Wait for async operations to complete - writeCompleted is the final callback + 
await().atMost(5, SECONDS).untilAsserted(() -> { + verify(metrics).fileAdded(eq(FILE.size())); + verify(metrics).fileUploadFinished(anyLong()); + verify(metrics).fileCommitFinished(anyLong()); + verify(metrics).fileCommitWaitFinished(anyLong()); + verify(metrics).fileFinished(eq(Instant.EPOCH), any()); + verify(metrics).writeCompleted(); + }); } @Test - @SuppressWarnings("unchecked") void commitFailed() throws Exception { doNothing() .when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length)); when(time.nanoseconds()).thenReturn(10_000_000L); - - final CompletableFuture uploadFuture = CompletableFuture.completedFuture(OBJECT_KEY); - when(executorServiceUpload.submit(any(Callable.class))) - .thenReturn(uploadFuture); + when(time.milliseconds()).thenReturn(10L); when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) .thenThrow(new ControlPlaneException("error")); @@ -197,46 +198,29 @@ void commitFailed() throws Exception { committer.commit(FILE); - assertThat(committer.totalFilesInProgress()).isOne(); - assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length); - - verify(executorServiceUpload).submit(uploadCallableCaptor.capture()); - final Callable uploadCallable = uploadCallableCaptor.getValue(); - - uploadCallable.call(); - - verify(executorServiceCommit).execute(commitRunnableCaptor.capture()); - final Runnable commitRunnable = commitRunnableCaptor.getValue(); - - commitRunnable.run(); - - assertThat(committer.totalFilesInProgress()).isZero(); - assertThat(committer.totalBytesInProgress()).isZero(); - - verify(metrics).fileAdded(eq(FILE.size())); - verify(metrics).fileUploadFinished(eq(0L)); - verify(metrics).fileCommitFinished(eq(0L)); - verify(metrics, times(0)).fileFinished(any(), any()); - verify(metrics).fileCommitFailed(); - verify(metrics).writeFailed(); + // Wait for async operations to complete - writeFailed is the final callback + await().atMost(5, SECONDS).untilAsserted(() -> { + 
verify(metrics).fileAdded(eq(FILE.size())); + verify(metrics).fileUploadFinished(anyLong()); + verify(metrics).fileCommitFinished(anyLong()); + verify(metrics, times(0)).fileFinished(any(), any()); + verify(metrics).fileCommitFailed(); + verify(metrics).writeFailed(); + }); } @Test - @SuppressWarnings("unchecked") void uploadFailed() throws Exception { - doNothing() + doThrow(new StorageBackendException("test")) .when(storage).upload(eq(OBJECT_KEY), any(InputStream.class), eq((long) FILE.data().length)); when(time.nanoseconds()).thenReturn(10_000_000L); - - final CompletableFuture uploadFuture = CompletableFuture.failedFuture(new StorageBackendException("test")); - when(executorServiceUpload.submit(any(Callable.class))) - .thenReturn(uploadFuture); + when(time.milliseconds()).thenReturn(10L); final FileCommitter committer = new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 3, Duration.ofMillis(100), + 1, Duration.ofMillis(100), executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); @@ -245,59 +229,57 @@ void uploadFailed() throws Exception { committer.commit(FILE); - assertThat(committer.totalFilesInProgress()).isOne(); - assertThat(committer.totalBytesInProgress()).isEqualTo(FILE.data().length); - - verify(executorServiceUpload).submit(uploadCallableCaptor.capture()); - final Callable uploadCallable = uploadCallableCaptor.getValue(); - - uploadCallable.call(); - - verify(executorServiceCommit).execute(commitRunnableCaptor.capture()); - final Runnable commitRunnable = commitRunnableCaptor.getValue(); - - commitRunnable.run(); - - assertThat(committer.totalFilesInProgress()).isZero(); - assertThat(committer.totalBytesInProgress()).isZero(); - - verify(metrics).fileAdded(eq(FILE.size())); - verify(metrics).fileUploadFinished(eq(0L)); - verify(metrics).fileCommitFinished(eq(0L)); - verify(metrics, times(0)).fileFinished(any(), any()); - 
verify(metrics).fileUploadFailed(); - verify(metrics).writeFailed(); + // Wait for async operations to complete - writeFailed is the final callback + await().atMost(5, SECONDS).untilAsserted(() -> { + verify(metrics).fileAdded(eq(FILE.size())); + verify(metrics).fileUploadFinished(anyLong()); + // fileCommitFinished is NOT called when upload fails because thenApplyAsync + // doesn't run the commit job on failure + verify(metrics, times(0)).fileCommitFinished(anyLong()); + verify(metrics, times(0)).fileFinished(any(), any()); + verify(metrics).fileUploadFailed(); + verify(metrics).writeFailed(); + }); } @Test void closeGraceful() throws IOException, InterruptedException { - when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS)).thenReturn(true); + // Use mock executors for this test since we're testing shutdown behavior + ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class); + + when(mockCommit.awaitTermination(30, TimeUnit.SECONDS)).thenReturn(true); final FileCommitter committer = new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + mockUpload, mockCommit, mockCacheStore, metrics); committer.close(); // Upload pool rejects new work immediately. - verify(executorServiceUpload).shutdown(); + verify(mockUpload).shutdown(); // Cache is best-effort, cancelled immediately. - verify(executorServiceCacheStore).shutdownNow(); + verify(mockCacheStore).shutdownNow(); // Commits are awaited (they internally wait for their paired uploads). 
- verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS); + verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS); // Graceful termination: no force-shutdown needed on commit pool. - verify(executorServiceCommit, never()).shutdownNow(); + verify(mockCommit, never()).shutdownNow(); // Remaining uploads with no queued commit are force-stopped. - verify(executorServiceUpload).shutdownNow(); + verify(mockUpload).shutdownNow(); verify(metrics).close(); } @Test void closeCommitPoolTimesOutThenTerminates() throws IOException, InterruptedException { + ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class); + // First await returns false (timeout), after shutdownNow second await succeeds. - when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS)) + when(mockCommit.awaitTermination(30, TimeUnit.SECONDS)) .thenReturn(false) .thenReturn(true); @@ -305,25 +287,29 @@ void closeCommitPoolTimesOutThenTerminates() throws IOException, InterruptedExce BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + mockUpload, mockCommit, mockCacheStore, metrics); committer.close(); - final InOrder commitOrder = inOrder(executorServiceCommit); + final InOrder commitOrder = inOrder(mockCommit); // Commit pool: shutdown → await → timeout → shutdownNow → await again. 
- commitOrder.verify(executorServiceCommit).shutdown(); - commitOrder.verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS); - commitOrder.verify(executorServiceCommit).shutdownNow(); - commitOrder.verify(executorServiceCommit).awaitTermination(30, TimeUnit.SECONDS); + commitOrder.verify(mockCommit).shutdown(); + commitOrder.verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS); + commitOrder.verify(mockCommit).shutdownNow(); + commitOrder.verify(mockCommit).awaitTermination(30, TimeUnit.SECONDS); - verify(executorServiceUpload).shutdownNow(); + verify(mockUpload).shutdownNow(); verify(metrics).close(); } @Test void closeCommitPoolNeverTerminates() throws IOException, InterruptedException { + ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class); + // Both awaits return false — pool never terminates. - when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS)) + when(mockCommit.awaitTermination(30, TimeUnit.SECONDS)) .thenReturn(false) .thenReturn(false); @@ -331,33 +317,37 @@ void closeCommitPoolNeverTerminates() throws IOException, InterruptedException { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + mockUpload, mockCommit, mockCacheStore, metrics); committer.close(); - verify(executorServiceCommit).shutdownNow(); + verify(mockCommit).shutdownNow(); // Cleanup still completes despite commit pool not terminating. 
- verify(executorServiceUpload).shutdownNow(); + verify(mockUpload).shutdownNow(); verify(metrics).close(); } @Test void closeInterruptedDuringAwait() throws IOException, InterruptedException { - when(executorServiceCommit.awaitTermination(30, TimeUnit.SECONDS)) + ExecutorService mockUpload = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCommit = org.mockito.Mockito.mock(ExecutorService.class); + ExecutorService mockCacheStore = org.mockito.Mockito.mock(ExecutorService.class); + + when(mockCommit.awaitTermination(30, TimeUnit.SECONDS)) .thenThrow(new InterruptedException("shutdown interrupted")); final FileCommitter committer = new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + mockUpload, mockCommit, mockCacheStore, metrics); committer.close(); // On interrupt: commit pool is force-shutdown. - verify(executorServiceCommit).shutdownNow(); + verify(mockCommit).shutdownNow(); // Rest of cleanup still runs. - verify(executorServiceUpload).shutdownNow(); + verify(mockUpload).shutdownNow(); verify(metrics).close(); // Interrupt flag is preserved. assertThat(Thread.interrupted()).isTrue(); @@ -479,4 +469,305 @@ void commitNull() { .isInstanceOf(NullPointerException.class) .hasMessage("file cannot be null"); } + + @Test + void commitsInSubmissionOrder() throws Exception { + // Commits are guaranteed to happen in submission order even if upload2 completes before upload1. + // This is critical for correct offset assignment. 
+ + final CountDownLatch upload1Started = new CountDownLatch(1); + final CountDownLatch upload2Completed = new CountDownLatch(1); + final List commitOrder = new ArrayList<>(); + + // File 1 upload will wait until file 2 upload completes + doAnswer(invocation -> { + upload1Started.countDown(); + upload2Completed.await(); + Thread.sleep(50); // Ensure upload2 definitely completes first + return null; + }).doAnswer(invocation -> { + upload2Completed.countDown(); + return null; + }).when(storage).upload(any(), any(InputStream.class), anyLong()); + + // Track commit order by capturing the requestId from the CommitBatchRequest + when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) + .thenAnswer(invocation -> { + @SuppressWarnings("unchecked") + List requests = (List) invocation.getArgument(4); + synchronized (commitOrder) { + // Record which file committed (by its requestId) + commitOrder.add(requests.get(0).requestId()); + } + return List.of(); + }); + + when(time.nanoseconds()).thenReturn(10_000_000L); + when(time.milliseconds()).thenReturn(10L); + + ExecutorService multiThreadUpload = Executors.newFixedThreadPool(2); + + try { + final FileCommitter committer = new FileCommitter( + BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, + KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, + 1, Duration.ofMillis(100), + multiThreadUpload, executorServiceCommit, executorServiceCacheStore, + metrics); + + final ClosedFile file1 = new ClosedFile( + Instant.EPOCH, + Map.of(1, Map.of(TID0P0, MemoryRecords.EMPTY)), + Map.of(1, new CompletableFuture<>()), + List.of(CommitBatchRequest.of(1, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)), + Map.of(), + new byte[10]); + + final ClosedFile file2 = new ClosedFile( + Instant.EPOCH, + Map.of(2, Map.of(TID0P0, MemoryRecords.EMPTY)), + Map.of(2, new CompletableFuture<>()), + List.of(CommitBatchRequest.of(2, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)), + Map.of(), + new byte[10]); + + 
committer.commit(file1); + upload1Started.await(); + committer.commit(file2); + + await().atMost(5, SECONDS).untilAsserted(() -> { + synchronized (commitOrder) { + assertThat(commitOrder).hasSize(2); + } + }); + + // Verify submission order is maintained + synchronized (commitOrder) { + assertThat(commitOrder).containsExactly(1, 2); + } + } finally { + multiThreadUpload.shutdownNow(); + } + } + + @Test + void failedCommitDoesNotBlockSubsequentCommits() throws Exception { + // Verify that if the first commit fails (e.g., control plane error), + // the second commit can still succeed. This ensures error isolation. + + final CountDownLatch firstCommitFailed = new CountDownLatch(1); + final CountDownLatch secondCommitStarted = new CountDownLatch(1); + + doNothing().when(storage).upload(any(), any(InputStream.class), anyLong()); + + // First commit fails with control plane exception, second succeeds + when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) + .thenAnswer(invocation -> { + firstCommitFailed.countDown(); + throw new ControlPlaneException("first commit error"); + }) + .thenAnswer(invocation -> { + secondCommitStarted.countDown(); + return List.of(); + }); + + when(time.nanoseconds()).thenReturn(10_000_000L); + when(time.milliseconds()).thenReturn(10L); + + final FileCommitter committer = new FileCommitter( + BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, + KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, + 1, Duration.ofMillis(100), + executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + metrics); + + final ClosedFile file1 = new ClosedFile( + Instant.EPOCH, + Map.of(1, Map.of(TID0P0, MemoryRecords.EMPTY)), + Map.of(1, new CompletableFuture<>()), + List.of(CommitBatchRequest.of(1, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)), + Map.of(), + new byte[10]); + + final ClosedFile file2 = new ClosedFile( + Instant.EPOCH, + Map.of(2, Map.of(TID0P0, MemoryRecords.EMPTY)), + Map.of(2, new 
CompletableFuture<>()), + List.of(CommitBatchRequest.of(2, TID0P0, 0, 0, 0, 0, 0, TimestampType.CREATE_TIME)), + Map.of(), + new byte[10]); + + committer.commit(file1); + committer.commit(file2); + + // Wait for both to complete + await().atMost(5, SECONDS).untilAsserted(() -> { + verify(metrics).writeFailed(); // first commit failed + verify(metrics).writeCompleted(); // second commit succeeded + }); + + // Verify the second commit was actually attempted + assertThat(secondCommitStarted.await(1, SECONDS)).isTrue(); + } + + @Test + void asyncModeCloseWhileUploadInFlightStillCommits() throws Exception { + // Verify that in async mode, calling close() while an upload is still in-flight + // does not cause the commit to be rejected - the close() method awaits pending work. + + final CountDownLatch uploadStarted = new CountDownLatch(1); + final CountDownLatch closeStarted = new CountDownLatch(1); + final CountDownLatch commitCompleted = new CountDownLatch(1); + + // Upload will block until we signal it to proceed after close() starts + doAnswer(invocation -> { + uploadStarted.countDown(); + // Wait for close() to be called + closeStarted.await(); + // Give close() a moment to start waiting for pending futures + Thread.sleep(100); + return null; + }).when(storage).upload(any(), any(InputStream.class), anyLong()); + + when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) + .thenAnswer(invocation -> { + commitCompleted.countDown(); + return List.of(); + }); + + when(time.nanoseconds()).thenReturn(10_000_000L); + when(time.milliseconds()).thenReturn(10L); + + final FileCommitter committer = new FileCommitter( + BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, + KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, + 1, Duration.ofMillis(100), + executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + metrics); // async mode + + // Submit a file for commit + committer.commit(FILE); + + // Wait for upload to start + 
uploadStarted.await(); + + // Start close() on a separate thread - it should wait for the pending commit + CompletableFuture closeFuture = CompletableFuture.runAsync(() -> { + closeStarted.countDown(); + try { + committer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // Wait for close to complete - it should wait for the commit + closeFuture.get(10, SECONDS); + + // Verify the commit actually happened despite close() being called during upload + assertThat(commitCompleted.await(1, SECONDS)).isTrue(); + verify(controlPlane).commitFile(any(), any(), anyInt(), anyLong(), any()); + verify(metrics).writeCompleted(); + } + + @Test + void cacheStoreRunsAfterUploadCompletesNotAfterCommit() throws Exception { + // Verify that cache store is triggered by upload completion, not commit completion. + // This ensures cache store starts as soon as upload is done, without waiting for + // previous commits to finish. + + final CountDownLatch upload1Completed = new CountDownLatch(1); + final CountDownLatch cacheStore1Started = new CountDownLatch(1); + final CountDownLatch commit1Started = new CountDownLatch(1); + final CountDownLatch allowCommit1ToComplete = new CountDownLatch(1); + + // Track whether cache store started before commit completed + final java.util.concurrent.atomic.AtomicBoolean cacheStoreStartedBeforeCommitCompleted = + new java.util.concurrent.atomic.AtomicBoolean(false); + + doAnswer(invocation -> { + upload1Completed.countDown(); + return null; + }).when(storage).upload(any(), any(InputStream.class), anyLong()); + + when(controlPlane.commitFile(any(), any(), anyInt(), anyLong(), any())) + .thenAnswer(invocation -> { + commit1Started.countDown(); + // Block the commit + allowCommit1ToComplete.await(); + return List.of(); + }); + + when(time.nanoseconds()).thenReturn(10_000_000L); + when(time.milliseconds()).thenReturn(10L); + + // Use a custom ObjectCache that tracks when put is called + final ObjectCache trackingCache = new 
ObjectCache() { + @Override + public FileExtent get(CacheKey key) { + return null; + } + + @Override + public void put(CacheKey key, FileExtent value) { + // Check if commit has completed yet + if (allowCommit1ToComplete.getCount() > 0) { + // Commit hasn't been signaled to complete, so cache store started before commit finished + cacheStoreStartedBeforeCommitCompleted.set(true); + } + cacheStore1Started.countDown(); + } + + @Override + public CompletableFuture computeIfAbsent( + CacheKey key, java.util.function.Function load, java.util.concurrent.Executor loadExecutor) { + return CompletableFuture.supplyAsync(() -> load.apply(key), loadExecutor); + } + + @Override + public boolean remove(CacheKey key) { + return false; + } + + @Override + public long size() { + return 0; + } + + @Override + public void close() { + } + }; + + final FileCommitter committer = new FileCommitter( + BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, + KEY_ALIGNMENT_STRATEGY, trackingCache, BATCH_COORDINATE_CACHE, time, + 1, Duration.ofMillis(100), + executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + metrics); + + committer.commit(FILE); + + // Wait for upload to complete + assertThat(upload1Completed.await(5, SECONDS)).isTrue(); + + // Cache store should start after upload, even while commit is blocked + assertThat(cacheStore1Started.await(5, SECONDS)) + .as("Cache store should start after upload completes") + .isTrue(); + + // Verify cache store started while commit was still blocked + assertThat(cacheStoreStartedBeforeCommitCompleted.get()) + .as("Cache store should run in parallel with commit, not after it") + .isTrue(); + + // Allow commit to complete + allowCommit1ToComplete.countDown(); + + // Wait for completion + await().atMost(5, SECONDS).untilAsserted(() -> { + verify(metrics).writeCompleted(); + }); + } + } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java 
b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java index 40ea5af8ae8..a296c934a25 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java @@ -203,28 +203,14 @@ void test(final int requestCount, Arbitraries.longs().between(uploadDurationAvg - 5, uploadDurationAvg + 5)) ); final CommitterHandler committerHandler = new CommitterHandler( - uploaderHandler, - new MockExecutorServiceWithFutureSupport(), - new Timer("commit", - time, - Instant.ofEpochMilli(time.milliseconds()), - Arbitraries.longs().between(commitDurationAvg - 2, commitDurationAvg + 2)) + new MockExecutorServiceWithFutureSupport() ); final CompleterHandler completerHandler = new CompleterHandler( committerHandler, - new MockExecutorServiceWithFutureSupport(), - new Timer("complete", - time, - Instant.ofEpochMilli(time.milliseconds()), - Arbitraries.longs().between(completeDurationAvg - 2, completeDurationAvg + 2)) + new MockExecutorService() ); final CacheStoreHandler cacheStoreHandler = new CacheStoreHandler( - uploaderHandler, - new MockExecutorServiceWithFutureSupport(), - new Timer("cacheStore", - time, - Instant.ofEpochMilli(time.milliseconds()), - Arbitraries.longs().between(cacheStoreDurationAvg - 2, cacheStoreDurationAvg + 2)) + new MockExecutorService() ); try(final FileCommitter fileCommitter = new FileCommitter( 11, @@ -281,9 +267,17 @@ void test(final int requestCount, requester.handleFinishedRequests(); commitTicker.maybeTick(); uploaderHandler.maybeRunNext(); + // In async mode, upload callbacks add commit/cache tasks. + // Process them immediately without waiting for timer. 
committerHandler.maybeRunNext(); + while (!committerHandler.executorService.queue.isEmpty()) { + committerHandler.executorService.runNextIfExists(); + } completerHandler.maybeRunNext(); cacheStoreHandler.maybeRunNext(); + while (!cacheStoreHandler.executorService.queue.isEmpty()) { + cacheStoreHandler.executorService.runNextIfExists(); + } time.sleep(1); } assertThat(finished).withFailMessage(String.format("Not finished in %d virtual ms", maxTime)).isTrue(); @@ -574,10 +568,11 @@ public Future submit(final Callable task) { @Override boolean runNextIfExists() throws InterruptedException { - assertThat(returnedFutures.size()).isEqualTo(queue.size()); + // In async mode, callbacks can add tasks during execution, making sizes unbalanced final boolean result = super.runNextIfExists(); if (result) { - assert returnedFutures.take().isDone(); + final Future completed = returnedFutures.take(); + assertThat(completed.isDone()).isTrue(); } return result; } @@ -607,32 +602,20 @@ boolean oldestFutureIsDone() { } private static class CommitterHandler { - private final UploaderHandler uploaderHandler; private final MockExecutorServiceWithFutureSupport executorService; - private final Timer timer; - private CommitterHandler(final UploaderHandler uploaderHandler, - final MockExecutorServiceWithFutureSupport executorService, - final Timer timer) { - this.uploaderHandler = uploaderHandler; + private CommitterHandler(final MockExecutorServiceWithFutureSupport executorService) { this.executorService = executorService; - this.timer = timer; } void maybeRunNext() throws InterruptedException { - if (!timer.happensNow()) { - return; - } - if (!uploaderHandler.oldestFutureIsDone()) { - // Otherwise it'd block indefinitely. 
- return; - } + // In async mode, run immediately - no timer delay since tasks are + // only queued via callbacks when uploads complete executorService.runNextIfExists(); } boolean oldestFutureIsDone() { - return uploaderHandler.oldestFutureIsDone() - && Optional.ofNullable(executorService.returnedFutures.peek()) + return Optional.ofNullable(executorService.returnedFutures.peek()) .map(Future::isDone) .orElse(true); } @@ -641,22 +624,15 @@ boolean oldestFutureIsDone() { private static class CompleterHandler { private final CommitterHandler committerHandler; private final MockExecutorService executorService; - private final Timer timer; private CompleterHandler(final CommitterHandler committerHandler, - final MockExecutorService executorService, - final Timer timer) { + final MockExecutorService executorService) { this.committerHandler = committerHandler; this.executorService = executorService; - this.timer = timer; } void maybeRunNext() throws InterruptedException { - if (!timer.happensNow()) { - return; - } if (!committerHandler.oldestFutureIsDone()) { - // Otherwise it'd block indefinitely. return; } executorService.runNextIfExists(); @@ -664,26 +640,14 @@ void maybeRunNext() throws InterruptedException { } private static class CacheStoreHandler { - private final UploaderHandler uploaderHandler; private final MockExecutorService executorService; - private final Timer timer; - private CacheStoreHandler(final UploaderHandler uploaderHandler, - final MockExecutorService executorService, - final Timer timer) { - this.uploaderHandler = uploaderHandler; + private CacheStoreHandler(final MockExecutorService executorService) { this.executorService = executorService; - this.timer = timer; } void maybeRunNext() throws InterruptedException { - if (!timer.happensNow()) { - return; - } - if (!uploaderHandler.oldestFutureIsDone()) { - // Otherwise it'd block indefinitely. - return; - } + // In async mode, run immediately - no timer delay executorService.runNextIfExists(); } }