From d3c25b3403934ed891c23c967b2e58c09f545edd Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Jun 2026 15:34:20 +0300 Subject: [PATCH 1/2] fix(inkless:controller): fix leader skew for managed diskless after rolling restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes that together ensure balanced leadership for managed diskless topics: 1. Enable preferred leader rebalance: the previous blanket skip for all diskless topics was correct for unmanaged (RF=1) but wrong for managed (RF>1), where tiered storage upload and deletion run only on the leader. 2. ISR = all replicas at creation/reassignment/addPartitions: matches unmanaged diskless semantics. Data is in object storage — broker fencing doesn't affect data availability. At least one replica must still be active so that a leader can be elected; for manual assignments at creation/addPartitions the ISR is ordered active-first. 3. Expand ISR on broker unfence: when a broker returns, re-add it to the ISR of every managed diskless partition where it is a replica but missing from the ISR. This repairs stale ISR left by prior shrinks and lets preferred leader election redistribute leadership. 
Co-Authored-By: Claude Opus 4.6 --- .../controller/ReplicationControlManager.java | 152 +++++++++++------ .../ReplicationControlManagerTest.java | 161 ++++++++++++++++-- 2 files changed, 254 insertions(+), 59 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 9b3ba2d6b1..127762fc62 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -113,6 +113,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -583,9 +584,7 @@ public void replay(PartitionRecord record) { isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo)); } - // Diskless topics are excluded: the metadata transformer handles leader routing, - // so tracking preferred leader imbalance is unnecessary. 
- if (!isDisklessTopic(topicInfo.name)) { + if (shouldTrackPreferredLeader(topicInfo.name)) { if (newPartInfo.hasPreferredLeader()) { imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId())); } else { @@ -633,7 +632,7 @@ public void replay(PartitionChangeRecord record) { record.topicId(); newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo); - if (!isDisklessTopic(topicInfo.name)) { + if (shouldTrackPreferredLeader(topicInfo.name)) { if (newPartitionInfo.hasPreferredLeader()) { imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId())); } else { @@ -927,13 +926,19 @@ private ApiError createTopic(ControllerRequestContext context, PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber); validateManualPartitionAssignment(partitionAssignment, replicationFactor); replicationFactor = OptionalInt.of(assignment.brokerIds().size()); - List isr = assignment.brokerIds().stream(). - filter(clusterControl::isActive).toList(); - if (isr.isEmpty()) { + // At least one active broker is required for initial leader election, + // even for diskless (data is in object storage, but clients need a leader to connect to). + if (assignment.brokerIds().stream().noneMatch(clusterControl::isActive)) { return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "All brokers specified in the manual partition assignment for " + "partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown."); } + // For diskless: ISR = all replicas (data in object storage, fencing doesn't affect availability). + // Active replicas first so buildPartitionRegistration picks an active leader via isr.get(0). + // For classic: ISR = active replicas only. + List isr = disklessEnabled + ? 
assignment.brokerIds().stream().sorted(activeFirstComparator()).toList() + : assignment.brokerIds().stream().filter(clusterControl::isActive).toList(); newParts.put( assignment.partitionIndex(), buildPartitionRegistration(partitionAssignment, isr) @@ -977,13 +982,14 @@ private ApiError createTopic(ControllerRequestContext context, numPartitions, replicationFactor ), clusterDescriber); - brokerFilter = clusterControl::isActive; + // For diskless (managed or not): ISR = all replicas regardless of fenced state. + // Data lives in object storage, so broker fencing doesn't affect availability. + brokerFilter = disklessEnabled ? x -> true : clusterControl::isActive; } else { topicAssignment = createDisklessAssignment(numPartitions); if (topicAssignment == null) { return new ApiError(Errors.BROKER_NOT_AVAILABLE, "No brokers available to create diskless topic."); } - // For diskless, it doesn't matter if a broker is fenced of not. brokerFilter = x -> true; } @@ -1442,7 +1448,7 @@ ControllerResult alterPartition( partition, topic.id, partitionId, - new LeaderAcceptor(clusterControl, partition), + leaderAcceptorFor(topic.name, partition), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topic.name) ) @@ -1868,6 +1874,37 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List records) { + int expanded = 0; + for (TopicControlInfo topic : topics.values()) { + if (!isDisklessTopic(topic.name)) continue; + for (var entry : topic.parts.entrySet()) { + int partitionId = entry.getKey(); + PartitionRegistration partition = entry.getValue(); + if (!Replicas.contains(partition.replicas, brokerId)) continue; + if (Replicas.contains(partition.isr, brokerId)) continue; + // Broker is a replica but not in ISR — expand ISR + int[] newIsr = Replicas.copyWith(partition.isr, brokerId); + PartitionChangeRecord changeRecord = new PartitionChangeRecord() + .setTopicId(topic.id) + .setPartitionId(partitionId) + .setIsr(Replicas.toList(newIsr)); + records.add(new 
ApiMessageAndVersion(changeRecord, (short) 0)); + expanded++; + } + } + if (expanded > 0) { + log.info("handleBrokerUnfenced: expanded ISR for {} diskless managed partition(s) " + + "to include broker {}", expanded, brokerId); + } } /** @@ -2038,7 +2075,7 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, partition, topicId, partitionId, - new LeaderAcceptor(clusterControl, partition), + leaderAcceptorFor(topic, partition), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topic) ) @@ -2191,13 +2228,7 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader( continue; } - // Skip diskless topics: the metadata transformer handles leader routing, - // so controller-level preferred leader election is unnecessary. - // After reassignment, the leader may not match the preferred replica (replicas[0]) - // because PartitionChangeBuilder.electLeader() prefers keeping the current leader. - // This is intentional — the metadata transformer routes requests independently of - // the preferred replica order. - if (isDisklessTopic(topic.name)) { + if (!shouldTrackPreferredLeader(topic.name)) { continue; } @@ -2207,12 +2238,12 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader( continue; } - // Attempt to perform a preferred leader election + // Attempt to perform a preferred leader election. new PartitionChangeBuilder( partition, topicPartition.topicId(), topicPartition.partitionId(), - new LeaderAcceptor(clusterControl, partition), + leaderAcceptorFor(topic.name, partition), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topic.name) ) @@ -2348,6 +2379,13 @@ void createPartitions(ControllerRequestContext context, List partitionAssignments; List> isrs; + // For diskless (managed or not), ISR includes all replicas regardless of fenced state. + // Data lives in object storage, so broker fencing doesn't affect data availability. 
+ boolean isDiskless = isDisklessTopic(topic.name()); + Predicate brokerFilter = isDiskless + ? x -> true + : clusterControl::isActive; + if (topic.assignments() != null) { partitionAssignments = new ArrayList<>(); isrs = new ArrayList<>(); @@ -2356,13 +2394,15 @@ void createPartitions(ControllerRequestContext context, PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber); validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor)); partitionAssignments.add(partitionAssignment); - List isr = partitionAssignment.replicas().stream(). - filter(clusterControl::isActive).toList(); - if (isr.isEmpty()) { + // At least one active broker required for initial leader election + if (replicas.stream().noneMatch(clusterControl::isActive)) { throw new InvalidReplicaAssignmentException( "All brokers specified in the manual partition assignment for " + "partition " + (startPartitionId + i) + " are fenced or in controlled shutdown."); } + List isr = isDiskless + ? partitionAssignment.replicas().stream().sorted(activeFirstComparator()).toList() + : partitionAssignment.replicas().stream().filter(brokerFilter).toList(); isrs.add(isr); } } else { @@ -2372,13 +2412,6 @@ void createPartitions(ControllerRequestContext context, ).assignments(); isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList(); } - // For unmanaged diskless, ISR includes all replicas regardless of fenced state — - // consistent with topic creation. Data lives in object storage, so broker fencing - // doesn't affect data availability. - boolean isDiskless = isDisklessTopic(topic.name()); - Predicate brokerFilter = (isDiskless && !isDisklessManagedReplicasEnabled) - ? 
x -> true - : clusterControl::isActive; int partitionId = startPartitionId; for (int i = 0; i < partitionAssignments.size(); i++) { @@ -2494,7 +2527,7 @@ void generateLeaderAndIsrUpdates(String context, partition, topicIdPart.topicId(), topicIdPart.partitionId(), - new LeaderAcceptor(clusterControl, partition, isAcceptableLeader), + leaderAcceptorFor(topic.name, partition, isAcceptableLeader), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topic.name) ); @@ -2615,7 +2648,7 @@ Optional cancelPartitionReassignment(String topicName, part, tp.topicId(), tp.partitionId(), - new LeaderAcceptor(clusterControl, part), + leaderAcceptorFor(topicName, part), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topicName) ); @@ -2678,22 +2711,16 @@ Optional changePartitionReassignment(TopicIdPartition tp, PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(currentAssignment, targetAssignment); - boolean isDiskless = isDisklessTopic(topics.get(tp.topicId()).name); - - // Diskless topics don't use local directories — skip the directory check in leader election. - // Any active (unfenced, not in controlled shutdown) broker can lead a diskless partition, - // since it reads all data from object storage rather than local disk. - IntPredicate leaderAcceptor = isDiskless - ? clusterControl::isActive - : new LeaderAcceptor(clusterControl, part); + String topicName = topics.get(tp.topicId()).name; + boolean isDiskless = isDisklessTopic(topicName); PartitionChangeBuilder builder = new PartitionChangeBuilder( part, tp.topicId(), tp.partitionId(), - leaderAcceptor, + leaderAcceptorFor(topicName, part), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topics.get(tp.topicId()).name) + getTopicEffectiveMinIsr(topicName) ); builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); @@ -2714,15 +2741,16 @@ Optional changePartitionReassignment(TopicIdPartition tp, // storage. 
A future optimization could pre-warm target brokers' caches before // completing the reassignment. if (!target.replicas().equals(currentReplicas)) { - List activeIsr = target.replicas().stream() - .filter(clusterControl::isActive) - .toList(); - if (activeIsr.isEmpty()) { + // Reject if no target broker is active (can't elect a leader). + boolean anyActive = target.replicas().stream() + .anyMatch(clusterControl::isActive); + if (!anyActive) { throw new InvalidReplicaAssignmentException( "None of the target replicas " + target.replicas() + " are active."); } + // ISR = all replicas: data is in object storage, fencing doesn't affect availability. builder.setTargetReplicas(target.replicas()); - builder.setTargetIsr(activeIsr); + builder.setTargetIsr(target.replicas()); } } else { if (!reassignment.replicas().equals(currentReplicas)) { @@ -2804,7 +2832,7 @@ ControllerResult handleAssignReplicasToDirs(As partitionRegistration, topicId, partitionIndex, - new LeaderAcceptor(clusterControl, partitionRegistration), + leaderAcceptorFor(topicName, partitionRegistration), featureControl.metadataVersionOrThrow(), getTopicEffectiveMinIsr(topicName) ) @@ -3046,6 +3074,34 @@ public String toString() { } } + // Classic topics always participate in preferred leader balancing. Diskless topics only + // participate when the managed-replicas flag is enabled — without it, the metadata + // transformer handles leader routing and there's no multi-replica leadership to balance. + private boolean shouldTrackPreferredLeader(String topicName) { + return !isDisklessTopic(topicName) || isDisklessManagedReplicasEnabled; + } + + // Active brokers sort first so buildPartitionRegistration picks an active leader via isr.get(0). + private Comparator activeFirstComparator() { + return Comparator.comparingInt(b -> clusterControl.isActive(b) ? 0 : 1); + } + + /** + * Returns the appropriate leader acceptor for the given topic. 
Diskless topics skip the + * directory-liveness check since they don't use local storage. + */ + private IntPredicate leaderAcceptorFor(String topicName, PartitionRegistration partition) { + return isDisklessTopic(topicName) + ? clusterControl::isActive + : new LeaderAcceptor(clusterControl, partition); + } + + private IntPredicate leaderAcceptorFor(String topicName, PartitionRegistration partition, IntPredicate isAcceptableLeader) { + return isDisklessTopic(topicName) + ? isAcceptableLeader + : new LeaderAcceptor(clusterControl, partition, isAcceptableLeader); + } + private static final class LeaderAcceptor implements IntPredicate { private final ClusterControlManager clusterControl; private final PartitionRegistration partition; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index ba61330bd2..0613da2cf2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -5262,7 +5262,7 @@ public void testReassignDisklessPartitions() { } @Test - public void testReassignDisklessPartitionsToFencedBrokerExcludesFromIsr() { + public void testReassignDisklessPartitionsToFencedBrokerIncludesInIsr() { MetadataVersion metadataVersion = MetadataVersion.latestTesting(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setMetadataVersion(metadataVersion) @@ -5296,10 +5296,10 @@ public void testReassignDisklessPartitionsToFencedBrokerExcludesFromIsr() { ctx.replay(alterResult.records()); - // Verify replicas include both, but ISR only includes the active broker + // Verify replicas include both, and ISR includes all replicas (diskless: data in object storage) PartitionRegistration partition = replication.getPartition(createResult.topicId(), 0); assertEquals(List.of(1, 2), 
Replicas.toList(partition.replicas)); - assertEquals(List.of(1), Replicas.toList(partition.isr)); + assertEquals(List.of(1, 2), Replicas.toList(partition.isr)); } @Test @@ -5337,7 +5337,48 @@ public void testReassignDisklessPartitionsToAllFencedBrokersIsRejected() { } @Test - public void testPeriodicLeaderBalancingSkipsDisklessTopics() { + public void testPeriodicLeaderBalancingSkipsUnmanagedDisklessTopics() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(false) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + // Create a diskless topic with RF=1 (unmanaged — no manual assignment allowed) + String disklessTopic = "diskless-foo"; + ctx.createTestTopic(disklessTopic, 1, (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code()); + + // Create an imbalanced classic topic to prove the balancer is functional + CreatableTopicResult classicResult = ctx.createTestTopic( + "classic-foo", new int[][] {new int[] {0, 1}}); + // Reassign to [1, 0] — makes it imbalanced (preferred=1, leader=0) + ControllerResult alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName("classic-foo").setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1, 0))))))); + ctx.replay(alterResult.records()); + + // Balancer produces records for the imbalanced classic topic but NOT for unmanaged diskless + ControllerResult balanceResult = replication.maybeBalancePartitionLeaders(); + assertEquals(1, balanceResult.records().size(), + "Balancer should produce exactly one record (classic topic only, not diskless)"); + + // Verify only the classic topic 
was rebalanced + ctx.replay(balanceResult.records()); + PartitionRegistration classicPartition = replication.getPartition(classicResult.topicId(), 0); + assertEquals(1, classicPartition.leader, + "Classic topic leader should move to preferred replica"); + } + + @Test + public void testPeriodicLeaderBalancingRebalancesManagedDisklessTopics() { MetadataVersion metadataVersion = MetadataVersion.latestTesting(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setMetadataVersion(metadataVersion) @@ -5364,17 +5405,65 @@ public void testPeriodicLeaderBalancingSkipsDisklessTopics() { new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1, 0))))))); ctx.replay(alterResult.records()); - // Verify partition has preferred replica 1 but leader 0 (imbalanced from classic perspective) + // Verify partition has preferred replica 1 but leader 0 (imbalanced) PartitionRegistration partition = replication.getPartition(createResult.topicId(), 0); assertEquals(List.of(1, 0), Replicas.toList(partition.replicas)); assertEquals(0, partition.leader); assertFalse(partition.hasPreferredLeader(), "Leader should not be the preferred replica after reassignment"); - // Periodic leader balancing should produce NO records for diskless topics + // Periodic leader balancing SHOULD rebalance managed diskless topics ControllerResult balanceResult = replication.maybeBalancePartitionLeaders(); - assertTrue(balanceResult.records().isEmpty(), - "Periodic leader balancing should skip diskless topics"); + assertFalse(balanceResult.records().isEmpty(), + "Periodic leader balancing should rebalance managed diskless topics"); + + // Replay balance records and verify leader moved to preferred replica + ctx.replay(balanceResult.records()); + PartitionRegistration balanced = replication.getPartition(createResult.topicId(), 0); + assertEquals(1, balanced.leader, + "Leader should have moved to preferred replica after balancing"); + 
assertTrue(balanced.hasPreferredLeader(), + "Partition should now have preferred leader after balancing"); + } + + @Test + public void testManagedDisklessIsrExpandsOnBrokerUnfenced() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=3 + String disklessTopic = "diskless-foo"; + CreatableTopicResult createResult = ctx.createTestTopic( + disklessTopic, new int[][] {new int[] {0, 1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // Verify initial ISR = [0, 1, 2] + PartitionRegistration partition = replication.getPartition(createResult.topicId(), 0); + assertEquals(List.of(0, 1, 2), Replicas.toList(partition.isr)); + assertEquals(0, partition.leader); + + // Fence broker 0 (current leader) — ISR shrinks, leader moves + ctx.fenceBrokers(0); + partition = replication.getPartition(createResult.topicId(), 0); + assertEquals(2, partition.isr.length, "ISR should shrink on fencing"); + assertNotEquals(0, partition.leader, "Leader should move away from fenced broker"); + + // Unfence broker 0 — ISR should expand back to include all replicas + ctx.unfenceBrokers(0, 1, 2); + partition = replication.getPartition(createResult.topicId(), 0); + assertEquals(3, partition.isr.length, + "ISR should expand back to all replicas after unfencing for diskless managed topics"); + assertTrue(Replicas.contains(partition.isr, 0), "Broker 0 should be back in ISR"); + assertTrue(Replicas.contains(partition.isr, 1), "Broker 1 should be in ISR"); + assertTrue(Replicas.contains(partition.isr, 2), "Broker 2 should be in ISR"); } @Test @@ -5449,7 +5538,7 @@ public void 
testAddPartitionsManualAssignment() { } @Test - public void testAddPartitionsIsrExcludesFencedBrokers() { + public void testAddPartitionsIsrIncludesFencedBrokersForDiskless() { MetadataVersion metadataVersion = MetadataVersion.latestTesting(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setMetadataVersion(metadataVersion) @@ -5478,10 +5567,60 @@ public void testAddPartitionsIsrExcludesFencedBrokers() { assertEquals(NONE.code(), addResult.response().get(0).errorCode()); ctx.replay(addResult.records()); - // Replicas include both, but ISR only includes the active broker + // For diskless topics, ISR includes all replicas regardless of fenced state PartitionRegistration partition = replication.getPartition(createResult.topicId(), 1); assertEquals(List.of(1, 2), Replicas.toList(partition.replicas)); - assertEquals(List.of(1), Replicas.toList(partition.isr)); + assertEquals(List.of(1, 2), Replicas.toList(partition.isr)); + } + + @Test + public void testAddPartitionsRejectsAllFencedBrokersForDiskless() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + String topic = "foo"; + ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // Fence brokers 1 and 2 + ctx.fenceBrokers(1, 2); + + // Add partition with manual assignment where ALL target brokers are fenced + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new 
CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of( + new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2)))))); + assertEquals(INVALID_REPLICA_ASSIGNMENT.code(), addResult.response().get(0).errorCode(), + "Should reject when all target brokers are fenced — no active leader possible"); + } + + @Test + public void testCreateTopicManualAssignmentRejectsAllFencedBrokersForDiskless() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Fence brokers 1 and 2 + ctx.fenceBrokers(1, 2); + + // Create topic with manual assignment where all brokers are fenced — should be rejected + ctx.createTestTopic("bar", new int[][] {new int[] {1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), INVALID_REPLICA_ASSIGNMENT.code()); } @Test From 0cb900649e27bca530287a73ece7af0a170e75c2 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 30 Jun 2026 19:33:45 +0300 Subject: [PATCH 2/2] fixup! 
fix(inkless:controller): fix leader skew for managed diskless after rolling restart --- .../controller/ReplicationControlManager.java | 27 +++++++++---- .../ReplicationControlManagerTest.java | 38 +++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 127762fc62..364fbd3bbe 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1891,14 +1891,25 @@ private void expandIsrForDisklessManagedPartitions(int brokerId, List record = new PartitionChangeBuilder( + partition, + topic.id, + partitionId, + leaderAcceptorFor(topic.name, partition), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topic.name) + ) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) + .setTargetIsr(Replicas.toList(Replicas.copyWith(partition.isr, brokerId))) + .setDefaultDirProvider(clusterDescriber) + .build(); + if (record.isPresent()) { + records.add(record.get()); + expanded++; + } } } if (expanded > 0) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 0613da2cf2..405ff71c60 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -5943,6 +5943,44 @@ void testCreateDisklessTopicWithRFGreaterThanOneRejectedWhenManagedDisabled() { assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), result.response().topics().find("foo").errorCode()); } + + @Test + public void testUnfenceExpandsIsrAndClearsElr() { + // Regression test: 
expandIsrForDisklessManagedPartitions must reconcile ELR so that + // a broker added back to ISR on unfence is removed from ELR (ISR ∩ ELR = ∅, KIP-966). + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setIsElrEnabled(true) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=3 and minISR=3 so any fencing drives ISR below minISR. + CreatableTopicResult createResult = ctx.createTestTopic("foo", + new int[][] {new int[] {0, 1, 2}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"); + Uuid topicId = createResult.topicId(); + + // Fence broker 2: ISR shrinks to {0, 1}, ELR gets {2} (ISR < minISR=3). + ctx.fenceBrokers(2); + PartitionRegistration afterFence = replication.getPartition(topicId, 0); + assertFalse(Replicas.contains(afterFence.isr, 2), "broker 2 must be out of ISR after fencing"); + assertTrue(Replicas.contains(afterFence.elr, 2), "broker 2 must be in ELR after fencing"); + + // Unfence broker 2: expandIsrForDisklessManagedPartitions fires. + ctx.unfenceBrokers(2); + PartitionRegistration afterUnfence = replication.getPartition(topicId, 0); + + // ISR must contain broker 2 again. + assertTrue(Replicas.contains(afterUnfence.isr, 2), "broker 2 must be back in ISR after unfencing"); + // ELR must NOT contain broker 2 — ISR ∩ ELR = ∅ invariant (KIP-966). + assertFalse(Replicas.contains(afterUnfence.elr, 2), "broker 2 must not be in ELR after ISR expansion"); + } } @Nested