Skip to content
Open
34 changes: 34 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,40 @@ default DescribeTopicPartitionsResult describeTopicPartitions(Collection<String>
*/
DescribeTopicPartitionsResult describeTopicPartitions(Collection<String> topics, DescribeTopicsOptions options);

/**
* Override the classic-to-diskless switch state of a single partition. This is an operator
* tool that writes the {@code classicToDisklessStartOffset} directly on the controller without
* requiring the caller to be the partition leader.
*
* This is a convenience method for {@link #alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)}
* with default options.
*
* @param topic The topic name.
* @param partition The partition index.
* @param sealOffset The seal offset to commit: {@code >= 0} forces (re-)sealing at that offset,
* {@code -1} aborts the switch and reverts the partition to classic, and
* {@code -2} re-arms the switch as pending.
* @return The AlterDisklessSwitchResult.
*/
default AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset) {
return alterDisklessSwitch(topic, partition, sealOffset, new AlterDisklessSwitchOptions());
}

/**
* Override the classic-to-diskless switch state of a single partition. This is an operator
* tool that writes the {@code classicToDisklessStartOffset} directly on the controller without
* requiring the caller to be the partition leader.
*
* @param topic The topic name.
* @param partition The partition index.
* @param sealOffset The seal offset to commit: {@code >= 0} forces (re-)sealing at that offset,
* {@code -1} aborts the switch and reverts the partition to classic, and
* {@code -2} re-arms the switch as pending.
* @param options The options to use.
* @return The AlterDisklessSwitchResult.
*/
AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, AlterDisklessSwitchOptions options);

/**
* Get information about the nodes in the cluster, using the default options.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link Admin#alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Unstable
public class AlterDisklessSwitchOptions extends AbstractOptions<AlterDisklessSwitchOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The result of the {@link Admin#alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Unstable
public class AlterDisklessSwitchResult {
private final KafkaFuture<Void> future;

AlterDisklessSwitchResult(final KafkaFuture<Void> future) {
this.future = future;
}

/**
* Return a future which succeeds if the operation is successful.
*/
public KafkaFuture<Void> all() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public DescribeTopicPartitionsResult describeTopicPartitions(Collection<String>
return delegate.describeTopicPartitions(topics, options);
}

@Override
public AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, AlterDisklessSwitchOptions options) {
return delegate.alterDisklessSwitch(topic, partition, sealOffset, options);
}

@Override
public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
return delegate.describeCluster(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AlterDisklessSwitchRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
Expand Down Expand Up @@ -186,6 +187,8 @@
import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.AlterClientQuotasRequest;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterDisklessSwitchRequest;
import org.apache.kafka.common.requests.AlterDisklessSwitchResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
Expand Down Expand Up @@ -2508,9 +2511,11 @@ void handleResponse(AbstractResponse abstractResponse) {
"Topic " + topic.name() + " changed while fetching paginated response"));
return;
}
existing.partitions().addAll(topic.partitions());
existing.partitions().addAll(copyPartitionsWithTaggedFields(topic.partitions()));
} else {
accumulated.topics().add(topic.duplicate());
DescribeTopicPartitionsResponseTopic copy = topic.duplicate();
copy.setPartitions(copyPartitionsWithTaggedFields(topic.partitions()));
accumulated.topics().add(copy);
}
}

Expand All @@ -2531,6 +2536,18 @@ void handleFailure(Throwable throwable) {
return new DescribeTopicPartitionsResult(future);
}

private static List<DescribeTopicPartitionsResponsePartition> copyPartitionsWithTaggedFields(
List<DescribeTopicPartitionsResponsePartition> partitions) {
List<DescribeTopicPartitionsResponsePartition> copies = new ArrayList<>(partitions.size());
for (DescribeTopicPartitionsResponsePartition partition : partitions) {
DescribeTopicPartitionsResponsePartition copy = partition.duplicate();
// duplicate() drops the unknown tagged fields
copy.unknownTaggedFields().addAll(partition.unknownTaggedFields());
copies.add(copy);
}
return copies;
}

@Override
public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
Expand Down Expand Up @@ -4882,6 +4899,43 @@ void handleFailure(Throwable throwable) {
return new UnregisterBrokerResult(future);
}

@Override
public AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset,
AlterDisklessSwitchOptions options) {
Comment on lines +4889 to +4890

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some validation here? For example, abort the request if disklessStartOffset was never set at all, and if diskless.enable is not set to true this should not work at all, as it's a recovery tool and not a tool to switch to diskless.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, in case the switch is aborted, diskless.enable should also be reset to false.

final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("alterDisklessSwitch", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedBrokerOrActiveKController()) {

@Override
AlterDisklessSwitchRequest.Builder createRequest(int timeoutMs) {
AlterDisklessSwitchRequestData data = new AlterDisklessSwitchRequestData()
.setTopicName(topic)
.setPartitionIndex(partition)
.setSealOffset(sealOffset);
return new AlterDisklessSwitchRequest.Builder(data);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
final AlterDisklessSwitchResponse response = (AlterDisklessSwitchResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
if (error == Errors.NONE) {
future.complete(null);
} else {
future.completeExceptionally(error.exception(response.data().errorMessage()));
}
}

@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new AlterDisklessSwitchResult(future);
}

Comment on lines +4888 to +4924

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alterDisklessSwitch updates only classicToDisklessStartOffset, and for sealOffset >= 0 writes a new disklessLeaderEpoch. For sealOffset == -1 or -2, though, it leaves any existing disklessProducerStates and disklessLeaderEpoch untouched.

That seems inconsistent with the intended state model: -1 means not switched/aborted, and -2 means pending/re-armed. In both states, producer states and disklessLeaderEpoch should be chosen again by the next initDisklessLog flow, not carried over from the previous completed switch. Otherwise the metadata can contain stale switch state even after an abort or re-arm.

I think this path should explicitly clear dependent switch metadata when sealOffset < 0, probably by making PartitionRegistration.merge() enforce the invariant: if an explicit classicToDisklessStartOffset tag is present and negative, reset disklessProducerStates to empty and disklessLeaderEpoch to NO_DISKLESS_LEADER_EPOCH.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow up, in case of sealOffset >= 0, we should also give the possibility to the client to reset the disklessProducerStates or leave them as they were. By default i would leave them as they are.

@Override
public DescribeProducersResult describeProducers(Collection<TopicPartition> topicPartitions, DescribeProducersOptions options) {
PartitionLeaderStrategy.PartitionLeaderFuture<DescribeProducersResult.PartitionProducerState> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public enum ApiKeys {
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
INIT_DISKLESS_LOG(ApiMessageType.INIT_DISKLESS_LOG, true);
INIT_DISKLESS_LOG(ApiMessageType.INIT_DISKLESS_LOG, true),
ALTER_DISKLESS_SWITCH(ApiMessageType.ALTER_DISKLESS_SWITCH, false, true);


private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
case INIT_DISKLESS_LOG:
return InitDisklessLogRequest.parse(readable, apiVersion);
case ALTER_DISKLESS_SWITCH:
return AlterDisklessSwitchRequest.parse(readable, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
return DeleteShareGroupOffsetsResponse.parse(readable, version);
case INIT_DISKLESS_LOG:
return InitDisklessLogResponse.parse(readable, version);
case ALTER_DISKLESS_SWITCH:
return AlterDisklessSwitchResponse.parse(readable, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterDisklessSwitchRequestData;
import org.apache.kafka.common.message.AlterDisklessSwitchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;

public class AlterDisklessSwitchRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<AlterDisklessSwitchRequest> {
private final AlterDisklessSwitchRequestData data;

public Builder(AlterDisklessSwitchRequestData data) {
super(ApiKeys.ALTER_DISKLESS_SWITCH);
this.data = data;
}

@Override
public AlterDisklessSwitchRequest build(short version) {
return new AlterDisklessSwitchRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final AlterDisklessSwitchRequestData data;

public AlterDisklessSwitchRequest(AlterDisklessSwitchRequestData data, short version) {
super(ApiKeys.ALTER_DISKLESS_SWITCH, version);
this.data = data;
}

@Override
public AlterDisklessSwitchRequestData data() {
return data;
}

@Override
public AlterDisklessSwitchResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
return new AlterDisklessSwitchResponse(new AlterDisklessSwitchResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message()));
}

public static AlterDisklessSwitchRequest parse(Readable readable, short version) {
return new AlterDisklessSwitchRequest(new AlterDisklessSwitchRequestData(readable, version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterDisklessSwitchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;

import java.util.EnumMap;
import java.util.Map;

public class AlterDisklessSwitchResponse extends AbstractResponse {
private final AlterDisklessSwitchResponseData data;

public AlterDisklessSwitchResponse(AlterDisklessSwitchResponseData data) {
super(ApiKeys.ALTER_DISKLESS_SWITCH);
this.data = data;
}

@Override
public AlterDisklessSwitchResponseData data() {
return data;
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new EnumMap<>(Errors.class);
if (data.errorCode() != 0) {
errorCounts.put(Errors.forCode(data.errorCode()), 1);
}
return errorCounts;
}

public static AlterDisklessSwitchResponse parse(Readable readable, short version) {
return new AlterDisklessSwitchResponse(new AlterDisklessSwitchResponseData(readable, version));
}

@Override
public boolean shouldClientThrottle(short version) {
return true;
}
}
Loading
Loading