diff --git a/docs/inkless/configs.rst b/docs/inkless/configs.rst index 5c5e9b487c8..86de9ed18bf 100644 --- a/docs/inkless/configs.rst +++ b/docs/inkless/configs.rst @@ -71,8 +71,23 @@ Under ``inkless.`` * Valid Values: [1,...] * Importance: medium +``produce.pipelined.enabled`` + Whether to use the pipelined writer instead of the lock-based writer. The pipelined writer uses a SEDA architecture to eliminate lock contention: validation is parallelized across N worker threads, and buffer writing is handled by a single dedicated thread. This eliminates the global writer lock bottleneck. + + * Type: boolean + * Default: false + * Importance: medium + +``produce.pipelined.validation.threads`` + Number of validation worker threads for the pipelined writer. These threads perform CPU-intensive validation work (CRC validation, size checks, offset assignment) in parallel. A value of 0 means auto-detect (uses available processors). + + * Type: int + * Default: 0 + * Valid Values: [0,...] + * Importance: medium + ``produce.upload.backoff.ms`` - The number of millisecond to back off for before the next upload attempt. + The number of milliseconds to back off for before the next upload attempt. * Type: int * Default: 10 diff --git a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java index 12f08fc7f86..704db7e1b44 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java @@ -64,9 +64,24 @@ public class InklessConfig extends AbstractConfig { private static final int PRODUCE_MAX_UPLOAD_ATTEMPTS_DEFAULT = 3; public static final String PRODUCE_UPLOAD_BACKOFF_MS_CONFIG = PRODUCE_PREFIX + "upload.backoff.ms"; - private static final String PRODUCE_UPLOAD_BACKOFF_MS_DOC = "The number of millisecond to back off for before the next upload attempt."; + private static final String PRODUCE_UPLOAD_BACKOFF_MS_DOC = "The number of milliseconds to back off for before the next upload attempt."; private static final int PRODUCE_UPLOAD_BACKOFF_MS_DEFAULT = 10; + public static final String PRODUCE_PIPELINED_PREFIX = PRODUCE_PREFIX + "pipelined."; + + public static final String PRODUCE_PIPELINED_ENABLED_CONFIG = PRODUCE_PIPELINED_PREFIX + "enabled"; + private static final String PRODUCE_PIPELINED_ENABLED_DOC = "Whether to use the pipelined writer instead of the lock-based writer. " + + "The pipelined writer uses a SEDA architecture to eliminate lock contention: " + + "validation is parallelized across N worker threads, and buffer writing is handled by a single dedicated thread. " + + "This eliminates the global writer lock bottleneck."; + private static final boolean PRODUCE_PIPELINED_ENABLED_DEFAULT = false; + + public static final String PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG = PRODUCE_PIPELINED_PREFIX + "validation.threads"; + private static final String PRODUCE_PIPELINED_VALIDATION_THREADS_DOC = "Number of validation worker threads for the pipelined writer. " + + "These threads perform CPU-intensive validation work (CRC validation, size checks, offset assignment) in parallel. " + + "A value of 0 means auto-detect (uses available processors)."; + private static final int PRODUCE_PIPELINED_VALIDATION_THREADS_DEFAULT = 0; + public static final String STORAGE_PREFIX = "storage."; public static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class"; @@ -261,6 +276,23 @@ public static ConfigDef configDef() { PRODUCE_UPLOAD_BACKOFF_MS_DOC ); + configDef.define( + PRODUCE_PIPELINED_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + PRODUCE_PIPELINED_ENABLED_DEFAULT, + ConfigDef.Importance.MEDIUM, + PRODUCE_PIPELINED_ENABLED_DOC + ); + + configDef.define( + PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG, + ConfigDef.Type.INT, + PRODUCE_PIPELINED_VALIDATION_THREADS_DEFAULT, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, + PRODUCE_PIPELINED_VALIDATION_THREADS_DOC + ); + configDef.define( STORAGE_BACKEND_CLASS_CONFIG, ConfigDef.Type.CLASS, @@ -544,6 +576,14 @@ public Duration produceUploadBackoff() { return Duration.ofMillis(getInt(PRODUCE_UPLOAD_BACKOFF_MS_CONFIG)); } + public boolean producePipelinedEnabled() { + return getBoolean(PRODUCE_PIPELINED_ENABLED_CONFIG); + } + + public int producePipelinedValidationThreads() { + return getInt(PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG); + } + public int fetchCacheBlockBytes() { return getInt(CONSUME_CACHE_BLOCK_BYTES_CONFIG); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java index 1f02f23a61d..f2c1ac2f72c 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; @@ -189,6 +190,59 @@ int size() { return buffer.totalSize(); } + /** + * Adds a pre-validated batch directly to the buffer. + * + *

This method is used by {@link PipelinedWriter} where validation is done + * in a separate stage (validation workers) before reaching the buffer writer. + * The batch is assumed to be already validated, so this method only adds it + * to the buffer without re-validation. + * + * @param topicIdPartition the partition for the batch + * @param recordBatch the pre-validated record batch + * @param requestId the request ID for tracking + */ + void addBatchDirect( + final TopicIdPartition topicIdPartition, + final MutableRecordBatch recordBatch, + final int requestId + ) { + if (start == null) { + start = TimeUtils.durationMeasurementNow(time); + } + buffer.addBatch(topicIdPartition, recordBatch, requestId); + } + + /** + * Registers a future to be completed when the file is committed. + * + *

This method is used by {@link PipelinedWriter} to store the result future + * along with the original request data. The future will be completed by + * {@link AppendCompleter} when the file commit succeeds or fails. + * + * @param requestId the request ID + * @param resultFuture the future to complete with the partition responses + * @param originalRecords the original records from the produce request + * @param invalidBatches the batches that failed validation (already completed with errors) + */ + void addAwaitingFuture( + final int requestId, + final CompletableFuture> resultFuture, + final Map originalRecords, + final Map invalidBatches + ) { + // Request IDs are monotonically increasing from the single-threaded buffer writer + if (requestId <= this.requestId) { + LOGGER.warn("Unexpected request ID order: received {} but current is {}", requestId, this.requestId); + } + this.requestId = requestId; + originalRequests.put(requestId, originalRecords); + awaitingFuturesByRequest.put(requestId, resultFuture); + if (!invalidBatches.isEmpty()) { + invalidBatchesByRequest.put(requestId, invalidBatches); + } + } + ClosedFile close() { final BatchBuffer.CloseResult closeResult = buffer.close(); return new ClosedFile( diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java index c54670709eb..1ff84014c46 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java @@ -45,14 +45,42 @@ public class AppendHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(AppendHandler.class); private final SharedState state; - private final Writer writer; + private final ProduceWriter writer; @DoNotMutate @CoverageIgnore public AppendHandler(final SharedState state) { - this( - state, - new Writer( + this(state, createWriter(state)); + } + + @DoNotMutate + @CoverageIgnore + private static ProduceWriter createWriter(final SharedState state) { + if (state.config().producePipelinedEnabled()) { + LOGGER.info("Using PipelinedWriter with {} validation workers", + state.config().producePipelinedValidationThreads() > 0 + ? state.config().producePipelinedValidationThreads() + : "auto-detected"); + return new PipelinedWriter( + state.time(), + state.brokerId(), + state.objectKeyCreator(), + state.buildStorage(), + state.keyAlignmentStrategy(), + state.cache(), + state.batchCoordinateCache(), + state.controlPlane(), + state.config().commitInterval(), + state.config().produceBufferMaxBytes(), + state.config().produceMaxUploadAttempts(), + state.config().produceUploadBackoff(), + state.config().produceUploadThreadPoolSize(), + state.brokerTopicStats(), + state.config().producePipelinedValidationThreads() + ); + } else { + LOGGER.info("Using lock-based Writer"); + return new Writer( state.time(), state.brokerId(), state.objectKeyCreator(), @@ -67,13 +95,13 @@ public AppendHandler(final SharedState state) { state.config().produceUploadBackoff(), state.config().produceUploadThreadPoolSize(), state.brokerTopicStats() - ) - ); + ); + } } // Visible for tests AppendHandler(final SharedState state, - final Writer writer) { + final ProduceWriter writer) { this.state = state; this.writer = writer; } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/PipelinedWriter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/PipelinedWriter.java new file mode 100644 index 00000000000..9dd9566b27d --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/PipelinedWriter.java @@ -0,0 +1,727 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.RequestLocal; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.LogAppendInfo; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogValidator; +import org.apache.kafka.storage.internals.log.RecordValidationException; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import com.groupcdg.pitest.annotations.DoNotMutate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.aiven.inkless.TimeUtils; +import io.aiven.inkless.cache.BatchCoordinateCache; +import io.aiven.inkless.cache.KeyAlignmentStrategy; +import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.InklessThreadFactory; +import io.aiven.inkless.common.ObjectKeyCreator; +import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.storage_backend.common.StorageBackend; + +import static org.apache.kafka.storage.internals.log.UnifiedLog.UNKNOWN_OFFSET; +import static org.apache.kafka.storage.internals.log.UnifiedLog.newValidatorMetricsRecorder; + +/** + * Pipelined writer implementation using staged event-driven architecture (SEDA). + * + *

Architecture

+ *
+ * Request Handler Threads (Kafka network threads)
+ *           │
+ *           ▼
+ * ┌─────────────────────────────────────┐
+ * │     Stage 1: Validation Workers     │  ← N workers (CPU-bound, parallel)
+ * │  • CRC validation                   │
+ * │  • Size checks                      │
+ * │  • Offset assignment                │
+ * └─────────────────────────────────────┘
+ *           │
+ *           ▼
+ * ┌─────────────────────────────────────┐
+ * │        Buffer Queue (bounded)       │  ← Backpressure point
+ * └─────────────────────────────────────┘
+ *           │
+ *           ▼
+ * ┌─────────────────────────────────────┐
+ * │   Stage 2: Single Buffer Writer     │  ← 1 thread (NO LOCK needed!)
+ * │  • buffer.addBatch()                │
+ * │  • Rotation management              │
+ * │  • Complete futures                 │
+ * └─────────────────────────────────────┘
+ * 
+ * + *

Benefits

+ * + */ +class PipelinedWriter implements ProduceWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(PipelinedWriter.class); + + /** + * Queue capacity for validated requests waiting for buffer writer. + * Provides ~1000 requests of buffering before backpressure kicks in. + */ + private static final int BUFFER_QUEUE_CAPACITY = 1000; + + /** + * Maximum requests to drain from queue in one batch. + * Balances latency vs throughput. + */ + private static final int MAX_BATCH_DRAIN_SIZE = 100; + + /** + * Poll timeout for buffer writer when queue is empty. + */ + private static final long BUFFER_WRITER_POLL_TIMEOUT_MS = 50; + + // Stage 1: Validation worker pool + private final ExecutorService validationExecutor; + private final int validationWorkerCount; + + // Stage 2: Single buffer writer + private final ExecutorService bufferWriterExecutor; + private final BlockingQueue bufferQueue; + + // Buffer writer state (owned exclusively by buffer writer thread) + private final Time time; + private final Duration commitInterval; + private final int maxBufferSize; + private final StorageBackend storage; + private final FileCommitter fileCommitter; + private final WriterMetrics writerMetrics; + private final BrokerTopicStats brokerTopicStats; + + // Shared state + private final AtomicBoolean closed = new AtomicBoolean(false); + + // Tick scheduling (managed by buffer writer thread) + private final ScheduledExecutorService tickScheduler; + private ScheduledFuture scheduledTick; + + // Buffer writer thread-local state + private ActiveFile activeFile; + private Instant openedAt; + private int nextRequestId = 0; + + // Metrics recorder for validation + private final LogValidator.MetricsRecorder validatorMetricsRecorder; + + /** + * Holds the result of validation, used to transfer between validateRequest() and write(). + * This avoids creating a throwaway CompletableFuture in validateRequest(). + */ + private record ValidationResult( + Map validatedBatches, + Map invalidBatches + ) {} + + @DoNotMutate + PipelinedWriter( + final Time time, + final int brokerId, + final ObjectKeyCreator objectKeyCreator, + final StorageBackend storage, + final KeyAlignmentStrategy keyAlignmentStrategy, + final ObjectCache objectCache, + final BatchCoordinateCache batchCoordinateCache, + final ControlPlane controlPlane, + final Duration commitInterval, + final int maxBufferSize, + final int maxFileUploadAttempts, + final Duration fileUploadRetryBackoff, + final int fileUploaderThreadPoolSize, + final BrokerTopicStats brokerTopicStats, + final int validationWorkerCount + ) { + this( + time, + commitInterval, + maxBufferSize, + Executors.newScheduledThreadPool(1, new InklessThreadFactory("inkless-tick-scheduler-", true)), + storage, + new FileCommitter( + brokerId, controlPlane, objectKeyCreator, storage, + keyAlignmentStrategy, objectCache, batchCoordinateCache, time, + maxFileUploadAttempts, fileUploadRetryBackoff, + fileUploaderThreadPoolSize), + new WriterMetrics(time), + brokerTopicStats, + validationWorkerCount + ); + } + + // Visible for testing + PipelinedWriter( + final Time time, + final Duration commitInterval, + final int maxBufferSize, + final ScheduledExecutorService tickScheduler, + final StorageBackend storage, + final FileCommitter fileCommitter, + final WriterMetrics writerMetrics, + final BrokerTopicStats brokerTopicStats, + final int validationWorkerCount + ) { + this.time = Objects.requireNonNull(time, "time cannot be null"); + this.commitInterval = Objects.requireNonNull(commitInterval, "commitInterval cannot be null"); + if (maxBufferSize <= 0) { + throw new IllegalArgumentException("maxBufferSize must be positive"); + } + this.maxBufferSize = maxBufferSize; + this.tickScheduler = Objects.requireNonNull(tickScheduler, "tickScheduler cannot be null"); + this.storage = Objects.requireNonNull(storage, "storage cannot be null"); + this.fileCommitter = Objects.requireNonNull(fileCommitter, "fileCommitter cannot be null"); + this.writerMetrics = Objects.requireNonNull(writerMetrics, "writerMetrics cannot be null"); + this.brokerTopicStats = Objects.requireNonNull(brokerTopicStats, "brokerTopicStats cannot be null"); + this.validationWorkerCount = validationWorkerCount > 0 ? validationWorkerCount : Runtime.getRuntime().availableProcessors(); + + // Initialize validation worker pool + this.validationExecutor = Executors.newFixedThreadPool( + this.validationWorkerCount, + new InklessThreadFactory("inkless-validator-", true) + ); + + // Initialize buffer queue (bounded for backpressure) + this.bufferQueue = new ArrayBlockingQueue<>(BUFFER_QUEUE_CAPACITY); + + // Initialize single buffer writer thread + this.bufferWriterExecutor = Executors.newSingleThreadExecutor( + new InklessThreadFactory("inkless-buffer-writer-", false) + ); + + // Initialize ActiveFile on buffer writer thread + this.activeFile = new ActiveFile(time, this.brokerTopicStats); + + // Metrics recorder (thread-safe) + this.validatorMetricsRecorder = newValidatorMetricsRecorder(this.brokerTopicStats.allTopicsStats()); + + // Start buffer writer loop + bufferWriterExecutor.submit(this::bufferWriterLoop); + + LOGGER.info("PipelinedWriter started with {} validation workers", this.validationWorkerCount); + } + + /** + * Entry point for write requests from Kafka request handler threads. + * + *

This method submits the request to the validation stage and returns immediately + * with a future that will be completed when the request is fully processed. + */ + @Override + public CompletableFuture> write( + final Map entriesPerPartition, + final Map topicConfigs, + final RequestLocal requestLocal + ) { + Objects.requireNonNull(entriesPerPartition, "entriesPerPartition cannot be null"); + Objects.requireNonNull(topicConfigs, "topicConfigs cannot be null"); + Objects.requireNonNull(requestLocal, "requestLocal cannot be null"); + + if (entriesPerPartition.isEmpty()) { + throw new IllegalArgumentException("entriesPerPartition cannot be empty"); + } + + // Verify ALL requested topics have configs (fail if ANY is missing) + if (!entriesPerPartition.keySet().stream().map(TopicIdPartition::topic).distinct().allMatch(topicConfigs::containsKey)) { + throw new IllegalArgumentException("Configs are not including all the topics requested"); + } + + if (closed.get()) { + return CompletableFuture.failedFuture(new RuntimeException("Writer already closed")); + } + + CompletableFuture> resultFuture = new CompletableFuture<>(); + + // Submit to validation stage (async, non-blocking for caller) + // The exceptionally handler ensures resultFuture is completed even if runAsync submission fails + // (e.g., RejectedExecutionException if executor is shutting down) + CompletableFuture.runAsync(() -> { + try { + // Each validation worker must use its own RequestLocal instance to + // avoid sharing thread-confined state from the request handler thread. + RequestLocal validationRequestLocal = RequestLocal.withThreadConfinedCaching(); + + // ══════════════════════════════════════════════════════════════ + // STAGE 1: Validation (runs in parallel on validation workers) + // ══════════════════════════════════════════════════════════════ + ValidationResult validated = validateRequest( + entriesPerPartition, + topicConfigs, + validationRequestLocal + ); + + // Create the validated request with the future + ValidatedRequest request = new ValidatedRequest( + entriesPerPartition, + validated.validatedBatches(), + validated.invalidBatches(), + resultFuture + ); + + // Hand off to buffer writer stage (blocks if queue full = backpressure) + if (!bufferQueue.offer(request, 30, TimeUnit.SECONDS)) { + resultFuture.completeExceptionally( + new KafkaStorageException("Buffer queue full, request timed out")); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + resultFuture.completeExceptionally(e); + } catch (Exception e) { + LOGGER.error("Validation failed", e); + resultFuture.completeExceptionally(e); + } + }, validationExecutor).exceptionally(e -> { + // Handle case where runAsync itself fails (e.g., executor shutdown) + resultFuture.completeExceptionally(e); + return null; + }); + + return resultFuture; + } + + /** + * Stage 1: Validates all batches in a request. + * + *

This method performs CPU-intensive validation work: + *

    + *
  • CRC validation
  • + *
  • Size checks
  • + *
  • Offset assignment
  • + *
  • Timestamp validation
  • + *
+ * + *

This runs on validation worker threads, NOT under any lock. + * + * @return a ValidationResult containing validated and invalid batches + */ + private ValidationResult validateRequest( + Map entriesPerPartition, + Map topicConfigs, + RequestLocal requestLocal + ) { + Map validatedBatches = new HashMap<>(); + Map invalidBatches = new HashMap<>(); + + for (var entry : entriesPerPartition.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + MemoryRecords records = entry.getValue(); + LogConfig config = topicConfigs.get(topicIdPartition.topic()); + + if (config == null) { + throw new IllegalArgumentException("Config not provided for topic " + topicIdPartition.topic()); + } + + try { + // Mark metrics + brokerTopicStats.topicStats(topicIdPartition.topic()).totalProduceRequestRate().mark(); + brokerTopicStats.allTopicsStats().totalProduceRequestRate().mark(); + + // ═══════════════════════════════════════════════════════════ + // CPU-intensive validation work (NO LOCK!) + // ═══════════════════════════════════════════════════════════ + LogAppendInfo appendInfo = UnifiedLog.analyzeAndValidateRecords( + topicIdPartition.topicPartition(), + config, + records, + UNKNOWN_OFFSET, // logStartOffset - set on control-plane, use unknown to fulfill validation + UnifiedLog.APPEND_ORIGIN, + false, // ignoreRecordSize + true, // requireOffsetsMonotonic + UnifiedLog.LEADER_EPOCH, + brokerTopicStats + ); + + if (appendInfo.validBytes() <= 0) { + // Empty batch + invalidBatches.put(topicIdPartition, new PartitionResponse(Errors.NONE)); + continue; + } + + MemoryRecords validRecords = UnifiedLog.trimInvalidBytes( + topicIdPartition.topicPartition(), + records, + appendInfo + ); + + // Offset assignment + PrimitiveRef.LongRef offset = PrimitiveRef.ofLong(appendInfo.firstOffset()); + Compression targetCompression = BrokerCompressionType.targetCompression( + config.compression, + appendInfo.sourceCompression() + ); + + LogValidator validator = new LogValidator( + validRecords, + topicIdPartition.topicPartition(), + time, + appendInfo.sourceCompression(), + targetCompression, + config.compact, + RecordBatch.CURRENT_MAGIC_VALUE, + config.messageTimestampType, + config.messageTimestampBeforeMaxMs, + config.messageTimestampAfterMaxMs, + UnifiedLog.LEADER_EPOCH, + UnifiedLog.APPEND_ORIGIN + ); + + LogValidator.ValidationResult result = validator.validateMessagesAndAssignOffsets( + offset, + validatorMetricsRecorder, + requestLocal.bufferSupplier() + ); + + MemoryRecords finalRecords = result.validatedRecords(); + appendInfo.setMaxTimestamp(result.maxTimestampMs()); + appendInfo.setLastOffset(offset.value - 1); + appendInfo.setRecordValidationStats(result.recordValidationStats()); + if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) { + appendInfo.setLogAppendTime(result.logAppendTimeMs()); + } + + // Re-validate sizes if changed + if (result.messageSizeMaybeChanged()) { + for (MutableRecordBatch batch : finalRecords.batches()) { + if (batch.sizeInBytes() > config.maxMessageSize()) { + brokerTopicStats.topicStats(topicIdPartition.topicPartition().topic()) + .bytesRejectedRate().mark(records.sizeInBytes()); + brokerTopicStats.allTopicsStats().bytesRejectedRate().mark(records.sizeInBytes()); + throw new RecordTooLargeException("Message batch size is " + batch.sizeInBytes() + + " bytes which exceeds the maximum configured size of " + config.maxMessageSize()); + } + } + } + + // Update metrics for successfully validated + brokerTopicStats.topicStats(topicIdPartition.topic()).bytesInRate(true).mark(records.sizeInBytes()); + brokerTopicStats.allTopicsStats().bytesInRate(true).mark(records.sizeInBytes()); + brokerTopicStats.topicStats(topicIdPartition.topic()).messagesInRate().mark(appendInfo.numMessages()); + brokerTopicStats.allTopicsStats().messagesInRate().mark(appendInfo.numMessages()); + + validatedBatches.put(topicIdPartition, new ValidatedRequest.ValidatedBatch( + topicIdPartition, + finalRecords, + appendInfo + )); + + } catch (RecordTooLargeException | CorruptRecordException | KafkaStorageException e) { + invalidBatches.put(topicIdPartition, new PartitionResponse(Errors.forException(e))); + } catch (RecordValidationException rve) { + processFailedRecords(topicIdPartition.topicPartition(), rve.invalidException()); + invalidBatches.put(topicIdPartition, + new PartitionResponse( + Errors.forException(rve.invalidException()), + ProduceResponse.INVALID_OFFSET, + RecordBatch.NO_TIMESTAMP, + ProduceResponse.INVALID_OFFSET, + rve.recordErrors(), + rve.getMessage() + )); + } catch (Throwable t) { + processFailedRecords(topicIdPartition.topicPartition(), t); + invalidBatches.put(topicIdPartition, new PartitionResponse(Errors.forException(t))); + } + } + + return new ValidationResult(validatedBatches, invalidBatches); + } + + private void processFailedRecords(TopicPartition topicPartition, Throwable t) { + brokerTopicStats.topicStats(topicPartition.topic()).failedProduceRequestRate().mark(); + brokerTopicStats.allTopicsStats().failedProduceRequestRate().mark(); + if (t instanceof InvalidProducerEpochException) { + // InvalidProducerEpochException is expected during producer fencing, log at INFO + LOGGER.info("Error processing append operation on partition {}", topicPartition, t); + } else { + LOGGER.error("Error processing append operation on partition {}", topicPartition, t); + } + } + + /** + * Stage 2: Buffer writer loop (single thread, NO LOCK!). + * + *

This method runs on a dedicated thread and owns all mutable buffer state. + * It drains validated requests from the queue and appends them to the active file. + */ + private void bufferWriterLoop() { + LOGGER.info("Buffer writer thread started"); + + while (!closed.get() || !bufferQueue.isEmpty()) { + try { + // Drain multiple requests for efficiency (batch processing) + List batch = new ArrayList<>(); + + // Block waiting for at least one request + ValidatedRequest first = bufferQueue.poll(BUFFER_WRITER_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + if (first != null) { + batch.add(first); + // Drain any additional waiting requests (non-blocking) + bufferQueue.drainTo(batch, MAX_BATCH_DRAIN_SIZE - 1); + } + + // Process batch (NO LOCK - single thread owns state!) + for (ValidatedRequest request : batch) { + processValidatedRequest(request); + } + + // Check for tick-based rotation if no requests processed + if (batch.isEmpty()) { + maybeTickRotate(); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("Buffer writer thread interrupted"); + break; + } catch (Exception e) { + LOGGER.error("Error in buffer writer loop", e); + } + } + + LOGGER.info("Buffer writer thread exiting"); + } + + /** + * Process a single validated request (runs on buffer writer thread only). + */ + private void processValidatedRequest(ValidatedRequest request) { + try { + // Check if this is a rotation trigger from tick scheduler + if (isRotationTrigger(request)) { + if (!activeFile.isEmpty()) { + LOGGER.debug("Processing tick-based rotation trigger"); + cancelScheduledTick(); + rotateFile(false); + } + return; // Don't process rotation triggers as normal requests + } + + if (openedAt == null && !request.hasNoValidBatches()) { + openedAt = TimeUtils.durationMeasurementNow(time); + } + + // Check if rotation needed before adding + int incomingSize = request.validatedSize(); + int currentSize = activeFile.size(); + + if (!activeFile.isEmpty() && currentSize + incomingSize > maxBufferSize) { + cancelScheduledTick(); + rotateFile(false); + } + + // Assign request ID and add batches to buffer + int requestId = nextRequestId++; + + for (var entry : request.validatedBatches().entrySet()) { + TopicIdPartition tip = entry.getKey(); + ValidatedRequest.ValidatedBatch batch = entry.getValue(); + + // Add each record batch to the buffer + for (MutableRecordBatch recordBatch : batch.validatedRecords().batches()) { + activeFile.addBatchDirect(tip, recordBatch, requestId); + } + } + + // Store future for completion on file commit (responses built during commit phase by AppendCompleter) + activeFile.addAwaitingFuture(requestId, request.resultFuture(), request.originalRecords(), request.invalidBatches()); + + writerMetrics.requestAdded(); + + // Check if rotation needed after adding + if (activeFile.size() >= maxBufferSize) { + cancelScheduledTick(); + rotateFile(false); + } else if (scheduledTick == null && !activeFile.isEmpty()) { + scheduleTickRotation(); + } + + } catch (Exception e) { + LOGGER.error("Error processing validated request", e); + request.resultFuture().completeExceptionally(e); + } + } + + private void maybeTickRotate() { + // Called when no requests were processed in this iteration. + // Ensure rotation is scheduled if there's buffered data but no scheduled tick. + if (!activeFile.isEmpty() && scheduledTick == null) { + scheduleTickRotation(); + } + } + + private void scheduleTickRotation() { + scheduledTick = tickScheduler.schedule( + this::tickRotate, + commitInterval.toMillis(), + TimeUnit.MILLISECONDS + ); + } + + private void cancelScheduledTick() { + if (scheduledTick != null) { + scheduledTick.cancel(false); + scheduledTick = null; + } + } + + /** + * Called by tick scheduler to trigger rotation. + * Submits a rotation command to the buffer queue. + * + *

Note: This method runs on the tick scheduler thread, not the buffer writer thread. + * We only check the closed flag here (which is thread-safe via AtomicBoolean). + * The buffer writer thread will decide whether to actually rotate based on activeFile state. + */ + private void tickRotate() { + if (!closed.get()) { + try { + // Submit a rotation trigger to the buffer queue. + // The buffer writer thread will check if rotation is needed. + bufferQueue.offer(createRotationTrigger(), 1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private ValidatedRequest createRotationTrigger() { + return ValidatedRequest.rotationTrigger(); + } + + private boolean isRotationTrigger(ValidatedRequest request) { + return request.isRotationTrigger(); + } + + private void rotateFile(boolean swallowInterrupted) { + LOGGER.debug("Rotating active file"); + ActiveFile prevActiveFile = activeFile; + activeFile = new ActiveFile(time, brokerTopicStats); + scheduledTick = null; + + try { + fileCommitter.commit(prevActiveFile.close()); + if (openedAt != null) { + writerMetrics.fileRotated(openedAt); + openedAt = null; + } + } catch (InterruptedException e) { + if (!swallowInterrupted) { + LOGGER.error("Interrupted during rotation", e); + throw new RuntimeException(e); + } else { + LOGGER.info("Interrupted during rotation, ignoring", e); + } + } + } + + @Override + public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } + + LOGGER.info("Closing PipelinedWriter"); + + // Stop accepting new validation work + validationExecutor.shutdown(); + + // Wait for validation tasks to complete and enqueue their results + try { + if (!validationExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOGGER.warn("Validation executor did not terminate gracefully"); + validationExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + validationExecutor.shutdownNow(); + } + + // Cancel tick scheduler + tickScheduler.shutdownNow(); + + // Wait for buffer queue to drain and buffer writer to finish + bufferWriterExecutor.shutdown(); + try { + if (!bufferWriterExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOGGER.warn("Buffer writer did not terminate gracefully"); + bufferWriterExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + bufferWriterExecutor.shutdownNow(); + } + + // Final rotation if there's pending data + if (!activeFile.isEmpty()) { + rotateFile(true); + } + + // Close downstream components + fileCommitter.close(); + storage.close(); + writerMetrics.close(); + + LOGGER.info("PipelinedWriter closed"); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/ProduceWriter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/ProduceWriter.java new file mode 100644 index 00000000000..8fea38cd863 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/ProduceWriter.java @@ -0,0 +1,51 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.server.common.RequestLocal; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Interface for produce writers in Inkless. + * + *

This interface abstracts the produce writing functionality, allowing for different + * implementations such as the lock-based {@link Writer} and the pipelined {@link PipelinedWriter}. + */ +public interface ProduceWriter extends Closeable { + + /** + * Write records to the diskless storage. + * + * @param entriesPerPartition the records to write, keyed by partition + * @param topicConfigs the log configurations for each topic + * @param requestLocal the request-local context + * @return a future that completes with the partition responses when the write is committed + */ + CompletableFuture> write( + Map entriesPerPartition, + Map topicConfigs, + RequestLocal requestLocal + ); +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/ValidatedRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/ValidatedRequest.java new file mode 100644 index 00000000000..774f5cdf783 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/ValidatedRequest.java @@ -0,0 +1,134 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.storage.internals.log.LogAppendInfo; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +/** + * Holds the result of validation stage, to be passed to the buffer writer stage. + * + *

This class is used to transfer validated batches between the validation workers + * and the single buffer writer thread in the pipelined Writer architecture. + * + *

Thread-safety: This class is immutable except for the resultFuture which is + * set once during construction and completed once by the buffer writer. + */ +class ValidatedRequest { + + /** + * Holds a validated batch ready for buffering. + */ + record ValidatedBatch( + TopicIdPartition topicIdPartition, + MemoryRecords validatedRecords, + LogAppendInfo appendInfo + ) { + ValidatedBatch { + Objects.requireNonNull(topicIdPartition, "topicIdPartition cannot be null"); + Objects.requireNonNull(validatedRecords, "validatedRecords cannot be null"); + Objects.requireNonNull(appendInfo, "appendInfo cannot be null"); + } + } + + private final Map originalRecords; + private final Map validatedBatches; + private final Map invalidBatches; + private final CompletableFuture> resultFuture; + private final boolean rotationTrigger; + + ValidatedRequest( + Map originalRecords, + Map validatedBatches, + Map invalidBatches, + CompletableFuture> resultFuture + ) { + this(originalRecords, validatedBatches, invalidBatches, resultFuture, false); + } + + private ValidatedRequest( + Map originalRecords, + Map validatedBatches, + Map invalidBatches, + CompletableFuture> resultFuture, + boolean rotationTrigger + ) { + this.originalRecords = Objects.requireNonNull(originalRecords, "originalRecords cannot be null"); + this.validatedBatches = Objects.requireNonNull(validatedBatches, "validatedBatches cannot be null"); + this.invalidBatches = Objects.requireNonNull(invalidBatches, "invalidBatches cannot be null"); + this.resultFuture = Objects.requireNonNull(resultFuture, "resultFuture cannot be null"); + this.rotationTrigger = rotationTrigger; + } + + /** + * Creates a special rotation trigger request. + * This is used to signal the buffer writer to rotate the active file. + */ + static ValidatedRequest rotationTrigger() { + return new ValidatedRequest( + Map.of(), Map.of(), Map.of(), + CompletableFuture.completedFuture(Map.of()), + true + ); + } + + Map originalRecords() { + return originalRecords; + } + + Map validatedBatches() { + return validatedBatches; + } + + Map invalidBatches() { + return invalidBatches; + } + + CompletableFuture> resultFuture() { + return resultFuture; + } + + /** + * Returns true if this is a rotation trigger signal, not a real request. + */ + boolean isRotationTrigger() { + return rotationTrigger; + } + + /** + * Returns the total size in bytes of all validated batches. + */ + int validatedSize() { + return validatedBatches.values().stream() + .mapToInt(b -> b.validatedRecords().sizeInBytes()) + .sum(); + } + + /** + * Returns true if there are no validated batches (all were invalid). + */ + boolean hasNoValidBatches() { + return validatedBatches.isEmpty(); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java index 7096393d4d3..4ac4e51de5d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -65,7 +64,7 @@ * *

The class is thread-safe: all the event entry points are protected with the lock.

*/ -class Writer implements Closeable { +class Writer implements ProduceWriter { private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class); private final Lock lock = new ReentrantLock(); @@ -137,7 +136,8 @@ class Writer implements Closeable { this.activeFile = new ActiveFile(time, brokerTopicStats); } - CompletableFuture> write( + @Override + public CompletableFuture> write( final Map entriesPerPartition, final Map topicConfigs, final RequestLocal requestLocal diff --git a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java index 1db9ad002a3..56fcfe90844 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java @@ -108,6 +108,9 @@ void minimalConfig() { assertThat(config.fetchDataThreadPoolSize()).isEqualTo(32); assertThat(config.fetchMetadataThreadPoolSize()).isEqualTo(8); assertThat(config.maxBatchesPerEnforcementRequest()).isEqualTo(0); + // Pipelined writer defaults + assertThat(config.producePipelinedEnabled()).isFalse(); + assertThat(config.producePipelinedValidationThreads()).isEqualTo(0); } @Test @@ -134,6 +137,8 @@ void fullConfig() { configs.put("fetch.lagging.consumer.threshold.ms", "240000"); // 4 minutes configs.put("fetch.lagging.consumer.request.rate.limit", "250"); configs.put("retention.enforcement.max.batches.per.request", "10"); + configs.put("produce.pipelined.enabled", "true"); + configs.put("produce.pipelined.validation.threads", "8"); final var config = new InklessConfig( configs ); @@ -158,6 +163,8 @@ void fullConfig() { assertThat(config.fetchLaggingConsumerThresholdMs()).isEqualTo(240_000L); assertThat(config.fetchLaggingConsumerRequestRateLimit()).isEqualTo(250); assertThat(config.maxBatchesPerEnforcementRequest()).isEqualTo(10); + assertThat(config.producePipelinedEnabled()).isTrue(); + assertThat(config.producePipelinedValidationThreads()).isEqualTo(8); } @Test @@ -500,6 +507,19 @@ void laggingConsumerThresholdEqualToCacheLifespanValid() { assertThat(inklessConfig.fetchLaggingConsumerThresholdMs()).isEqualTo(60_000L); } + @Test + void producePipelinedValidationThreadsNegativeInvalid() { + final Map config = Map.of( + "control.plane.class", InMemoryControlPlane.class.getCanonicalName(), + "storage.backend.class", ConfigTestStorageBackend.class.getCanonicalName(), + "produce.pipelined.validation.threads", "-1" + ); + + assertThatThrownBy(() -> new InklessConfig(config)) + .isInstanceOf(ConfigException.class) + .hasMessage("Invalid value -1 for configuration produce.pipelined.validation.threads: Value must be at least 0"); + } + @Test void laggingConsumerThresholdAutoSkipsValidation() { // Test that threshold=-1 (auto) skips validation even if cache lifespan is large diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/ActiveFileTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/ActiveFileTest.java index bc0edd6afc1..6781d498ad3 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/ActiveFileTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/ActiveFileTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse; @@ -36,6 +37,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import io.aiven.inkless.control_plane.CommitBatchRequest; @@ -208,4 +210,170 @@ void gatherInvalidResponses() { .containsExactly(Map.entry(T0P1, new ProduceResponse.PartitionResponse(Errors.INVALID_RECORD))); assertThat(result.data()).hasSize(312 - 78); // 78 bytes of the invalid batch } + + // ========== Tests for PipelinedWriter support methods ========== + + /** + * Test addBatchDirect adds pre-validated batches to the buffer. + * This method is used by PipelinedWriter where validation is done in a separate stage. + * + *

Note: addBatchDirect only adds batches to the buffer but doesn't update the + * requestId counter. The full pipelined flow requires calling addAwaitingFuture + * to register the request and update isEmpty() status. + */ + @Test + void addBatchDirect() { + final Instant start = Instant.ofEpochMilli(10); + final ActiveFile file = new ActiveFile(Time.SYSTEM, start); + + // Create a record and extract the batch + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + final MutableRecordBatch batch = records.batches().iterator().next(); + + // Initially empty and zero size + assertThat(file.size()).isZero(); + + // Add batch directly (bypassing validation) + file.addBatchDirect(T0P0, batch, 0); + + // Size is updated, but isEmpty() requires addAwaitingFuture to be called + assertThat(file.size()).isEqualTo(78); + + // Register the request to complete the pipelined flow + final CompletableFuture> future = new CompletableFuture<>(); + file.addAwaitingFuture(0, future, Map.of(T0P0, records), Map.of()); + + // Now isEmpty() returns false because a request was registered + assertThat(file.isEmpty()).isFalse(); + } + + /** + * Test addBatchDirect with multiple batches to different partitions. + */ + @Test + void addBatchDirectMultiplePartitions() { + final ActiveFile file = new ActiveFile(Time.SYSTEM, Instant.EPOCH); + + final MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + final MemoryRecords records2 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(2000, new byte[10])); + + file.addBatchDirect(T0P0, records1.batches().iterator().next(), 0); + file.addBatchDirect(T0P1, records2.batches().iterator().next(), 0); + + assertThat(file.size()).isEqualTo(156); // 78 * 2 + + final ClosedFile closed = file.close(); + assertThat(closed.commitBatchRequests()).hasSize(2); + } + + /** + * Test addAwaitingFuture registers futures for completion. + * This method is used by PipelinedWriter to store futures after validation. + */ + @Test + void addAwaitingFuture() { + final ActiveFile file = new ActiveFile(Time.SYSTEM, Instant.EPOCH); + + // Add some batches first + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + file.addBatchDirect(T0P0, records.batches().iterator().next(), 0); + + // Create and register a future + final CompletableFuture> resultFuture = new CompletableFuture<>(); + final Map originalRecords = Map.of(T0P0, records); + final Map invalidBatches = Map.of(); + + file.addAwaitingFuture(0, resultFuture, originalRecords, invalidBatches); + + // Close and verify the future is tracked + final ClosedFile closed = file.close(); + assertThat(closed.awaitingFuturesByRequest()).hasSize(1); + assertThat(closed.awaitingFuturesByRequest().get(0)).isSameAs(resultFuture); + assertThat(closed.originalRequests().get(0)).isEqualTo(originalRecords); + } + + /** + * Test addAwaitingFuture with invalid batches. + */ + @Test + void addAwaitingFutureWithInvalidBatches() { + final ActiveFile file = new ActiveFile(Time.SYSTEM, Instant.EPOCH); + + // Add some batches + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + file.addBatchDirect(T0P0, records.batches().iterator().next(), 0); + + // Create future with some invalid batches + final CompletableFuture> resultFuture = new CompletableFuture<>(); + final Map originalRecords = Map.of( + T0P0, records, + T0P1, records // This one will be "invalid" + ); + final Map invalidBatches = Map.of( + T0P1, new ProduceResponse.PartitionResponse(Errors.INVALID_RECORD) + ); + + file.addAwaitingFuture(0, resultFuture, originalRecords, invalidBatches); + + final ClosedFile closed = file.close(); + assertThat(closed.invalidResponseByRequest().get(0)).containsKey(T0P1); + assertThat(closed.invalidResponseByRequest().get(0).get(T0P1).error).isEqualTo(Errors.INVALID_RECORD); + } + + /** + * Test multiple requests using addBatchDirect and addAwaitingFuture. + */ + @Test + void multiplePipelinedRequests() { + final ActiveFile file = new ActiveFile(Time.SYSTEM, Instant.EPOCH); + + // Request 0 + final MemoryRecords records0 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + file.addBatchDirect(T0P0, records0.batches().iterator().next(), 0); + final CompletableFuture> future0 = new CompletableFuture<>(); + file.addAwaitingFuture(0, future0, Map.of(T0P0, records0), Map.of()); + + // Request 1 + final MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(2000, new byte[10])); + file.addBatchDirect(T0P1, records1.batches().iterator().next(), 1); + final CompletableFuture> future1 = new CompletableFuture<>(); + file.addAwaitingFuture(1, future1, Map.of(T0P1, records1), Map.of()); + + // Request 2 + final MemoryRecords records2 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(3000, new byte[10])); + file.addBatchDirect(T1P0, records2.batches().iterator().next(), 2); + final CompletableFuture> future2 = new CompletableFuture<>(); + file.addAwaitingFuture(2, future2, Map.of(T1P0, records2), Map.of()); + + final ClosedFile closed = file.close(); + + // Verify all requests are tracked + assertThat(closed.awaitingFuturesByRequest()).hasSize(3); + assertThat(closed.originalRequests()).hasSize(3); + assertThat(closed.commitBatchRequests()).hasSize(3); + + // Verify futures are not yet completed + assertThat(future0).isNotCompleted(); + assertThat(future1).isNotCompleted(); + assertThat(future2).isNotCompleted(); + } + + /** + * Test that addBatchDirect initializes start time on first call. + */ + @Test + void addBatchDirectInitializesStartTime() { + final Time time = new MockTime(); + final ActiveFile file = new ActiveFile(time, (Instant) null); + + // start is null initially + assertThat(file.isEmpty()).isTrue(); + + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(1000, new byte[10])); + file.addBatchDirect(T0P0, records.batches().iterator().next(), 0); + + // After addBatchDirect, the file should have a start time + final ClosedFile closed = file.close(); + assertThat(closed.start()).isNotNull(); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/PipelinedWriterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/PipelinedWriterTest.java new file mode 100644 index 00000000000..9d79f12882c --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/PipelinedWriterTest.java @@ -0,0 +1,434 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.server.common.RequestLocal; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.aiven.inkless.storage_backend.common.StorageBackend; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link PipelinedWriter}. + * + *

These tests verify the SEDA-based pipelined writer functionality, + * including tick-based rotation which was a source of bugs. + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class PipelinedWriterTest { + static final String TOPIC_0 = "topic0"; + static final String TOPIC_1 = "topic1"; + static final Uuid TOPIC_ID_0 = new Uuid(0, 1); + static final Uuid TOPIC_ID_1 = new Uuid(0, 2); + static final TopicIdPartition T0P0 = new TopicIdPartition(TOPIC_ID_0, 0, TOPIC_0); + static final TopicIdPartition T0P1 = new TopicIdPartition(TOPIC_ID_0, 1, TOPIC_0); + static final TopicIdPartition T1P0 = new TopicIdPartition(TOPIC_ID_1, 0, TOPIC_1); + + static final Map TOPIC_CONFIGS = Map.of( + TOPIC_0, new LogConfig(Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.CREATE_TIME.name)), + TOPIC_1, new LogConfig(Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.CREATE_TIME.name)) + ); + static final RequestLocal REQUEST_LOCAL = RequestLocal.noCaching(); + + MockTime time; + ScheduledExecutorService tickScheduler; + BrokerTopicStats brokerTopicStats; + WriterTestUtils.RecordCreator recordCreator; + + @Mock + StorageBackend storage; + @Mock + FileCommitter fileCommitter; + @Mock + WriterMetrics writerMetrics; + + PipelinedWriter writer; + + @BeforeEach + void setup() throws InterruptedException { + time = new MockTime(); + tickScheduler = Executors.newScheduledThreadPool(1); + brokerTopicStats = new BrokerTopicStats(); + recordCreator = new WriterTestUtils.RecordCreator(); + + // Set up mock to complete futures when commit is called + // This simulates what AppendCompleter does in production + doAnswer(invocation -> { + ClosedFile closedFile = invocation.getArgument(0); + // Complete all awaiting futures with success responses + closedFile.awaitingFuturesByRequest().forEach((requestId, future) -> { + Map response = new HashMap<>(); + closedFile.originalRequests().getOrDefault(requestId, Map.of()).keySet().forEach(tip -> + response.put(tip, new PartitionResponse(Errors.NONE, 0, -1, 0)) + ); + // Also include invalid batches in response + response.putAll(closedFile.invalidResponseByRequest().getOrDefault(requestId, Map.of())); + future.complete(response); + }); + return null; + }).when(fileCommitter).commit(any(ClosedFile.class)); + } + + @AfterEach + void tearDown() throws IOException { + if (writer != null) { + writer.close(); + } + tickScheduler.shutdownNow(); + } + + private PipelinedWriter createWriter(Duration commitInterval, int maxBufferSize, int validationThreads) { + return new PipelinedWriter( + time, + commitInterval, + maxBufferSize, + tickScheduler, + storage, + fileCommitter, + writerMetrics, + brokerTopicStats, + validationThreads + ); + } + + /** + * Test that tick-based rotation works correctly. + * + *

This test verifies that rotation triggers are properly processed + * when the buffer doesn't fill up (requiring tick-based rotation). + */ + @Test + void tickBasedRotationCompletesFutures() throws InterruptedException, ExecutionException, TimeoutException { + // Use a very short commit interval so tick fires quickly + final Duration commitInterval = Duration.ofMillis(50); + // Use a large buffer so size-based rotation doesn't trigger + final int maxBufferSize = 10 * 1024 * 1024; // 10 MB + + writer = createWriter(commitInterval, maxBufferSize, 4); + + // Write a small request (won't trigger size-based rotation) + final Map writeRequest = Map.of( + T0P0, recordCreator.create(T0P0.topicPartition(), 10) // small batch + ); + + CompletableFuture> future = + writer.write(writeRequest, TOPIC_CONFIGS, REQUEST_LOCAL); + + // The future should complete within a reasonable time via tick-based rotation + // If tick-based rotation is broken, this will timeout + Map result = future.get(5, TimeUnit.SECONDS); + + // Verify the response + assertThat(result).containsKey(T0P0); + assertThat(result.get(T0P0).error).isEqualTo(Errors.NONE); + + // Verify commit was called (rotation happened) + verify(fileCommitter, atLeastOnce()).commit(any()); + } + + /** + * Test that size-based rotation works when buffer fills up. + */ + @Test + void sizeBasedRotationCompletesFutures() throws InterruptedException, ExecutionException, TimeoutException { + // Use a long commit interval so tick doesn't interfere + final Duration commitInterval = Duration.ofMinutes(10); + // Use a small buffer so size-based rotation triggers + final int maxBufferSize = 1000; // Very small + + writer = createWriter(commitInterval, maxBufferSize, 4); + + // Write a request that will exceed the buffer size + final Map writeRequest = Map.of( + T0P0, recordCreator.create(T0P0.topicPartition(), 100) // larger batch + ); + + CompletableFuture> future = + writer.write(writeRequest, TOPIC_CONFIGS, REQUEST_LOCAL); + + // Should complete quickly via size-based rotation + Map result = future.get(5, TimeUnit.SECONDS); + + assertThat(result).containsKey(T0P0); + assertThat(result.get(T0P0).error).isEqualTo(Errors.NONE); + verify(fileCommitter, atLeastOnce()).commit(any()); + } + + /** + * Test that multiple writes are batched correctly. + */ + @Test + void multipleWritesAreBatched() throws InterruptedException, ExecutionException, TimeoutException { + final Duration commitInterval = Duration.ofMillis(100); + final int maxBufferSize = 10 * 1024 * 1024; + + writer = createWriter(commitInterval, maxBufferSize, 4); + + // Write multiple small requests + CompletableFuture> future1 = + writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, REQUEST_LOCAL); + CompletableFuture> future2 = + writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, REQUEST_LOCAL); + CompletableFuture> future3 = + writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, REQUEST_LOCAL); + + // All should complete via tick-based rotation + Map result1 = future1.get(5, TimeUnit.SECONDS); + Map result2 = future2.get(5, TimeUnit.SECONDS); + Map result3 = future3.get(5, TimeUnit.SECONDS); + + assertThat(result1.get(T0P0).error).isEqualTo(Errors.NONE); + assertThat(result2.get(T0P0).error).isEqualTo(Errors.NONE); + assertThat(result3.get(T0P0).error).isEqualTo(Errors.NONE); + } + + /** + * Test writes to multiple partitions in a single request. + */ + @Test + void multiplePartitionsInSingleRequest() throws InterruptedException, ExecutionException, TimeoutException { + final Duration commitInterval = Duration.ofMillis(100); + final int maxBufferSize = 10 * 1024 * 1024; + + writer = createWriter(commitInterval, maxBufferSize, 4); + + // Write to multiple partitions in one request + final Map writeRequest = Map.of( + T0P0, recordCreator.create(T0P0.topicPartition(), 5), + T0P1, recordCreator.create(T0P1.topicPartition(), 5), + T1P0, recordCreator.create(T1P0.topicPartition(), 5) + ); + + CompletableFuture> future = + writer.write(writeRequest, TOPIC_CONFIGS, REQUEST_LOCAL); + + Map result = future.get(5, TimeUnit.SECONDS); + + assertThat(result).containsKeys(T0P0, T0P1, T1P0); + assertThat(result.get(T0P0).error).isEqualTo(Errors.NONE); + assertThat(result.get(T0P1).error).isEqualTo(Errors.NONE); + assertThat(result.get(T1P0).error).isEqualTo(Errors.NONE); + } + + /** + * Test that writer rejects writes after close. + */ + @Test + void rejectsWritesAfterClose() throws IOException { + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 4); + + writer.close(); + + CompletableFuture> future = + writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, REQUEST_LOCAL); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + /** + * Test that empty requests are rejected. + */ + @Test + void rejectsEmptyRequests() { + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 4); + + assertThatThrownBy(() -> writer.write(Map.of(), TOPIC_CONFIGS, REQUEST_LOCAL)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("empty"); + } + + /** + * Test that null parameters are rejected. + */ + @Test + void rejectsNullParameters() { + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 4); + + assertThatThrownBy(() -> writer.write(null, TOPIC_CONFIGS, REQUEST_LOCAL)) + .isInstanceOf(NullPointerException.class); + + assertThatThrownBy(() -> writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), null, REQUEST_LOCAL)) + .isInstanceOf(NullPointerException.class); + + assertThatThrownBy(() -> writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, null)) + .isInstanceOf(NullPointerException.class); + } + + /** + * Test constructor validation. + */ + @Test + void constructorValidation() { + // null time + assertThatThrownBy(() -> new PipelinedWriter( + null, Duration.ofMillis(100), 1024, tickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + + // null commitInterval + assertThatThrownBy(() -> new PipelinedWriter( + time, null, 1024, tickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + + // non-positive maxBufferSize + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), 0, tickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), -1, tickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(IllegalArgumentException.class); + + // null tickScheduler + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), 1024, null, storage, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + + // null storage + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), 1024, tickScheduler, null, fileCommitter, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + + // null fileCommitter + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), 1024, tickScheduler, storage, null, writerMetrics, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + + // null writerMetrics + assertThatThrownBy(() -> new PipelinedWriter( + time, Duration.ofMillis(100), 1024, tickScheduler, storage, fileCommitter, null, brokerTopicStats, 4 + )).isInstanceOf(NullPointerException.class); + } + + /** + * Test that validation worker count of 0 uses auto-detection. + */ + @Test + void validationWorkerCountAutoDetection() throws InterruptedException, ExecutionException, TimeoutException { + // Use 0 for auto-detection + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 0); + + CompletableFuture> future = + writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 5)), TOPIC_CONFIGS, REQUEST_LOCAL); + + Map result = future.get(5, TimeUnit.SECONDS); + assertThat(result.get(T0P0).error).isEqualTo(Errors.NONE); + } + + /** + * Test close without any writes. + */ + @Test + void closeWithoutWrites() throws IOException, InterruptedException { + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 4); + + writer.close(); + + // Should not call commit since there was no data + verify(fileCommitter, never()).commit(any()); + } + + /** + * Test that close is idempotent. + */ + @Test + void closeIsIdempotent() throws IOException { + writer = createWriter(Duration.ofMillis(100), 10 * 1024 * 1024, 4); + + writer.close(); + writer.close(); // Should not throw + + // Mark as null so tearDown doesn't try to close again + writer = null; + } + + /** + * Test concurrent writes from multiple threads. + */ + @Test + void concurrentWrites() throws InterruptedException, ExecutionException, TimeoutException { + writer = createWriter(Duration.ofMillis(200), 10 * 1024 * 1024, 4); + + final int numThreads = 10; + final int writesPerThread = 5; + + CompletableFuture[] futures = new CompletableFuture[numThreads * writesPerThread]; + + for (int t = 0; t < numThreads; t++) { + for (int w = 0; w < writesPerThread; w++) { + int index = t * writesPerThread + w; + futures[index] = CompletableFuture.runAsync(() -> { + try { + writer.write( + Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 3)), + TOPIC_CONFIGS, + REQUEST_LOCAL + ).get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } + + // Wait for all to complete + CompletableFuture.allOf(futures).get(30, TimeUnit.SECONDS); + + // All should have completed without error + verify(fileCommitter, atLeastOnce()).commit(any()); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/ValidatedRequestTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/ValidatedRequestTest.java new file mode 100644 index 00000000000..5e1efa0395b --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/ValidatedRequestTest.java @@ -0,0 +1,171 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.storage.internals.log.LogAppendInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link ValidatedRequest} and {@link ValidatedRequest.ValidatedBatch}. + */ +class ValidatedRequestTest { + static final Uuid TOPIC_ID_0 = new Uuid(0, 1); + static final String TOPIC_0 = "topic0"; + static final TopicIdPartition T0P0 = new TopicIdPartition(TOPIC_ID_0, 0, TOPIC_0); + static final TopicIdPartition T0P1 = new TopicIdPartition(TOPIC_ID_0, 1, TOPIC_0); + + // Use the UNKNOWN_LOG_APPEND_INFO constant for testing + private static final LogAppendInfo APPEND_INFO = LogAppendInfo.UNKNOWN_LOG_APPEND_INFO; + + @Test + void validatedBatchConstructorValidation() { + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10])); + + assertThatThrownBy(() -> new ValidatedRequest.ValidatedBatch(null, records, APPEND_INFO)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("topicIdPartition"); + + assertThatThrownBy(() -> new ValidatedRequest.ValidatedBatch(T0P0, null, APPEND_INFO)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("validatedRecords"); + + assertThatThrownBy(() -> new ValidatedRequest.ValidatedBatch(T0P0, records, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("appendInfo"); + } + + @Test + void validatedRequestConstructorValidation() { + final Map originalRecords = Map.of(); + final Map validatedBatches = Map.of(); + final Map invalidBatches = Map.of(); + final CompletableFuture> resultFuture = new CompletableFuture<>(); + + assertThatThrownBy(() -> new ValidatedRequest(null, validatedBatches, invalidBatches, resultFuture)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("originalRecords"); + + assertThatThrownBy(() -> new ValidatedRequest(originalRecords, null, invalidBatches, resultFuture)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("validatedBatches"); + + assertThatThrownBy(() -> new ValidatedRequest(originalRecords, validatedBatches, null, resultFuture)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("invalidBatches"); + + assertThatThrownBy(() -> new ValidatedRequest(originalRecords, validatedBatches, invalidBatches, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("resultFuture"); + } + + @Test + void validatedSizeEmpty() { + final ValidatedRequest request = new ValidatedRequest( + Map.of(), + Map.of(), + Map.of(), + new CompletableFuture<>() + ); + + assertThat(request.validatedSize()).isZero(); + } + + @Test + void validatedSizeWithBatches() { + final MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10])); + final MemoryRecords records2 = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[20])); + + final ValidatedRequest request = new ValidatedRequest( + Map.of(T0P0, records1, T0P1, records2), + Map.of( + T0P0, new ValidatedRequest.ValidatedBatch(T0P0, records1, APPEND_INFO), + T0P1, new ValidatedRequest.ValidatedBatch(T0P1, records2, APPEND_INFO) + ), + Map.of(), + new CompletableFuture<>() + ); + + // Size should be sum of both record sizes + assertThat(request.validatedSize()).isEqualTo(records1.sizeInBytes() + records2.sizeInBytes()); + } + + @Test + void hasNoValidBatchesTrue() { + final ValidatedRequest request = new ValidatedRequest( + Map.of(), + Map.of(), // No valid batches + Map.of(T0P0, new PartitionResponse(Errors.INVALID_RECORD)), // Only invalid + new CompletableFuture<>() + ); + + assertThat(request.hasNoValidBatches()).isTrue(); + } + + @Test + void hasNoValidBatchesFalse() { + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10])); + + final ValidatedRequest request = new ValidatedRequest( + Map.of(T0P0, records), + Map.of(T0P0, new ValidatedRequest.ValidatedBatch(T0P0, records, APPEND_INFO)), + Map.of(), + new CompletableFuture<>() + ); + + assertThat(request.hasNoValidBatches()).isFalse(); + } + + @Test + void accessors() { + final MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10])); + final Map originalRecords = Map.of(T0P0, records); + final Map validatedBatches = Map.of( + T0P0, new ValidatedRequest.ValidatedBatch(T0P0, records, APPEND_INFO) + ); + final Map invalidBatches = Map.of( + T0P1, new PartitionResponse(Errors.INVALID_RECORD) + ); + final CompletableFuture> resultFuture = new CompletableFuture<>(); + + final ValidatedRequest request = new ValidatedRequest( + originalRecords, + validatedBatches, + invalidBatches, + resultFuture + ); + + assertThat(request.originalRecords()).isEqualTo(originalRecords); + assertThat(request.validatedBatches()).isEqualTo(validatedBatches); + assertThat(request.invalidBatches()).isEqualTo(invalidBatches); + assertThat(request.resultFuture()).isSameAs(resultFuture); + } +}