Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/inkless/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,23 @@ Under ``inkless.``
* Valid Values: [1,...]
* Importance: medium

``produce.pipelined.enabled``
Whether to use the pipelined writer instead of the lock-based writer. The pipelined writer uses a SEDA architecture to eliminate lock contention: validation is parallelized across N worker threads, and buffer writing is handled by a single dedicated thread. This eliminates the global writer lock bottleneck.

* Type: boolean
* Default: false
* Importance: medium

``produce.pipelined.validation.threads``
Number of validation worker threads for the pipelined writer. These threads perform CPU-intensive validation work (CRC validation, size checks, offset assignment) in parallel. A value of 0 means auto-detect (uses available processors).

* Type: int
* Default: 0
* Valid Values: [0,...]
* Importance: medium

``produce.upload.backoff.ms``
The number of millisecond to back off for before the next upload attempt.
The number of milliseconds to back off for before the next upload attempt.

* Type: int
* Default: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,24 @@ public class InklessConfig extends AbstractConfig {
private static final int PRODUCE_MAX_UPLOAD_ATTEMPTS_DEFAULT = 3;

public static final String PRODUCE_UPLOAD_BACKOFF_MS_CONFIG = PRODUCE_PREFIX + "upload.backoff.ms";
private static final String PRODUCE_UPLOAD_BACKOFF_MS_DOC = "The number of millisecond to back off for before the next upload attempt.";
private static final String PRODUCE_UPLOAD_BACKOFF_MS_DOC = "The number of milliseconds to back off for before the next upload attempt.";
private static final int PRODUCE_UPLOAD_BACKOFF_MS_DEFAULT = 10;

public static final String PRODUCE_PIPELINED_PREFIX = PRODUCE_PREFIX + "pipelined.";

public static final String PRODUCE_PIPELINED_ENABLED_CONFIG = PRODUCE_PIPELINED_PREFIX + "enabled";
private static final String PRODUCE_PIPELINED_ENABLED_DOC = "Whether to use the pipelined writer instead of the lock-based writer. "
+ "The pipelined writer uses a SEDA architecture to eliminate lock contention: "
+ "validation is parallelized across N worker threads, and buffer writing is handled by a single dedicated thread. "
+ "This eliminates the global writer lock bottleneck.";
private static final boolean PRODUCE_PIPELINED_ENABLED_DEFAULT = false;

public static final String PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG = PRODUCE_PIPELINED_PREFIX + "validation.threads";
private static final String PRODUCE_PIPELINED_VALIDATION_THREADS_DOC = "Number of validation worker threads for the pipelined writer. "
+ "These threads perform CPU-intensive validation work (CRC validation, size checks, offset assignment) in parallel. "
+ "A value of 0 means auto-detect (uses available processors).";
private static final int PRODUCE_PIPELINED_VALIDATION_THREADS_DEFAULT = 0;

public static final String STORAGE_PREFIX = "storage.";

public static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class";
Expand Down Expand Up @@ -261,6 +276,23 @@ public static ConfigDef configDef() {
PRODUCE_UPLOAD_BACKOFF_MS_DOC
);

configDef.define(
PRODUCE_PIPELINED_ENABLED_CONFIG,
ConfigDef.Type.BOOLEAN,
PRODUCE_PIPELINED_ENABLED_DEFAULT,
ConfigDef.Importance.MEDIUM,
PRODUCE_PIPELINED_ENABLED_DOC
);

configDef.define(
PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG,
ConfigDef.Type.INT,
PRODUCE_PIPELINED_VALIDATION_THREADS_DEFAULT,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM,
PRODUCE_PIPELINED_VALIDATION_THREADS_DOC
);

configDef.define(
STORAGE_BACKEND_CLASS_CONFIG,
ConfigDef.Type.CLASS,
Expand Down Expand Up @@ -544,6 +576,14 @@ public Duration produceUploadBackoff() {
return Duration.ofMillis(getInt(PRODUCE_UPLOAD_BACKOFF_MS_CONFIG));
}

public boolean producePipelinedEnabled() {
return getBoolean(PRODUCE_PIPELINED_ENABLED_CONFIG);
}

public int producePipelinedValidationThreads() {
return getInt(PRODUCE_PIPELINED_VALIDATION_THREADS_CONFIG);
}
Comment thread
Mwea marked this conversation as resolved.

public int fetchCacheBlockBytes() {
return getInt(CONSUME_CACHE_BLOCK_BYTES_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
Expand Down Expand Up @@ -189,6 +190,59 @@ int size() {
return buffer.totalSize();
}

/**
* Adds a pre-validated batch directly to the buffer.
*
* <p>This method is used by {@link PipelinedWriter} where validation is done
* in a separate stage (validation workers) before reaching the buffer writer.
* The batch is assumed to be already validated, so this method only adds it
* to the buffer without re-validation.
*
* @param topicIdPartition the partition for the batch
* @param recordBatch the pre-validated record batch
* @param requestId the request ID for tracking
*/
void addBatchDirect(
final TopicIdPartition topicIdPartition,
final MutableRecordBatch recordBatch,
final int requestId
) {
if (start == null) {
start = TimeUtils.durationMeasurementNow(time);
}
buffer.addBatch(topicIdPartition, recordBatch, requestId);
}

/**
* Registers a future to be completed when the file is committed.
*
* <p>This method is used by {@link PipelinedWriter} to store the result future
* along with the original request data. The future will be completed by
* {@link AppendCompleter} when the file commit succeeds or fails.
*
* @param requestId the request ID
* @param resultFuture the future to complete with the partition responses
* @param originalRecords the original records from the produce request
* @param invalidBatches the batches that failed validation (already completed with errors)
*/
void addAwaitingFuture(
final int requestId,
final CompletableFuture<Map<TopicIdPartition, PartitionResponse>> resultFuture,
final Map<TopicIdPartition, MemoryRecords> originalRecords,
final Map<TopicIdPartition, PartitionResponse> invalidBatches
) {
// Request IDs are monotonically increasing from the single-threaded buffer writer
if (requestId <= this.requestId) {
LOGGER.warn("Unexpected request ID order: received {} but current is {}", requestId, this.requestId);
}
this.requestId = requestId;
originalRequests.put(requestId, originalRecords);
awaitingFuturesByRequest.put(requestId, resultFuture);
if (!invalidBatches.isEmpty()) {
invalidBatchesByRequest.put(requestId, invalidBatches);
}
}

ClosedFile close() {
final BatchBuffer.CloseResult closeResult = buffer.close();
return new ClosedFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,42 @@ public class AppendHandler implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(AppendHandler.class);

private final SharedState state;
private final Writer writer;
private final ProduceWriter writer;

@DoNotMutate
@CoverageIgnore
public AppendHandler(final SharedState state) {
this(
state,
new Writer(
this(state, createWriter(state));
}

@DoNotMutate
@CoverageIgnore
private static ProduceWriter createWriter(final SharedState state) {
if (state.config().producePipelinedEnabled()) {
LOGGER.info("Using PipelinedWriter with {} validation workers",
state.config().producePipelinedValidationThreads() > 0
? state.config().producePipelinedValidationThreads()
: "auto-detected");
return new PipelinedWriter(
state.time(),
state.brokerId(),
state.objectKeyCreator(),
state.buildStorage(),
state.keyAlignmentStrategy(),
state.cache(),
state.batchCoordinateCache(),
state.controlPlane(),
state.config().commitInterval(),
state.config().produceBufferMaxBytes(),
state.config().produceMaxUploadAttempts(),
state.config().produceUploadBackoff(),
state.config().produceUploadThreadPoolSize(),
state.brokerTopicStats(),
state.config().producePipelinedValidationThreads()
);
} else {
LOGGER.info("Using lock-based Writer");
return new Writer(
state.time(),
state.brokerId(),
state.objectKeyCreator(),
Expand All @@ -67,13 +95,13 @@ public AppendHandler(final SharedState state) {
state.config().produceUploadBackoff(),
state.config().produceUploadThreadPoolSize(),
state.brokerTopicStats()
)
);
);
}
}

// Visible for tests
AppendHandler(final SharedState state,
final Writer writer) {
final ProduceWriter writer) {
this.state = state;
this.writer = writer;
}
Expand Down
Loading