Skip to content
Draft
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
782f6df
KAFKA-19883: Add TX_PENDING state to RecordState for KIP-1289 transac…
Shekharrajak May 23, 2026
f1fbb13
KAFKA-19883: Add txn staging fields and applyTxnMarker to InFlightSta…
Shekharrajak May 23, 2026
5c03d8d
KAFKA-19883: Add ShareGroupMetadata and TxnShareAcknowledge wire sche…
Shekharrajak May 23, 2026
3b5c547
KAFKA-19883: Add sendShareAcknowledgementsToTransaction to producer a…
Shekharrajak May 23, 2026
555b065
KAFKA-19883: Add SharePartition.stageTxnAcknowledge and applyTxnMarke…
Shekharrajak May 24, 2026
02b73ef
KAFKA-19883: Add SharePartitionManager.acknowledgeTransactional and a…
Shekharrajak May 24, 2026
f0ce656
KAFKA-19883: Wire TxnShareAcknowledge handler and WriteTxnMarkers hoo…
Shekharrajak May 24, 2026
8fae3cb
KAFKA-19883: Roll back staged records on stage failure for per-partit…
Shekharrajak May 25, 2026
f34609a
KAFKA-19883: Drop TV1 path in sendShareAcknowledgementsToTransaction …
Shekharrajak May 28, 2026
cde188c
Add transactional share ack drain
Shekharrajak Jun 11, 2026
187a228
Register share ack transaction participants
Shekharrajak Jun 11, 2026
562b06e
Scope share transaction markers
Shekharrajak Jun 11, 2026
0d31d5b
Gate transactional share acknowledgements
Shekharrajak Jun 11, 2026
9223f2d
Handle share ack partition errors
Shekharrajak Jun 11, 2026
cf3fe0a
Finalize share ack markers
Shekharrajak Jun 11, 2026
cedcf0e
Preserve pending share ack metadata
Shekharrajak Jun 11, 2026
4d1f329
Persist pending share acknowledgements
Shekharrajak Jun 11, 2026
96d1f31
Persist share ack marker finalization
Shekharrajak Jun 11, 2026
1a1517e
Restore pending share ack metadata
Shekharrajak Jun 11, 2026
0b1906f
Complete share ack markers
Shekharrajak Jun 11, 2026
5500953
Test share ack marker routing
Shekharrajak Jun 11, 2026
955274f
Authorize transactional share acks
Shekharrajak Jun 11, 2026
f0b377b
Handle share ack staging retries
Shekharrajak Jun 11, 2026
6d10f2d
Fix transactional share ack staging
Shekharrajak Jun 11, 2026
2633a34
Initialize transactional share ack cache
Shekharrajak Jun 11, 2026
48b5061
Require TV2 for transactional share acks
Shekharrajak Jun 13, 2026
764e4a5
Return affected share keys from txn markers
Shekharrajak Jun 13, 2026
695a945
Invalidate share caches after txn markers
Shekharrajak Jun 13, 2026
728912f
Require TV2 for share txn markers
Shekharrajak Jun 13, 2026
f8d2020
Test multi-partition share txn registration
Shekharrajak Jun 13, 2026
eebb9be
Wire txn share ack test fixtures
Shekharrajak Jun 14, 2026
2fcfad0
Fix transactional share ack splitting
Shekharrajak Jun 14, 2026
5a151c4
Fix share txn marker epoch matching
Shekharrajak Jun 15, 2026
28102b1
Test aborted share txn ack redelivery
Shekharrajak Jun 15, 2026
735d1c1
Finalize rejected share txn acks
Shekharrajak Jun 15, 2026
d4852c9
Test subset share txn ack commit
Shekharrajak Jun 15, 2026
ff4f502
Test remote share txn ack commit
Shekharrajak Jun 16, 2026
b0c198d
Fix remote txn share ack validation
Shekharrajak Jun 16, 2026
ed8d02a
Test remote share txn ack abort
Shekharrajak Jun 16, 2026
5b07f7f
Merge upstream trunk
Shekharrajak Jun 16, 2026
b79935b
Cover share txn participant mapping
Shekharrajak Jun 17, 2026
4564874
Fix transactional share ack staging
Shekharrajak Jun 17, 2026
d8ae956
Test share txn commit recovery
Shekharrajak Jun 17, 2026
17e9428
Test share txn abort recovery
Shekharrajak Jun 17, 2026
9c884a3
Assert share txn recovery states
Shekharrajak Jun 17, 2026
2fea1dc
Add multi share-state txn ack IT
Shekharrajak Jun 18, 2026
49136c3
Cover share ack drain semantics
Shekharrajak Jun 18, 2026
a8b5fe9
Test share txn coordinator failover
Shekharrajak Jun 18, 2026
f737fe5
Assert remote share marker refresh
Shekharrajak Jun 18, 2026
04961cc
Fix transactional reject DLQ flow
Shekharrajak Jun 18, 2026
7c8dbd9
Cover share marker retries
Shekharrajak Jun 18, 2026
6fb4175
Expose prepared txn owner state
Shekharrajak Jun 19, 2026
95c654e
Rename share ack txn owner
Shekharrajak Jun 19, 2026
7e7e1b9
Merge remote-tracking branch 'upstream/trunk' into kip-1289-txn-ack-s…
Shekharrajak Jun 24, 2026
43f07ea
Fence stale transactional share acknowledgements
Shekharrajak Jun 24, 2026
8e7687d
Clarify fenced share member epoch test
Shekharrajak Jun 24, 2026
936bf1c
Add same-transaction share ack tests
Shekharrajak Jun 26, 2026
425f1de
Enable prepared transaction recovery
Shekharrajak Jun 26, 2026
7f01976
Add prepared transaction recovery test
Shekharrajak Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,8 @@ project(':clients:clients-integration-tests') {
testImplementation project(':metadata')
testImplementation project(':raft')
testImplementation project(':server')
testImplementation project(':coordinator-common')
testImplementation project(':share-coordinator')
testImplementation project(':storage')
testImplementation project(':core').sourceSets.test.output
testImplementation testFixtures(project(':clients'))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,25 @@ public void wakeup() {
delegate.wakeup();
}

/**
* Returns a snapshot of the current share group membership metadata for this consumer.
* Pass to {@link org.apache.kafka.clients.producer.Producer#sendShareAcknowledgementsToTransaction}
* to atomically bind acknowledgments to a producer transaction (KIP-1289).
*/
@Override
public ShareGroupMetadata shareGroupMetadata() {
return delegate.shareGroupMetadata();
}

/**
* Returns and clears acknowledgements prepared for a producer transaction.
* <p>This method can only be used with explicit acknowledgement mode.
*/
@Override
public ShareAcknowledgements acknowledgementsForTransaction() {
return delegate.acknowledgementsForTransaction();
}

// Functions below are for testing only
String clientId() {
return delegate.clientId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ public synchronized void wakeup() {
wakeup.set(true);
}

@Override
public synchronized ShareGroupMetadata shareGroupMetadata() {
throw new UnsupportedOperationException("MockShareConsumer does not support shareGroupMetadata()");
}

@Override
public synchronized ShareAcknowledgements acknowledgementsForTransaction() {
throw new UnsupportedOperationException("MockShareConsumer does not support acknowledgementsForTransaction()");
}

public synchronized void addRecord(ConsumerRecord<K, V> record) {
ensureNotClosed();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
Expand All @@ -188,4 +198,4 @@ private void ensureNotClosed() {
if (closed)
throw new IllegalStateException("This consumer has already been closed.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.util.List;
import java.util.Objects;

public final class ShareAcknowledgementBatch {
private final long firstOffset;
private final long lastOffset;
private final List<Byte> acknowledgeTypes;

public ShareAcknowledgementBatch(long firstOffset, long lastOffset, List<Byte> acknowledgeTypes) {
if (lastOffset < firstOffset) {
throw new IllegalArgumentException("lastOffset cannot be smaller than firstOffset");
}
Objects.requireNonNull(acknowledgeTypes, "acknowledgeTypes cannot be null");
if (acknowledgeTypes.isEmpty()) {
throw new IllegalArgumentException("acknowledgeTypes cannot be empty");
}
long recordCount = lastOffset - firstOffset + 1;
if (acknowledgeTypes.size() != 1 && acknowledgeTypes.size() != recordCount) {
throw new IllegalArgumentException("acknowledgeTypes must contain one type or one type per offset");
}

this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.acknowledgeTypes = List.copyOf(acknowledgeTypes);
}

public long firstOffset() {
return firstOffset;
}

public long lastOffset() {
return lastOffset;
}

public List<Byte> acknowledgeTypes() {
return acknowledgeTypes;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ShareAcknowledgementBatch)) return false;
ShareAcknowledgementBatch that = (ShareAcknowledgementBatch) o;
return firstOffset == that.firstOffset
&& lastOffset == that.lastOffset
&& acknowledgeTypes.equals(that.acknowledgeTypes);
}

@Override
public int hashCode() {
return Objects.hash(firstOffset, lastOffset, acknowledgeTypes);
}

@Override
public String toString() {
return "ShareAcknowledgementBatch(" +
"firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
", acknowledgeTypes=" + acknowledgeTypes +
")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public final class ShareAcknowledgements {
private final Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgements;

public static ShareAcknowledgements empty() {
return new ShareAcknowledgements(Map.of());
}

public ShareAcknowledgements(Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgements) {
Objects.requireNonNull(acknowledgements, "acknowledgements cannot be null");

Map<TopicIdPartition, List<ShareAcknowledgementBatch>> copy = new LinkedHashMap<>();
acknowledgements.forEach((topicIdPartition, batches) -> {
Objects.requireNonNull(topicIdPartition, "topicIdPartition cannot be null");
Objects.requireNonNull(batches, "acknowledgement batches cannot be null");
if (!batches.isEmpty()) {
copy.put(topicIdPartition, List.copyOf(batches));
}
});
this.acknowledgements = Map.copyOf(copy);
}

public boolean isEmpty() {
return acknowledgements.isEmpty();
}

public Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgements() {
return acknowledgements;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ShareAcknowledgements)) return false;
ShareAcknowledgements that = (ShareAcknowledgements) o;
return acknowledgements.equals(that.acknowledgements);
}

@Override
public int hashCode() {
return acknowledgements.hashCode();
}

@Override
public String toString() {
return "ShareAcknowledgements(" + acknowledgements + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,14 @@ public interface ShareConsumer<K, V> extends Closeable {
* @see KafkaShareConsumer#wakeup()
*/
void wakeup();

/**
* @see KafkaShareConsumer#shareGroupMetadata()
*/
ShareGroupMetadata shareGroupMetadata();

/**
* @see KafkaShareConsumer#acknowledgementsForTransaction()
*/
ShareAcknowledgements acknowledgementsForTransaction();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.util.Objects;

/**
* Metadata for the share group member obtained via {@link ShareConsumer#shareGroupMetadata()}.
* Pass this to {@link org.apache.kafka.clients.producer.Producer#sendShareAcknowledgementsToTransaction}
* to atomically bind acknowledgments to a producer transaction (KIP-1289).
*/
public class ShareGroupMetadata {

private final String groupId;
private final String memberId;
private final int memberEpoch;

public ShareGroupMetadata(String groupId, String memberId, int memberEpoch) {
this.groupId = Objects.requireNonNull(groupId, "groupId cannot be null");
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
this.memberEpoch = memberEpoch;
}

public String groupId() {
return groupId;
}

public String memberId() {
return memberId;
}

public int memberEpoch() {
return memberEpoch;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareGroupMetadata that = (ShareGroupMetadata) o;
return memberEpoch == that.memberEpoch
&& Objects.equals(groupId, that.groupId)
&& Objects.equals(memberId, that.memberId);
}

@Override
public int hashCode() {
return Objects.hash(groupId, memberId, memberEpoch);
}

@Override
public String toString() {
return "ShareGroupMetadata(groupId=" + groupId
+ ", memberId=" + memberId
+ ", memberEpoch=" + memberEpoch
+ ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareAcknowledgements;
import org.apache.kafka.clients.consumer.ShareGroupMetadata;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -45,6 +47,7 @@
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareGroupMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
Expand Down Expand Up @@ -1114,6 +1117,37 @@ public void wakeup() {
wakeupTrigger.wakeup();
}

@Override
public ShareGroupMetadata shareGroupMetadata() {
acquireAndEnsureOpen();
try {
ShareGroupMetadataEvent event = new ShareGroupMetadataEvent(
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
applicationEventHandler.add(event);
return processBackgroundEvents(event.future(),
time.timer(defaultApiTimeoutMs),
e -> false);
} finally {
release();
}
}

@Override
public ShareAcknowledgements acknowledgementsForTransaction() {
acquireAndEnsureOpen();
try {
handleCompletedAcknowledgements();
ensureExplicitAcknowledgement();
ensureInFlightAcknowledgedIfExplicitAcknowledgement();
if (currentFetch.hasRenewals()) {
throw new IllegalStateException("Renew acknowledgements cannot be sent to a transaction.");
}
return currentFetch.takeAcknowledgementsForTransaction();
} finally {
release();
}
}

/**
* Acquire the light lock and ensure that the consumer hasn't been closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareAcknowledgementBatch;
import org.apache.kafka.clients.consumer.ShareAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;

Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* {@link ShareFetch} represents the records fetched from the broker to be returned to the consumer
Expand Down Expand Up @@ -236,6 +239,24 @@ public Map<TopicIdPartition, NodeAcknowledgements> takeAcknowledgedRecords() {
return acknowledgementMap;
}

public ShareAcknowledgements takeAcknowledgementsForTransaction() {
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgementMap = new LinkedHashMap<>();
takeAcknowledgedRecords().forEach((tip, nodeAcknowledgements) -> {
List<ShareAcknowledgementBatch> acknowledgementBatches = nodeAcknowledgements.acknowledgements()
.getAcknowledgementBatches()
.stream()
.map(batch -> new ShareAcknowledgementBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.acknowledgeTypes()))
.collect(Collectors.toList());
if (!acknowledgementBatches.isEmpty()) {
acknowledgementMap.put(tip, acknowledgementBatches);
}
});
return new ShareAcknowledgements(acknowledgementMap);
}

/**
* Handles completed renew acknowledgements by returning successfully renewed records
* to the set of in-flight records.
Expand Down
Loading