KAFKA-19883: [DRAFT] KIP-1289 — Transactional acknowledgments for share groups (state machine + client API + wire schema)#22357
…nd wire shareGroupMetadata() for KIP-1289
| ARCHIVING((byte) 3), // Per KIP-1191 | ||
| ARCHIVED((byte) 4); | ||
| ARCHIVED((byte) 4), | ||
| TX_PENDING((byte) 5); // Per KIP-1289: staged into an open producer transaction |
TX_PENDING is the bridge state that holds the record between ACQUIRED and ACKNOWLEDGED (or ARCHIVED).
|
|
||
| // Either the transition is from Available -> Acquired or from Acquired -> Available/ | ||
| // Acknowledged/Archived. | ||
| if (newState == TX_PENDING && this != ACQUIRED) { |
Validation for the TX_PENDING state: it may only be entered from ACQUIRED, before the record is acknowledged or archived.
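To make the intended ordering concrete, here is a minimal sketch of a transition table with TX_PENDING added. The enum name `SketchRecordState`, the `allowedTargets()` helper, and the exact set of targets out of TX_PENDING and ARCHIVING are assumptions made for illustration, not the PR's actual `RecordState` code.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for RecordState; the transition table is an assumption.
enum SketchRecordState {
    AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVING, ARCHIVED, TX_PENDING;

    // States that may directly follow this one.
    Set<SketchRecordState> allowedTargets() {
        switch (this) {
            case AVAILABLE:
                return EnumSet.of(ACQUIRED);
            case ACQUIRED:
                // TX_PENDING is reachable only from here.
                return EnumSet.of(AVAILABLE, ACKNOWLEDGED, ARCHIVING, ARCHIVED, TX_PENDING);
            case TX_PENDING:
                // COMMIT finalizes the staged ack; ABORT goes back to AVAILABLE.
                return EnumSet.of(AVAILABLE, ACKNOWLEDGED, ARCHIVING, ARCHIVED);
            case ARCHIVING:
                return EnumSet.of(ARCHIVED);
            default:
                // ACKNOWLEDGED and ARCHIVED are terminal in this sketch.
                return EnumSet.noneOf(SketchRecordState.class);
        }
    }

    // Returns newState if the transition is legal, otherwise throws.
    SketchRecordState validateTransition(SketchRecordState newState) {
        if (!allowedTargets().contains(newState)) {
            throw new IllegalStateException("Invalid transition " + this + " -> " + newState);
        }
        return newState;
    }
}
```

With this table, AVAILABLE -> TX_PENDING is rejected while ACQUIRED -> TX_PENDING passes, which is the check the diff line above performs.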
| if (state != RecordState.TX_PENDING) { | ||
| return null; | ||
| } | ||
| if (this.stagedProducerId != producerId || this.stagedProducerEpoch != producerEpoch) { |
We need fencing to ensure that only the producer that staged the record can confirm it.
The confirmation can arrive in a different RPC call, possibly from a different broker/server.
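A minimal sketch of that fencing check, assuming the staged producer id and epoch are stored next to the state. The class `SketchInFlightState`, its nested `State` enum, and the boolean `commit` flag are hypothetical simplifications and not the PR's `InFlightState`.

```java
// Hypothetical, simplified in-flight state: only the fields needed for fencing.
final class SketchInFlightState {
    enum State { AVAILABLE, ACQUIRED, TX_PENDING, ACKNOWLEDGED }

    private State state = State.ACQUIRED;
    private long stagedProducerId = -1L;
    private short stagedProducerEpoch = -1;

    State state() {
        return state;
    }

    // Stage the ack into an open transaction; only legal from ACQUIRED.
    boolean stage(long producerId, short producerEpoch) {
        if (state != State.ACQUIRED) {
            return false;
        }
        stagedProducerId = producerId;
        stagedProducerEpoch = producerEpoch;
        state = State.TX_PENDING;
        return true;
    }

    // Returns the new state, or null when the marker does not apply here.
    State applyTxnMarker(long producerId, short producerEpoch, boolean commit) {
        if (state != State.TX_PENDING) {
            return null; // nothing staged, or a duplicate marker
        }
        if (stagedProducerId != producerId || stagedProducerEpoch != producerEpoch) {
            return null; // fenced: marker belongs to another producer or epoch
        }
        state = commit ? State.ACKNOWLEDGED : State.AVAILABLE;
        stagedProducerId = -1L;
        stagedProducerEpoch = -1;
        return state;
    }
}
```

Because the staged identity is cleared once a marker is applied, a second identical marker finds the state no longer in TX_PENDING and becomes a no-op.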
| topic.partitions().add(partition); | ||
| } | ||
|
|
||
| TxnShareAcknowledgeRequestData data = new TxnShareAcknowledgeRequestData() |
TxnShareAcknowledgeRequest is sent by the producer on the client side and carries all the producer details.
| for (AcknowledgementBatch b : entry.getValue()) { | ||
| for (byte ackType : b.acknowledgeTypes()) { | ||
| batches.add(new TxnShareAcknowledgeBatch() | ||
| .setFirstOffset(b.firstOffset()) |
(firstOffset, lastOffset, ackType) triples: these point at records that are already in the broker's log.
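As an illustration of the triple shape, the sketch below coalesces a run of per-offset ack types into (firstOffset, lastOffset, ackType) ranges. `SketchAckRange` and `coalesce` are hypothetical names; the PR's own request builder may lay out its batches differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type for one (firstOffset, lastOffset, ackType) triple.
final class SketchAckRange {
    final long firstOffset;
    final long lastOffset;
    final byte ackType;

    SketchAckRange(long firstOffset, long lastOffset, byte ackType) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.ackType = ackType;
    }

    // ackTypes[i] is the ack type for offset firstOffset + i.
    // Consecutive offsets with the same type collapse into one range.
    static List<SketchAckRange> coalesce(long firstOffset, byte[] ackTypes) {
        List<SketchAckRange> ranges = new ArrayList<>();
        int runStart = 0;
        for (int i = 1; i <= ackTypes.length; i++) {
            boolean runEnds = i == ackTypes.length || ackTypes[i] != ackTypes[runStart];
            if (runEnds) {
                ranges.add(new SketchAckRange(
                    firstOffset + runStart, firstOffset + i - 1, ackTypes[runStart]));
                runStart = i;
            }
        }
        return ranges;
    }
}
```

The triples carry offsets only, never payloads, since the records they refer to already exist in the log.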
| "(currentState= " + currentState + ")"); | ||
| } | ||
|
|
||
| TxnRequestHandler handler; |
Same implementation as sendOffsetsToTransaction(): transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
|
Drafting the transaction coordinator side of the changes. |
…pplyTxnMarker for KIP-1289
…k in KafkaApis for KIP-1289
| Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>(); | ||
| acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { | ||
| SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); | ||
| SharePartition sharePartition = partitionCache.get(sharePartitionKey); |
per-partition dispatch
sharePartition.stageTxnAcknowledge(memberId, producerId, producerEpoch, acknowledgePartitionBatches)
|
|
||
| public void applyTxnMarker(long producerId, short producerEpoch, TransactionResult result) { | ||
| log.debug("Broadcasting txn marker producerId={} epoch={} result={}", producerId, producerEpoch, result); | ||
| partitionCache.values().forEach(sp -> sp.applyTxnMarker(producerId, producerEpoch, result)); |
This iterates every SharePartition on this broker. Each one iterates its InFlightStates, checks (state == TX_PENDING && stagedProducerId == producerId && stagedProducerEpoch == epoch), and applies the marker where the check matches.
The full scan can be improved, but InFlightState.applyTxnMarker itself is an O(1) operation.
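One possible direction for that improvement, sketched under assumptions: keep a small index from producer id to the share partitions that currently hold staged acks for it, so a marker only visits those partitions. `SketchStagedTxnIndex`, `recordStaged`, and `drain` are hypothetical names, partition keys are plain strings here, and the class is not thread-safe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical index: producerId -> share partition keys with staged acks.
final class SketchStagedTxnIndex {
    private final Map<Long, Set<String>> partitionsByProducer = new HashMap<>();

    // Called when a partition stages acks for this producer.
    void recordStaged(long producerId, String sharePartitionKey) {
        partitionsByProducer
            .computeIfAbsent(producerId, id -> new HashSet<>())
            .add(sharePartitionKey);
    }

    // Called when a marker arrives: returns and forgets the affected partitions.
    Set<String> drain(long producerId) {
        Set<String> staged = partitionsByProducer.remove(producerId);
        return staged == null ? Collections.<String>emptySet() : staged;
    }
}
```

The epoch check would still happen inside each in-flight state; the index only narrows which partitions are visited.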
| var skippedMarkers = 0 | ||
| for (marker <- markers.asScala) { | ||
| val producerId = marker.producerId | ||
| sharePartitionManager.applyTxnMarker(producerId, marker.producerEpoch, marker.transactionResult) |
After the auth check, each marker is passed to the share partition manager, since the share partition manager is responsible for finding the share partitions and applying the marker.
| ) { | ||
| for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { | ||
| InFlightBatch inFlightBatch = entry.getValue(); | ||
| if (inFlightBatch.lastOffset() < startOffset) continue; |
Below startOffset: already acknowledged.
| return future; | ||
| } | ||
|
|
||
| private Throwable stageBatchTxnRecords( |
Returns a Throwable as a signalling pattern: null means the batch was staged successfully.
| } | ||
|
|
||
| throwable = stageBatchTxnRecords(memberId, producerId, producerEpoch, batch, ackTypeMap, subMap); | ||
| if (throwable != null) break; |
Break as soon as staging one batch fails with any exception. Batches staged before it have already moved to TX_PENDING; the current batch has not.
The transaction will then abort, and applyTxnMarker(ABORT) reverts the staged records to AVAILABLE. This covers both batch 1 (which succeeded) and batch 2 (which failed at some step). If the client never aborts, the transaction timeout aborts it as well.
User code:
try {
    producer.sendShareAcknowledgementsToTransaction(acks, shareGroupMeta); // ← throws
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction(); // ← this is what reverts the record state back to AVAILABLE
}
We must have the same semantics as KafkaProducer.sendOffsetsToTransaction:
• Retry contract (catch KafkaException → abortTransaction() → retry loop)
• Distinction between fatal vs abortable vs retriable errors
• Producer reuse semantics after abort
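A minimal sketch of that retry contract, using stand-in types so it runs without a broker. `SketchTxnProducer`, `SketchFatalException`, and `SketchAbortableException` are hypothetical; `stageShareAcks()` stands in for the proposed sendShareAcknowledgementsToTransaction call, and the fatal/abortable split is assumed to mirror the one documented for sendOffsetsToTransaction.

```java
// Hypothetical retry loop: abort and retry on abortable errors, give up on fatal ones.
final class SketchTxnRetryLoop {
    static class SketchFatalException extends RuntimeException {
        SketchFatalException(String msg) { super(msg); }
    }

    static class SketchAbortableException extends RuntimeException {
        SketchAbortableException(String msg) { super(msg); }
    }

    // Stand-in for the transactional subset of the producer API.
    interface SketchTxnProducer {
        void beginTransaction();
        void stageShareAcks();
        void commitTransaction();
        void abortTransaction();
    }

    // Returns the attempt number that committed.
    static int runWithRetry(SketchTxnProducer producer, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            producer.beginTransaction();
            try {
                producer.stageShareAcks();
                producer.commitTransaction();
                return attempt;
            } catch (SketchFatalException e) {
                // Fatal (for example, a fenced producer): do not abort or retry.
                throw e;
            } catch (SketchAbortableException e) {
                // Abortable: abort so staged acks are reverted, then reuse the producer.
                producer.abortTransaction();
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }
}
```

The producer instance is reused across attempts after an abort; only a fatal error ends its life.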
Transactional REJECT received
-> validate ownership/session/producer fencing
-> persist pending transactional reject
-> no DLQ write yet
Transaction COMMIT marker
-> materialize REJECT
-> if DLQ disabled: ARCHIVED
-> if DLQ enabled: ARCHIVING, then DLQ enqueue, then ARCHIVED
Transaction ABORT marker
-> discard pending REJECT
-> no DLQ write
-> record becomes retryable according to share lock/member rules
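The flow above can be sketched as a small resolution function. The names `SketchRejectResolution`, `onMarker`, and `onDlqWriteComplete` are hypothetical, and mapping "retryable" to AVAILABLE on abort is an assumption drawn from the abort behaviour described earlier in this thread.

```java
// Hypothetical resolution of a staged transactional REJECT.
final class SketchRejectResolution {
    enum State { TX_PENDING, AVAILABLE, ARCHIVING, ARCHIVED }

    // State a staged REJECT moves to when the transaction marker arrives.
    static State onMarker(boolean commit, boolean dlqEnabled) {
        if (!commit) {
            return State.AVAILABLE; // ABORT: discard the pending REJECT, no DLQ write
        }
        // COMMIT: with a DLQ, stay in ARCHIVING until the DLQ write has finished.
        return dlqEnabled ? State.ARCHIVING : State.ARCHIVED;
    }

    // Second phase: only ARCHIVING records move on once the DLQ write completes.
    static State onDlqWriteComplete(State current) {
        return current == State.ARCHIVING ? State.ARCHIVED : current;
    }
}
```

Keeping the DLQ write out of the staging step means an aborted transaction never leaves a stray DLQ record behind.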
|
Thanks for the PR. Personally, I would prefer if there was no PR for such a complicated KIP until it has successfully passed a vote. Without serious review by committers knowledgeable in the areas of share groups and transactions, the KIP would not yet be able to pass a vote. It will need +3 binding votes. There are many details which need to be agreed by the committers before we are there. For example, we will need to have a new Finally, we would not accept a KIP like this directly into Apache Kafka as a GA feature without some kind of Early Access or Preview release, so the feature bump also allows us to pace its enablement with the successful completion of an extended period of system testing. My suggestion would be to close the PR, concentrate on fleshing out the KIP and building alignment with committers. |
| return DELIVERY_STATE_AVAILABLE; | ||
| } | ||
| if (batch.stagedDeliveryState() != PersisterStateBatch.NO_STAGED_DELIVERY_STATE) { | ||
| if (batch.stagedDeliveryState() == DELIVERY_STATE_ARCHIVING) { |
For transactional REJECT with DLQ enabled, the coordinator must preserve staged ARCHIVING on commit.
The source SharePartition owns DLQ phase 2; after reload it resumes ARCHIVING, writes
the DLQ record, and then archives the share-state batch.
| topicIdPartition | ||
| ); | ||
| } | ||
| if (topicIdPartitionByShareStatePartition.size() == sgsTopicPartitions.size()) { |
One producer transaction can include share acks whose source partitions map to multiple __share_group_state partitions, so this helper method is useful in the integration tests.
We should look for a better way of doing this.
| assertEquals(0, shareConsumer.poll(Duration.ofMillis(1000)).count()); | ||
| verifySharePartitionLag(admin, groupId, tp, 0L); | ||
| waitForDlqRecords(dlqTopic, 1); | ||
| verifyLatestShareStateDeliveryState(groupId, acknowledgedPartition, 0L, RecordState.ARCHIVED); |
Proves that commit + REJECT with DLQ enabled writes one DLQ record and ends in ARCHIVED.
| return DELIVERY_STATE_AVAILABLE; | ||
| } | ||
| if (batch.stagedDeliveryState() != PersisterStateBatch.NO_STAGED_DELIVERY_STATE) { | ||
| if (batch.stagedDeliveryState() == DELIVERY_STATE_ARCHIVING) { |
preserves staged delivery state instead of converting ARCHIVING to ARCHIVED.
| sharePartition.applyTxnMarker(100L, (short) 1, TransactionResult.COMMIT).join(); | ||
| sharePartition.applyTxnMarker(100L, (short) 1, TransactionResult.COMMIT).join(); | ||
|
|
||
| assertTrue(sharePartition.cachedState().isEmpty()); |
covers duplicate marker idempotency.
| * Prepare a transaction for a two-phase commit. | ||
| * This transitions the transaction to the PREPARED_TRANSACTION state. | ||
| * The preparedTxnState is set with the current producer ID and epoch. | ||
| * The preparedTxnState is set with the current transaction owner fence. |
Share ack transaction staging/finalization internals now use txnOwnerId / txnOwnerEpoch across SharePartition, SharePartitionManager, InFlightState, InFlightBatch, and the share coordinator completion paths.
This lets an external processing engine use it as a transaction: the engine's coordinator runs the commit at a checkpoint or once all jobs have completed.
| try { | ||
| assertThrows(CommitFailedException.class, | ||
| () -> transactionalProducer.sendShareAcknowledgementsToTransaction(acknowledgements, staleGroupMetadata)); | ||
| () -> transactionalProducer.sendShareAcknowledgementsToTransaction(acknowledgements, fencedGroupMetadata)); |
the request is rejected because its epoch does not match coordinator-owned membership state.
- old epoch lower than current -> stale request
- epoch equal to current -> valid request
- previous epoch -> tolerated in Kafka share group logic
- epoch higher than current -> impossible client claim, reject/fence it
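One reading of these four cases, sketched as a classifier. It assumes "previous epoch" means exactly current minus one and that anything lower is stale; `SketchMemberEpochCheck` and its `Verdict` values are hypothetical names, not part of the PR.

```java
// Hypothetical classifier for the member epoch carried by a request.
final class SketchMemberEpochCheck {
    enum Verdict { VALID, TOLERATED_PREVIOUS, STALE, FENCED }

    static Verdict classify(int requestEpoch, int currentEpoch) {
        if (requestEpoch == currentEpoch) {
            return Verdict.VALID;
        }
        if (requestEpoch > currentEpoch) {
            return Verdict.FENCED; // the client claims an epoch the coordinator never issued
        }
        if (requestEpoch == currentEpoch - 1) {
            return Verdict.TOLERATED_PREVIOUS; // assumption: only the immediately previous epoch
        }
        return Verdict.STALE;
    }
}
```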
| List<ConsumerRecord<byte[], byte[]>> outputRecords = readCommittedRecords(outputTopicPartition, 1); | ||
| ConsumerRecord<byte[], byte[]> outputRecord = outputRecords.get(0); | ||
| assertEquals(0L, outputRecord.offset()); | ||
| assertEquals(outputValue, new String(outputRecord.value(), StandardCharsets.UTF_8)); |
Validation: one Kafka transaction can include both a normal output record and share acknowledgements.
| verifySharePartitionLag(admin, groupId, inputTopicPartition, 1L); | ||
| ConsumerRecords<byte[], byte[]> redeliveredRecords = waitedPoll(shareConsumer, 2500L, 1); | ||
| ConsumerRecord<byte[], byte[]> redeliveredRecord = redeliveredRecords.iterator().next(); | ||
| assertEquals(0L, redeliveredRecord.offset()); |
Validation: one Kafka transaction can include both a normal output record and share acknowledgements.
This is the abort case.
output stays hidden from read_committed, share lag remains 1, and the input record is redelivered.
| val recoveredProducer = transactionalProducer(transactionalId) | ||
| try { | ||
| recoveredProducer.initTransactions(true) | ||
| recoveredProducer.completeTransaction(new PreparedTxnState(preparedState.toString)) |
Recovered commit.
Client recovery now sets the active transaction owner and marks the transaction as started, so completeTransaction(...) sends EndTxn.
| val recoveredProducer = transactionalProducer(transactionalId) | ||
| try { | ||
| recoveredProducer.initTransactions(true) | ||
| recoveredProducer.completeTransaction(new PreparedTxnState(s"${preparedState.txnOwnerId + 1}:${preparedState.txnOwnerEpoch}")) |
Abort flow.
txnOwnerId does not match, so it calls abortTransaction().
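The commit-or-abort decision in these two tests can be sketched as a comparison between the prepared state and the recovered transaction owner. `SketchCompleteTxn.shouldCommit` is a hypothetical helper, and the "ownerId:ownerEpoch" string layout is assumed from the test above that builds the state from txnOwnerId and txnOwnerEpoch.

```java
// Hypothetical decision helper: commit only when the prepared state matches the owner.
final class SketchCompleteTxn {
    // preparedState is assumed to be formatted as "ownerId:ownerEpoch".
    static boolean shouldCommit(String preparedState, long ownerId, short ownerEpoch) {
        String[] parts = preparedState.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Malformed prepared state: " + preparedState);
        }
        long preparedId = Long.parseLong(parts[0]);
        short preparedEpoch = Short.parseShort(parts[1]);
        // A mismatch means the recovered transaction is not the prepared one, so abort.
        return preparedId == ownerId && preparedEpoch == ownerEpoch;
    }
}
```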
| // 2PC functionality is disabled, clients that attempt to use this functionality | ||
| // would receive an authorization failed error. | ||
| responseCallback(initTransactionError(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) | ||
| } else if (keepPreparedTxn) { |
The broker no longer rejects keepPreparedTxn unconditionally.
|
|
||
| public Builder(InitProducerIdRequestData data) { | ||
| super(ApiKeys.INIT_PRODUCER_ID); | ||
| super( |
The client requires InitProducerId v6 when enable2Pc or keepPreparedTxn is set.
| expectedProducerIdAndEpoch) | ||
| ) | ||
| txnMetadata.inLock(() => { | ||
| if (keepPreparedTxn && txnMetadata.state == TransactionState.ONGOING) { |
The coordinator preserves only existing ONGOING 2PC transactions and returns the ongoing producer id/epoch for recovery.
Share groups (KIP-932) have no equivalent of sendOffsetsToTransaction. There is no way to atomically bind a share-group acknowledgment to a producer transaction, blocking EOS for share-group-based read-process-write pipelines.
Description
This is the foundational layer for KIP-1289. It does not include the broker-side RPC handler (follow-up PR).
Tests
• RecordStateTest — full 6-state transition matrix including all TX_PENDING paths
• InFlightStateTxnTest
Follow-up
• Broker-side handleTransactionnShareAcknowledge handler in KafkaApis.scala
• ShareCoordinatorShard.replayEndTransactionMarker() real body
• SharePartition TX_PENDING acquisition exclusion
• IT tests (require broker handler)