Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
782f6df
KAFKA-19883: Add TX_PENDING state to RecordState for KIP-1289 transac…
Shekharrajak May 23, 2026
f1fbb13
KAFKA-19883: Add txn staging fields and applyTxnMarker to InFlightSta…
Shekharrajak May 23, 2026
5c03d8d
KAFKA-19883: Add ShareGroupMetadata and TxnShareAcknowledge wire sche…
Shekharrajak May 23, 2026
3b5c547
KAFKA-19883: Add sendShareAcknowledgementsToTransaction to producer a…
Shekharrajak May 23, 2026
555b065
KAFKA-19883: Add SharePartition.stageTxnAcknowledge and applyTxnMarke…
Shekharrajak May 24, 2026
02b73ef
KAFKA-19883: Add SharePartitionManager.acknowledgeTransactional and a…
Shekharrajak May 24, 2026
f0ce656
KAFKA-19883: Wire TxnShareAcknowledge handler and WriteTxnMarkers hoo…
Shekharrajak May 24, 2026
8fae3cb
KAFKA-19883: Roll back staged records on stage failure for per-partit…
Shekharrajak May 25, 2026
f34609a
KAFKA-19883: Drop TV1 path in sendShareAcknowledgementsToTransaction …
Shekharrajak May 28, 2026
cde188c
Add transactional share ack drain
Shekharrajak Jun 11, 2026
187a228
Register share ack transaction participants
Shekharrajak Jun 11, 2026
562b06e
Scope share transaction markers
Shekharrajak Jun 11, 2026
0d31d5b
Gate transactional share acknowledgements
Shekharrajak Jun 11, 2026
9223f2d
Handle share ack partition errors
Shekharrajak Jun 11, 2026
cf3fe0a
Finalize share ack markers
Shekharrajak Jun 11, 2026
cedcf0e
Preserve pending share ack metadata
Shekharrajak Jun 11, 2026
4d1f329
Persist pending share acknowledgements
Shekharrajak Jun 11, 2026
96d1f31
Persist share ack marker finalization
Shekharrajak Jun 11, 2026
1a1517e
Restore pending share ack metadata
Shekharrajak Jun 11, 2026
0b1906f
Complete share ack markers
Shekharrajak Jun 11, 2026
5500953
Test share ack marker routing
Shekharrajak Jun 11, 2026
955274f
Authorize transactional share acks
Shekharrajak Jun 11, 2026
f0b377b
Handle share ack staging retries
Shekharrajak Jun 11, 2026
6d10f2d
Fix transactional share ack staging
Shekharrajak Jun 11, 2026
2633a34
Initialize transactional share ack cache
Shekharrajak Jun 11, 2026
48b5061
Require TV2 for transactional share acks
Shekharrajak Jun 13, 2026
764e4a5
Return affected share keys from txn markers
Shekharrajak Jun 13, 2026
695a945
Invalidate share caches after txn markers
Shekharrajak Jun 13, 2026
728912f
Require TV2 for share txn markers
Shekharrajak Jun 13, 2026
f8d2020
Test multi-partition share txn registration
Shekharrajak Jun 13, 2026
eebb9be
Wire txn share ack test fixtures
Shekharrajak Jun 14, 2026
2fcfad0
Fix transactional share ack splitting
Shekharrajak Jun 14, 2026
5a151c4
Fix share txn marker epoch matching
Shekharrajak Jun 15, 2026
28102b1
Test aborted share txn ack redelivery
Shekharrajak Jun 15, 2026
735d1c1
Finalize rejected share txn acks
Shekharrajak Jun 15, 2026
d4852c9
Test subset share txn ack commit
Shekharrajak Jun 15, 2026
ff4f502
Test remote share txn ack commit
Shekharrajak Jun 16, 2026
b0c198d
Fix remote txn share ack validation
Shekharrajak Jun 16, 2026
ed8d02a
Test remote share txn ack abort
Shekharrajak Jun 16, 2026
5b07f7f
Merge upstream trunk
Shekharrajak Jun 16, 2026
b79935b
Cover share txn participant mapping
Shekharrajak Jun 17, 2026
4564874
Fix transactional share ack staging
Shekharrajak Jun 17, 2026
d8ae956
Test share txn commit recovery
Shekharrajak Jun 17, 2026
17e9428
Test share txn abort recovery
Shekharrajak Jun 17, 2026
9c884a3
Assert share txn recovery states
Shekharrajak Jun 17, 2026
2fea1dc
Add multi share-state txn ack IT
Shekharrajak Jun 18, 2026
49136c3
Cover share ack drain semantics
Shekharrajak Jun 18, 2026
a8b5fe9
Test share txn coordinator failover
Shekharrajak Jun 18, 2026
f737fe5
Assert remote share marker refresh
Shekharrajak Jun 18, 2026
04961cc
Fix transactional reject DLQ flow
Shekharrajak Jun 18, 2026
7c8dbd9
Cover share marker retries
Shekharrajak Jun 18, 2026
6fb4175
Expose prepared txn owner state
Shekharrajak Jun 19, 2026
95c654e
Rename share ack txn owner
Shekharrajak Jun 19, 2026
7e7e1b9
Merge remote-tracking branch 'upstream/trunk' into kip-1289-txn-ack-s…
Shekharrajak Jun 24, 2026
43f07ea
Fence stale transactional share acknowledgements
Shekharrajak Jun 24, 2026
8e7687d
Clarify fenced share member epoch test
Shekharrajak Jun 24, 2026
936bf1c
Add same-transaction share ack tests
Shekharrajak Jun 26, 2026
425f1de
Enable prepared transaction recovery
Shekharrajak Jun 26, 2026
7f01976
Add prepared transaction recovery test
Shekharrajak Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,16 @@ public void wakeup() {
delegate.wakeup();
}

/**
* Returns a snapshot of the current share group membership metadata for this consumer.
* Pass to {@link org.apache.kafka.clients.producer.Producer#sendShareAcknowledgementsToTransaction}
* to atomically bind acknowledgments to a producer transaction (KIP-1289).
*/
@Override
public ShareGroupMetadata shareGroupMetadata() {
return delegate.shareGroupMetadata();
}

// Functions below are for testing only
String clientId() {
return delegate.clientId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public synchronized void wakeup() {
wakeup.set(true);
}

@Override
public synchronized ShareGroupMetadata shareGroupMetadata() {
throw new UnsupportedOperationException("MockShareConsumer does not support shareGroupMetadata()");
}

public synchronized void addRecord(ConsumerRecord<K, V> record) {
ensureNotClosed();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ public interface ShareConsumer<K, V> extends Closeable {
* @see KafkaShareConsumer#wakeup()
*/
void wakeup();

/**
* @see KafkaShareConsumer#shareGroupMetadata()
*/
ShareGroupMetadata shareGroupMetadata();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.util.Objects;

/**
* Metadata for the share group member obtained via {@link ShareConsumer#shareGroupMetadata()}.
* Pass this to {@link org.apache.kafka.clients.producer.Producer#sendShareAcknowledgementsToTransaction}
* to atomically bind acknowledgments to a producer transaction (KIP-1289).
*/
public class ShareGroupMetadata {

private final String groupId;
private final String memberId;
private final int memberEpoch;

public ShareGroupMetadata(String groupId, String memberId, int memberEpoch) {
this.groupId = Objects.requireNonNull(groupId, "groupId cannot be null");
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
this.memberEpoch = memberEpoch;
}

public String groupId() {
return groupId;
}

public String memberId() {
return memberId;
}

public int memberEpoch() {
return memberEpoch;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareGroupMetadata that = (ShareGroupMetadata) o;
return memberEpoch == that.memberEpoch
&& Objects.equals(groupId, that.groupId)
&& Objects.equals(memberId, that.memberId);
}

@Override
public int hashCode() {
return Objects.hash(groupId, memberId, memberEpoch);
}

@Override
public String toString() {
return "ShareGroupMetadata(groupId=" + groupId
+ ", memberId=" + memberId
+ ", memberEpoch=" + memberEpoch
+ ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareGroupMetadata;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -45,6 +46,7 @@
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareGroupMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
Expand Down Expand Up @@ -1114,6 +1116,21 @@ public void wakeup() {
wakeupTrigger.wakeup();
}

@Override
public ShareGroupMetadata shareGroupMetadata() {
acquireAndEnsureOpen();
try {
ShareGroupMetadataEvent event = new ShareGroupMetadataEvent(
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
applicationEventHandler.add(event);
return processBackgroundEvents(event.future(),
time.timer(defaultApiTimeoutMs),
e -> false);
} finally {
release();
}
}

/**
* Acquire the light lock and ensure that the consumer hasn't been closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum Type {
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
SHARE_GROUP_METADATA,
SEEK_UNVALIDATED,
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED,
STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public void process(ApplicationEvent event) {
process((SharePollEvent) event);
return;

case SHARE_GROUP_METADATA:
process((ShareGroupMetadataEvent) event);
return;

case COMMIT_ASYNC:
process((AsyncCommitEvent) event);
return;
Expand Down Expand Up @@ -619,6 +623,16 @@ private void process(final ShareAcknowledgementCommitCallbackRegistrationEvent e
manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
}

private void process(final ShareGroupMetadataEvent event) {
requestManagers.shareMembershipManager.ifPresentOrElse(
mgr -> event.future().complete(
new org.apache.kafka.clients.consumer.ShareGroupMetadata(
mgr.groupId(), mgr.memberId(), mgr.memberEpoch())),
() -> event.future().completeExceptionally(
new IllegalStateException("No share group membership manager available"))
);
}

private void process(final SeekUnvalidatedEvent event) {
try {
event.offsetEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.ShareGroupMetadata;

/**
* Event requesting the current share group membership metadata snapshot from the background thread.
* Used by {@link org.apache.kafka.clients.consumer.internals.ShareConsumerImpl#shareGroupMetadata()}.
*/
public class ShareGroupMetadataEvent extends CompletableApplicationEvent<ShareGroupMetadata> {

public ShareGroupMetadataEvent(long deadlineMs) {
super(Type.SHARE_GROUP_METADATA, deadlineMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.ShareGroupMetadata;
import org.apache.kafka.clients.consumer.internals.AcknowledgementBatch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
Expand Down Expand Up @@ -780,6 +783,22 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
}
}

@Override
public void sendShareAcknowledgementsToTransaction(
Map<TopicIdPartition, List<AcknowledgementBatch>> acknowledgements,
ShareGroupMetadata groupMetadata) throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();

if (!acknowledgements.isEmpty()) {
TransactionalRequestResult result =
transactionManager.sendShareAcknowledgementsToTransaction(acknowledgements, groupMetadata);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
}
}

/**
* Prepares the current transaction for a two-phase commit. This method will flush all pending messages
* and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.ShareGroupMetadata;
import org.apache.kafka.clients.consumer.internals.AcknowledgementBatch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -209,6 +212,16 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
this.sentOffsets = true;
}

@Override
public void sendShareAcknowledgementsToTransaction(
Map<TopicIdPartition, List<AcknowledgementBatch>> acknowledgements,
ShareGroupMetadata groupMetadata) throws ProducerFencedException {
verifyNotClosed();
verifyNotFenced();
verifyTransactionsInitialized();
verifyTransactionInFlight();
}

@Override
public PreparedTxnState prepareTransaction() throws ProducerFencedException {
verifyNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.ShareGroupMetadata;
import org.apache.kafka.clients.consumer.internals.AcknowledgementBatch;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
Expand Down Expand Up @@ -62,6 +65,12 @@ default void initTransactions() {
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;

/**
* See {@link KafkaProducer#sendShareAcknowledgementsToTransaction(Map, ShareGroupMetadata)}
*/
void sendShareAcknowledgementsToTransaction(Map<TopicIdPartition, List<AcknowledgementBatch>> acknowledgements,
ShareGroupMetadata groupMetadata) throws ProducerFencedException;

/**
* See {@link KafkaProducer#prepareTransaction()}
*/
Expand Down
Loading