Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -583,9 +584,7 @@ public void replay(PartitionRecord record) {
isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo));
}

// Diskless topics are excluded: the metadata transformer handles leader routing,
// so tracking preferred leader imbalance is unnecessary.
if (!isDisklessTopic(topicInfo.name)) {
if (shouldTrackPreferredLeader(topicInfo.name)) {
if (newPartInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
Expand Down Expand Up @@ -633,7 +632,7 @@ public void replay(PartitionChangeRecord record) {
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);

if (!isDisklessTopic(topicInfo.name)) {
if (shouldTrackPreferredLeader(topicInfo.name)) {
if (newPartitionInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
Expand Down Expand Up @@ -927,13 +926,19 @@ private ApiError createTopic(ControllerRequestContext context,
PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
validateManualPartitionAssignment(partitionAssignment, replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
filter(clusterControl::isActive).toList();
if (isr.isEmpty()) {
// At least one active broker is required for initial leader election,
// even for diskless (data is in object storage, but clients need a leader to connect to).
if (assignment.brokerIds().stream().noneMatch(clusterControl::isActive)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition assignment for " +
"partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
}
// For diskless: ISR = all replicas (data in object storage, fencing doesn't affect availability).
// Active replicas first so buildPartitionRegistration picks an active leader via isr.get(0).
// For classic: ISR = active replicas only.
List<Integer> isr = disklessEnabled
? assignment.brokerIds().stream().sorted(activeFirstComparator()).toList()
: assignment.brokerIds().stream().filter(clusterControl::isActive).toList();
Comment thread
jeqo marked this conversation as resolved.
newParts.put(
assignment.partitionIndex(),
buildPartitionRegistration(partitionAssignment, isr)
Expand Down Expand Up @@ -977,13 +982,14 @@ private ApiError createTopic(ControllerRequestContext context,
numPartitions,
replicationFactor
), clusterDescriber);
brokerFilter = clusterControl::isActive;
// For diskless (managed or not): ISR = all replicas regardless of fenced state.
// Data lives in object storage, so broker fencing doesn't affect availability.
brokerFilter = disklessEnabled ? x -> true : clusterControl::isActive;
} else {
topicAssignment = createDisklessAssignment(numPartitions);
Comment thread
jeqo marked this conversation as resolved.
if (topicAssignment == null) {
return new ApiError(Errors.BROKER_NOT_AVAILABLE, "No brokers available to create diskless topic.");
}
// For diskless, it doesn't matter if a broker is fenced or not.
brokerFilter = x -> true;
}

Expand Down Expand Up @@ -1442,7 +1448,7 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
partition,
topic.id,
partitionId,
new LeaderAcceptor(clusterControl, partition),
leaderAcceptorFor(topic.name, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
)
Expand Down Expand Up @@ -1868,6 +1874,37 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
(short) 0));
generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithNoLeader());

if (isDisklessManagedReplicasEnabled) {
expandIsrForDisklessManagedPartitions(brokerId, records);
}
}

// Full scan of all topics/partitions is acceptable: broker unfence is a rare state transition
// (not a hot path) and the per-partition work is O(1) array membership checks.
private void expandIsrForDisklessManagedPartitions(int brokerId, List<ApiMessageAndVersion> records) {
int expanded = 0;
for (TopicControlInfo topic : topics.values()) {
if (!isDisklessTopic(topic.name)) continue;
for (var entry : topic.parts.entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration partition = entry.getValue();
Comment thread
jeqo marked this conversation as resolved.
if (!Replicas.contains(partition.replicas, brokerId)) continue;
Comment thread
jeqo marked this conversation as resolved.
if (Replicas.contains(partition.isr, brokerId)) continue;
// Broker is a replica but not in ISR — expand ISR
int[] newIsr = Replicas.copyWith(partition.isr, brokerId);
PartitionChangeRecord changeRecord = new PartitionChangeRecord()
.setTopicId(topic.id)
.setPartitionId(partitionId)
.setIsr(Replicas.toList(newIsr));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sets the ISR via a raw PartitionChangeRecord instead of PartitionChangeBuilder, so the ELR is never reconciled. If ELR is enabled (it is the default for 4.1+) and the ISR previously shrank below min.insync.replicas, the returning broker is in the ELR. This code then adds it back to the ISR without removing it from the ELR, and without clearing the ELR once ISR ≥ min ISR, which violates the ELR KIP invariant that a replica cannot be in both the ELR and the ISR.

This may not matter much for diskless topics, but I wonder whether we should still maintain that invariant.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Even though the scenario is unlikely, the invariant violation is real. I'll switch to PartitionChangeBuilder to get proper ELR cleanup when ISR expands back.

records.add(new ApiMessageAndVersion(changeRecord, (short) 0));
expanded++;
}
}
if (expanded > 0) {
log.info("handleBrokerUnfenced: expanded ISR for {} diskless managed partition(s) " +
"to include broker {}", expanded, brokerId);
}
}

/**
Expand Down Expand Up @@ -2038,7 +2075,7 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
partition,
topicId,
partitionId,
new LeaderAcceptor(clusterControl, partition),
leaderAcceptorFor(topic, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic)
)
Expand Down Expand Up @@ -2191,13 +2228,7 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
continue;
}

// Skip diskless topics: the metadata transformer handles leader routing,
// so controller-level preferred leader election is unnecessary.
// After reassignment, the leader may not match the preferred replica (replicas[0])
// because PartitionChangeBuilder.electLeader() prefers keeping the current leader.
// This is intentional — the metadata transformer routes requests independently of
// the preferred replica order.
if (isDisklessTopic(topic.name)) {
if (!shouldTrackPreferredLeader(topic.name)) {
continue;
}

Expand All @@ -2207,12 +2238,12 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
continue;
}

// Attempt to perform a preferred leader election
// Attempt to perform a preferred leader election.
new PartitionChangeBuilder(
Comment thread
jeqo marked this conversation as resolved.
partition,
topicPartition.topicId(),
topicPartition.partitionId(),
new LeaderAcceptor(clusterControl, partition),
leaderAcceptorFor(topic.name, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
)
Expand Down Expand Up @@ -2348,6 +2379,13 @@ void createPartitions(ControllerRequestContext context,

List<PartitionAssignment> partitionAssignments;
List<List<Integer>> isrs;
// For diskless (managed or not), ISR includes all replicas regardless of fenced state.
// Data lives in object storage, so broker fencing doesn't affect data availability.
boolean isDiskless = isDisklessTopic(topic.name());
Predicate<Integer> brokerFilter = isDiskless
? x -> true
: clusterControl::isActive;

if (topic.assignments() != null) {
partitionAssignments = new ArrayList<>();
isrs = new ArrayList<>();
Expand All @@ -2356,13 +2394,15 @@ void createPartitions(ControllerRequestContext context,
PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber);
validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor));
partitionAssignments.add(partitionAssignment);
List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).toList();
if (isr.isEmpty()) {
// At least one active broker required for initial leader election
if (replicas.stream().noneMatch(clusterControl::isActive)) {
throw new InvalidReplicaAssignmentException(
"All brokers specified in the manual partition assignment for " +
"partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
}
List<Integer> isr = isDiskless
? partitionAssignment.replicas().stream().sorted(activeFirstComparator()).toList()
: partitionAssignment.replicas().stream().filter(brokerFilter).toList();
isrs.add(isr);
}
} else {
Expand All @@ -2372,13 +2412,6 @@ void createPartitions(ControllerRequestContext context,
).assignments();
isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList();
}
// For unmanaged diskless, ISR includes all replicas regardless of fenced state —
// consistent with topic creation. Data lives in object storage, so broker fencing
// doesn't affect data availability.
boolean isDiskless = isDisklessTopic(topic.name());
Predicate<Integer> brokerFilter = (isDiskless && !isDisklessManagedReplicasEnabled)
? x -> true
: clusterControl::isActive;

int partitionId = startPartitionId;
for (int i = 0; i < partitionAssignments.size(); i++) {
Expand Down Expand Up @@ -2494,7 +2527,7 @@ void generateLeaderAndIsrUpdates(String context,
partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
new LeaderAcceptor(clusterControl, partition, isAcceptableLeader),
leaderAcceptorFor(topic.name, partition, isAcceptableLeader),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
);
Expand Down Expand Up @@ -2615,7 +2648,7 @@ Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
part,
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
leaderAcceptorFor(topicName, part),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
);
Expand Down Expand Up @@ -2678,22 +2711,16 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);

boolean isDiskless = isDisklessTopic(topics.get(tp.topicId()).name);

// Diskless topics don't use local directories — skip the directory check in leader election.
// Any active (unfenced, not in controlled shutdown) broker can lead a diskless partition,
// since it reads all data from object storage rather than local disk.
IntPredicate leaderAcceptor = isDiskless
? clusterControl::isActive
: new LeaderAcceptor(clusterControl, part);
String topicName = topics.get(tp.topicId()).name;
boolean isDiskless = isDisklessTopic(topicName);

PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
leaderAcceptor,
leaderAcceptorFor(topicName, part),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
getTopicEffectiveMinIsr(topicName)
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());

Expand All @@ -2714,15 +2741,16 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
// storage. A future optimization could pre-warm target brokers' caches before
// completing the reassignment.
if (!target.replicas().equals(currentReplicas)) {
List<Integer> activeIsr = target.replicas().stream()
.filter(clusterControl::isActive)
.toList();
if (activeIsr.isEmpty()) {
// Reject if no target broker is active (can't elect a leader).
boolean anyActive = target.replicas().stream()
.anyMatch(clusterControl::isActive);
if (!anyActive) {
throw new InvalidReplicaAssignmentException(
"None of the target replicas " + target.replicas() + " are active.");
}
// ISR = all replicas: data is in object storage, fencing doesn't affect availability.
builder.setTargetReplicas(target.replicas());
builder.setTargetIsr(activeIsr);
builder.setTargetIsr(target.replicas());
}
} else {
if (!reassignment.replicas().equals(currentReplicas)) {
Expand Down Expand Up @@ -2804,7 +2832,7 @@ ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(As
partitionRegistration,
topicId,
partitionIndex,
new LeaderAcceptor(clusterControl, partitionRegistration),
leaderAcceptorFor(topicName, partitionRegistration),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
)
Expand Down Expand Up @@ -3046,6 +3074,34 @@ public String toString() {
}
}

/**
 * Decides whether partitions of the given topic take part in preferred-leader
 * balancing.
 *
 * Classic (non-diskless) topics always qualify. A diskless topic qualifies only
 * while the managed-replicas flag is enabled; without it, the metadata transformer
 * handles leader routing and there is no multi-replica leadership to balance.
 *
 * @param topicName name of the topic to check
 * @return true if preferred-leader tracking applies to the topic
 */
private boolean shouldTrackPreferredLeader(String topicName) {
    if (isDisklessTopic(topicName)) {
        // Diskless: tracking is gated purely on the managed-replicas flag.
        return isDisklessManagedReplicasEnabled;
    }
    return true;
}

/**
 * Builds a comparator that orders broker ids reported active by clusterControl
 * ahead of all others; two brokers with the same status compare as equal.
 *
 * Used to sort ISR candidates so that buildPartitionRegistration picks an active
 * leader via isr.get(0).
 */
private Comparator<Integer> activeFirstComparator() {
    return (left, right) -> Integer.compare(
        clusterControl.isActive(left) ? 0 : 1,
        clusterControl.isActive(right) ? 0 : 1);
}

/**
 * Selects the leader-eligibility predicate for a partition of the given topic.
 *
 * Diskless topics don't use local storage, so the directory-liveness check is
 * skipped and the predicate is just clusterControl::isActive. Every other topic
 * gets a LeaderAcceptor bound to the partition.
 *
 * @param topicName name of the topic that owns the partition
 * @param partition registration handed to LeaderAcceptor for non-diskless topics
 * @return predicate over broker ids used when choosing a leader
 */
private IntPredicate leaderAcceptorFor(String topicName, PartitionRegistration partition) {
    if (!isDisklessTopic(topicName)) {
        return new LeaderAcceptor(clusterControl, partition);
    }
    return clusterControl::isActive;
}

/**
 * Variant of leaderAcceptorFor that takes a caller-supplied base predicate.
 *
 * For diskless topics the supplied predicate is returned unchanged; for every
 * other topic it is wrapped in a LeaderAcceptor together with clusterControl and
 * the partition.
 *
 * @param topicName          name of the topic that owns the partition
 * @param partition          registration handed to LeaderAcceptor for non-diskless topics
 * @param isAcceptableLeader caller-supplied predicate over broker ids
 * @return predicate over broker ids used when choosing a leader
 */
private IntPredicate leaderAcceptorFor(String topicName, PartitionRegistration partition, IntPredicate isAcceptableLeader) {
    if (!isDisklessTopic(topicName)) {
        return new LeaderAcceptor(clusterControl, partition, isAcceptableLeader);
    }
    return isAcceptableLeader;
}

private static final class LeaderAcceptor implements IntPredicate {
private final ClusterControlManager clusterControl;
private final PartitionRegistration partition;
Expand Down
Loading
Loading