Skip to content

FLIP-573: [DRAFT] Queues for Kafka #271

Draft
Shekharrajak wants to merge 15 commits into
apache:mainfrom
Shekharrajak:flink-share-group-eos-tdd
Draft

FLIP-573: [DRAFT] Queues for Kafka #271
Shekharrajak wants to merge 15 commits into
apache:mainfrom
Shekharrajak:flink-share-group-eos-tdd

Conversation

@Shekharrajak

@Shekharrajak Shekharrajak commented Jun 20, 2026

Copy link
Copy Markdown

@Shekharrajak Shekharrajak marked this pull request as draft June 20, 2026 04:56
}

public long getTransactionOwnerId() {
return transactionOwnerId;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transactionOwnerId , transactionOwner epoch is actually producer id and epoch - we are trying to decouple the txn ownership from producer

boolean sinkCommitted =
committable.getCommitPhase()
== KafkaShareEosCommittable.CommitPhase.SINK_COMMITTED;
if (!sinkCommitted) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commits sink transactions first, then share ACK transactions.

}

try {
shareAckCommitter.commit(committable.getShareAckCommittables());

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recovery cases: if producer commit fails then ack commit will not be done.

this.commitPhase = commitPhase;
}

public static KafkaShareEosCommittable ready(

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source can create it and committer can consume it.

}

@Override
public byte[] serialize(KafkaShareEosCommittable committable) throws IOException {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JM <-> TM sharing


@Timeout(240)
@ResourceLock("KafkaTestBase")
class KafkaShareEosPipelineITCase {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It runs a Flink MiniCluster with 2 TMs, 4 slots, parallelism 4, 6-partition Kafka input/output topics, share source -> operator -> transactional
Kafka sink, and validates:

}

@Test
void testShareAckCommitOnCheckpoint() throws Exception {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basic source-side 2PC path: ACK is staged as TX_PENDING and only becomes terminal after the Kafka transaction commits.


waitForShareLag(admin, context.groupId, context.topicPartition, 1L);
ConsumerRecord<byte[], byte[]> redelivered = client.pollOne();
assertThat(redelivered.offset()).isEqualTo(record.offset());

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no-data-loss before checkpoint completion: if Flink fails before completing the checkpoint, aborting the ACK transaction makes the record available again.

client.acknowledgeAccept(first);
manager.stageAcknowledgements();

produce(context.bootstrapServers, context.topic, "second");

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since in flink sub task we can have multiple poll here we covers cross-poll ACK staging. Kafka drains acknowledgementsForTransaction(), so the connector must explicitly merge multiple poll batches by staging them into the active checkpoint
transaction.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important Kafka client behavior:

ShareConsumerImpl.acknowledgementsForTransaction()
calls ShareFetch.takeAcknowledgementsForTransaction()
that drains current staged ACKs

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

produceToPartitions(
context.bootstrapServers, context.topic, partitionCount, recordsPerPartition);

MiniClusterWithClientResource miniCluster =

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink cluster

}

@Test
void testParallelSubtasksCommitMultiPartitionShareAcks() throws Exception {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intention is to cover distributed share consumption with multiple Flink subtasks and multiple Kafka partitions. Assignment is broker-side; Flink does not assign Kafka topic partitions itself.

@Shekharrajak Shekharrajak force-pushed the flink-share-group-eos-tdd branch from 4d8aa2e to 7aa3013 Compare June 21, 2026 06:33

Configuration flinkConfiguration = new Configuration();
flinkConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
MiniClusterWithClientResource miniCluster =

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is basically to setup like :

 MiniCluster: 2 TaskManagers, 2 slots each
  parallelism: 4
  input topic: 6 partitions, 30 records
  output topic: 6 partitions
  source -> rebalance -> map -> Kafka EOS sink

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the practical two-transaction EOS design: sink output commits first, then share ACKs are finalized. This prevents losing input records before their output is committed.

assertThat(shareCommits).doesNotHaveDuplicates();
assertThat(COMMITTED_SHARE_ACKS).hasSameSizeAs(shareCommits);
assertThat(SINK_COMMITTED_SHARE_ACKS).containsAll(COMMITTED_SHARE_ACKS);
for (String shareAckCommitKey : COMMITTED_SHARE_ACKS) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the system crashes after this point, every task can restart from the same consistent timeline.

CompleteCommit + Commit + retry epoch => Errors.NONE
CompleteCommit + Commit + current epoch => INVALID_TXN_STATE

the stored old epoch, not the new bumped epoch. So Kafka treats the second EndTxn as a retry of the same committed transaction and returns success.
if it fails in sink txn so it is idempotent

import java.util.function.Function;

@Internal
class SameTransactionShareAckKafkaWriter<IN>

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name can be different but intention is - sink records and share acks are part of the same Kafka transaction, so one Kafka commit makes both visible/committed atomically.

.filter(kafkaRequest -> kafkaRequest.retry)
.findFirst();
if (retry.isPresent()) {
for (CommitRequest<KafkaCommittable> request : requests) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sink records and share acks are part of the same Kafka transaction, so one Kafka commit makes both visible/committed atomically.

throws IOException, InterruptedException;

void preCommit(ShareAckTransactionHandle transaction) throws IOException, InterruptedException;
String preCommit(ShareAckTransactionHandle transaction) throws IOException, InterruptedException;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to return the prepared transaction state, and stored that value in KafkaShareAckTransactionManager snapshots.

try {
producer = writerProducer.orElseGet(() -> getProducer(committable));
producer.commitTransaction();
Optional<String> preparedTransactionState =

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persisted PreparedTxnState in committables and recovery path in KafkaCommitter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant