diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 613a2dfb7b..4aa2386b90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -354,6 +354,40 @@ default DescribeTopicPartitionsResult describeTopicPartitions(Collection */ DescribeTopicPartitionsResult describeTopicPartitions(Collection topics, DescribeTopicsOptions options); + /** + * Override the classic-to-diskless switch state of a single partition. This is an operator + * tool that writes the {@code classicToDisklessStartOffset} directly on the controller without + * requiring the caller to be the partition leader. + * + * This is a convenience method for {@link #alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)} + * with default options. + * + * @param topic The topic name. + * @param partition The partition index. + * @param sealOffset The seal offset to commit: {@code >= 0} forces (re-)sealing at that offset, + * {@code -1} aborts the switch and reverts the partition to classic, and + * {@code -2} re-arms the switch as pending. + * @return The AlterDisklessSwitchResult. + */ + default AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset) { + return alterDisklessSwitch(topic, partition, sealOffset, new AlterDisklessSwitchOptions()); + } + + /** + * Override the classic-to-diskless switch state of a single partition. This is an operator + * tool that writes the {@code classicToDisklessStartOffset} directly on the controller without + * requiring the caller to be the partition leader. + * + * @param topic The topic name. + * @param partition The partition index. + * @param sealOffset The seal offset to commit: {@code >= 0} forces (re-)sealing at that offset, + * {@code -1} aborts the switch and reverts the partition to classic, and + * {@code -2} re-arms the switch as pending. + * @param options The options to use. + * @return The AlterDisklessSwitchResult. + */ + AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, AlterDisklessSwitchOptions options); + /** * Get information about the nodes in the cluster, using the default options. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchOptions.java new file mode 100644 index 0000000000..a135d90715 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link Admin#alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Unstable +public class AlterDisklessSwitchOptions extends AbstractOptions { + private boolean clearProducerStates = false; + + /** + * When forcing a seal (offset {@code >= 0}), whether to clear the committed producer states. + * Ignored for negative seal offsets, which always clear them. + */ + public AlterDisklessSwitchOptions clearProducerStates(boolean clearProducerStates) { + this.clearProducerStates = clearProducerStates; + return this; + } + + public boolean clearProducerStates() { + return clearProducerStates; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchResult.java new file mode 100644 index 0000000000..ea1a59f4b7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterDisklessSwitchResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The result of the {@link Admin#alterDisklessSwitch(String, int, long, AlterDisklessSwitchOptions)} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Unstable +public class AlterDisklessSwitchResult { + private final KafkaFuture future; + + AlterDisklessSwitchResult(final KafkaFuture future) { + this.future = future; + } + + /** + * Return a future which succeeds if the operation is successful. + */ + public KafkaFuture all() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index fba7132891..647137cc5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -83,6 +83,11 @@ public DescribeTopicPartitionsResult describeTopicPartitions(Collection return delegate.describeTopicPartitions(topics, options); } + @Override + public AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, AlterDisklessSwitchOptions options) { + return delegate.alterDisklessSwitch(topic, partition, sealOffset, options); + } + @Override public DescribeClusterResult describeCluster(DescribeClusterOptions options) { return delegate.describeCluster(options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8b38f6e53a..2d637a144a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -108,6 +108,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; @@ -186,6 +187,8 @@ import org.apache.kafka.common.requests.AddRaftVoterResponse; import org.apache.kafka.common.requests.AlterClientQuotasRequest; import org.apache.kafka.common.requests.AlterClientQuotasResponse; +import org.apache.kafka.common.requests.AlterDisklessSwitchRequest; +import org.apache.kafka.common.requests.AlterDisklessSwitchResponse; import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest; import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; @@ -2508,9 +2511,11 @@ void handleResponse(AbstractResponse abstractResponse) { "Topic " + topic.name() + " changed while fetching paginated response")); return; } - existing.partitions().addAll(topic.partitions()); + existing.partitions().addAll(copyPartitionsWithTaggedFields(topic.partitions())); } else { - accumulated.topics().add(topic.duplicate()); + DescribeTopicPartitionsResponseTopic copy = topic.duplicate(); + copy.setPartitions(copyPartitionsWithTaggedFields(topic.partitions())); + accumulated.topics().add(copy); } } @@ -2531,6 +2536,18 @@ void handleFailure(Throwable throwable) { return new DescribeTopicPartitionsResult(future); } + private static List copyPartitionsWithTaggedFields( + List partitions) { + List copies = new ArrayList<>(partitions.size()); + for (DescribeTopicPartitionsResponsePartition partition : partitions) { + DescribeTopicPartitionsResponsePartition copy = partition.duplicate(); + // duplicate() drops the unknown tagged fields + copy.unknownTaggedFields().addAll(partition.unknownTaggedFields()); + copies.add(copy); + } + return copies; + } + @Override public DescribeClusterResult describeCluster(DescribeClusterOptions options) { final KafkaFutureImpl> describeClusterFuture = new KafkaFutureImpl<>(); @@ -4882,6 +4899,44 @@ void handleFailure(Throwable throwable) { return new UnregisterBrokerResult(future); } + @Override + public AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, + AlterDisklessSwitchOptions options) { + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call("alterDisklessSwitch", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedBrokerOrActiveKController()) { + + @Override + AlterDisklessSwitchRequest.Builder createRequest(int timeoutMs) { + AlterDisklessSwitchRequestData data = new AlterDisklessSwitchRequestData() + .setTopicName(topic) + .setPartitionIndex(partition) + .setSealOffset(sealOffset) + .setClearProducerStates(options.clearProducerStates()); + return new AlterDisklessSwitchRequest.Builder(data); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final AlterDisklessSwitchResponse response = (AlterDisklessSwitchResponse) abstractResponse; + Errors error = Errors.forCode(response.data().errorCode()); + if (error == Errors.NONE) { + future.complete(null); + } else { + future.completeExceptionally(error.exception(response.data().errorMessage())); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new AlterDisklessSwitchResult(future); + } + @Override public DescribeProducersResult describeProducers(Collection topicPartitions, DescribeProducersOptions options) { PartitionLeaderStrategy.PartitionLeaderFuture future = diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e85de34380..bb24c8f590 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -135,7 +135,8 @@ public enum ApiKeys { DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS), ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS), DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS), - INIT_DISKLESS_LOG(ApiMessageType.INIT_DISKLESS_LOG, true); + INIT_DISKLESS_LOG(ApiMessageType.INIT_DISKLESS_LOG, true), + ALTER_DISKLESS_SWITCH(ApiMessageType.ALTER_DISKLESS_SWITCH, false, true); private static final Map> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index f2cc1d7d15..b8ce8bd32f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -356,6 +356,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion); case INIT_DISKLESS_LOG: return InitDisklessLogRequest.parse(readable, apiVersion); + case ALTER_DISKLESS_SWITCH: + return AlterDisklessSwitchRequest.parse(readable, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 0ea17682d4..f5c94118ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -293,6 +293,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable, return DeleteShareGroupOffsetsResponse.parse(readable, version); case INIT_DISKLESS_LOG: return InitDisklessLogResponse.parse(readable, version); + case ALTER_DISKLESS_SWITCH: + return AlterDisklessSwitchResponse.parse(readable, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchRequest.java new file mode 100644 index 0000000000..4352a3d418 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Readable; + +public class AlterDisklessSwitchRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final AlterDisklessSwitchRequestData data; + + public Builder(AlterDisklessSwitchRequestData data) { + super(ApiKeys.ALTER_DISKLESS_SWITCH); + this.data = data; + } + + @Override + public AlterDisklessSwitchRequest build(short version) { + return new AlterDisklessSwitchRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final AlterDisklessSwitchRequestData data; + + public AlterDisklessSwitchRequest(AlterDisklessSwitchRequestData data, short version) { + super(ApiKeys.ALTER_DISKLESS_SWITCH, version); + this.data = data; + } + + @Override + public AlterDisklessSwitchRequestData data() { + return data; + } + + @Override + public AlterDisklessSwitchResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); + return new AlterDisklessSwitchResponse(new AlterDisklessSwitchResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message())); + } + + public static AlterDisklessSwitchRequest parse(Readable readable, short version) { + return new AlterDisklessSwitchRequest(new AlterDisklessSwitchRequestData(readable, version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchResponse.java new file mode 100644 index 0000000000..534ae8a266 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterDisklessSwitchResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.EnumMap; +import java.util.Map; + +public class AlterDisklessSwitchResponse extends AbstractResponse { + private final AlterDisklessSwitchResponseData data; + + public AlterDisklessSwitchResponse(AlterDisklessSwitchResponseData data) { + super(ApiKeys.ALTER_DISKLESS_SWITCH); + this.data = data; + } + + @Override + public AlterDisklessSwitchResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + @Override + public Map errorCounts() { + Map errorCounts = new EnumMap<>(Errors.class); + if (data.errorCode() != 0) { + errorCounts.put(Errors.forCode(data.errorCode()), 1); + } + return errorCounts; + } + + public static AlterDisklessSwitchResponse parse(Readable readable, short version) { + return new AlterDisklessSwitchResponse(new AlterDisklessSwitchResponseData(readable, version)); + } + + @Override + public boolean shouldClientThrottle(short version) { + return true; + } +} diff --git a/clients/src/main/resources/common/message/AlterDisklessSwitchRequest.json b/clients/src/main/resources/common/message/AlterDisklessSwitchRequest.json new file mode 100644 index 0000000000..c9718fcaca --- /dev/null +++ b/clients/src/main/resources/common/message/AlterDisklessSwitchRequest.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 501, + "type": "request", + "listeners": ["broker", "controller"], + "name": "AlterDisklessSwitchRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The name of the topic whose classic-to-diskless switch state is being altered." }, + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "SealOffset", "type": "int64", "versions": "0+", + "about": "The classic-to-diskless start offset to commit: >= 0 forces (re-)sealing at that offset, -1 aborts the switch and reverts the partition to classic, and -2 re-arms the switch as pending." }, + { "name": "ClearProducerStates", "type": "bool", "versions": "0+", "default": "false", + "about": "When forcing a seal (SealOffset >= 0), whether to clear the committed producer states. Ignored for negative seal offsets, which always clear them." } + ] +} diff --git a/clients/src/main/resources/common/message/AlterDisklessSwitchResponse.json b/clients/src/main/resources/common/message/AlterDisklessSwitchResponse.json new file mode 100644 index 0000000000..48058be5c4 --- /dev/null +++ b/clients/src/main/resources/common/message/AlterDisklessSwitchResponse.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 501, + "type": "response", + "name": "AlterDisklessSwitchResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message, or null if there was no error." } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index aaf7b50a28..577dbf93e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -169,6 +169,7 @@ import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.RawTaggedField; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; @@ -1819,6 +1820,35 @@ public void testDescribeTopicPartitionsRawResponse() throws ExecutionException, } } + @Test + public void testDescribeTopicPartitionsPreservesUnknownTaggedFields() throws Exception { + int classicToDisklessStartOffsetTag = 100; + byte[] encodedOffset = ByteBuffer.allocate(Long.BYTES).putLong(42L).array(); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String topicName = "test-topic"; + Uuid topicId = Uuid.randomUuid(); + + DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); + addPartitionToDescribeTopicPartitionsResponse(responseData, topicName, topicId, singletonList(0)); + responseData.topics().find(topicName).partitions().get(0).unknownTaggedFields() + .add(new RawTaggedField(classicToDisklessStartOffsetTag, encodedOffset)); + env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); + + DescribeTopicPartitionsResult result = env.adminClient().describeTopicPartitions( + singletonList(topicName), new DescribeTopicsOptions()); + DescribeTopicPartitionsResponseData data = result.rawResponse().get(); + + List taggedFields = data.topics().find(topicName) + .partitions().get(0).unknownTaggedFields(); + assertEquals(1, taggedFields.size()); + RawTaggedField preserved = taggedFields.get(0); + assertEquals(classicToDisklessStartOffsetTag, preserved.tag()); + assertEquals(42L, ByteBuffer.wrap(preserved.data()).getLong()); + } + } + @Test public void testDescribeTopicPartitionsEmptyTopicsThrows() { try (AdminClientUnitTestEnv env = mockClientEnv()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 140a3d1a69..fd87df326e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1264,6 +1264,11 @@ public DescribeTopicPartitionsResult describeTopicPartitions(Collection throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public AlterDisklessSwitchResult alterDisklessSwitch(String topic, int partition, long sealOffset, AlterDisklessSwitchOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index bc9c2acb5b..ff04ce1890 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -49,6 +49,8 @@ import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterClientQuotasResponseData; import org.apache.kafka.common.message.AlterConfigsResponseData; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.AlterPartitionRequestData; @@ -1078,6 +1080,7 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) { case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsRequest(version); case INIT_DISKLESS_LOG: return createInitDisklessLogRequest(version); + case ALTER_DISKLESS_SWITCH: return createAlterDisklessSwitchRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1174,6 +1177,7 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse(); case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse(); case INIT_DISKLESS_LOG: return createInitDisklessLogResponse(); + case ALTER_DISKLESS_SWITCH: return createAlterDisklessSwitchResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1308,6 +1312,20 @@ private InitDisklessLogResponse createInitDisklessLogResponse() { return new InitDisklessLogResponse(data); } + private AlterDisklessSwitchRequest createAlterDisklessSwitchRequest(short version) { + AlterDisklessSwitchRequestData data = new AlterDisklessSwitchRequestData() + .setTopicName("foo") + .setPartitionIndex(0) + .setSealOffset(100L); + return new AlterDisklessSwitchRequest.Builder(data).build(version); + } + + private AlterDisklessSwitchResponse createAlterDisklessSwitchResponse() { + return new AlterDisklessSwitchResponse(new AlterDisklessSwitchResponseData() + .setThrottleTimeMs(123) + .setErrorCode(Errors.NONE.code())); + } + private DescribeTopicPartitionsRequest createDescribeTopicPartitionsRequest(short version) { DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData() .setTopics(singletonList(new DescribeTopicPartitionsRequestData.TopicRequest().setName("foo"))) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 73fca0f7c1..e6d04e8810 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -327,6 +327,17 @@ class Partition(val topicPartition: TopicPartition, } } + /** + * Unseal this Partition so classic appends are accepted again. Used when a classic-to-diskless + * switch is aborted and the partition reverts to a normal classic topic. + */ + def unseal(): Unit = inWriteLock(leaderIsrUpdateLock) { + if (_sealed) { + _sealed = false + stateChangeLogger.info(s"Unsealed partition $topicPartition after classic-to-diskless switch abort") + } + } + /** * Abort all ongoing transactions by appending ABORT markers directly to the log. * Diskless topics do not support transactions, so any in-flight transaction must diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 87f6da038d..373cf27965 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -141,6 +141,7 @@ class ControllerApis( case ApiKeys.REMOVE_RAFT_VOTER => handleRemoveRaftVoter(request) case ApiKeys.UPDATE_RAFT_VOTER => handleUpdateRaftVoter(request) case ApiKeys.INIT_DISKLESS_LOG => handleInitDisklessLogRequest(request) + case ApiKeys.ALTER_DISKLESS_SWITCH => handleAlterDisklessSwitchRequest(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } @@ -688,6 +689,21 @@ class ControllerApis( } } + def handleAlterDisklessSwitchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { + val alterDisklessSwitchRequest = request.body[AlterDisklessSwitchRequest] + authHelper.authorizeClusterOperation(request, ALTER) + val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) + controller.alterDisklessSwitch(context, alterDisklessSwitchRequest.data).handle[Unit] { (result, e) => + val response = if (e != null) { + alterDisklessSwitchRequest.getErrorResponse(e) + } else { + new AlterDisklessSwitchResponse(result) + } + requestHelper.sendResponseMaybeThrottle(request, _ => response) + } + } + def handleBrokerHeartBeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val heartbeatRequest = request.body[BrokerHeartbeatRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5c7dfe316e..4e5cfef127 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -251,6 +251,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError) + case ApiKeys.ALTER_DISKLESS_SWITCH => forwardToController(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1f7f83e0de..0e8dfca6e4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -3511,6 +3511,13 @@ class ReplicaManager(val config: KafkaConfig, val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) + // A classic topic must never stay sealed. If this partition was sealed for a + // classic-to-diskless switch that was then aborted, unseal it so classic produces resume. + if (!isDiskless && partition.isSealed) { + partition.unseal() + initDisklessLogManager.foreach(_.removePartition(tp)) + } + changedPartitions.add(partition) if (isConsolidatingDisklessTopic) { consolidatingDisklessPartitionsToStartFetching.put(tp, partition) diff --git a/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java index 0520b4bcc3..9588078b86 100644 --- a/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java +++ b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -32,9 +33,11 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AlterConfigsRequest; @@ -44,6 +47,8 @@ import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.TestKitNodes; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.metadata.InitDisklessLogFields; +import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.IntegrationTestUtils; import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; @@ -433,7 +438,7 @@ public void testSwitchRejectedWhenPartitionIsOfflineOrUnderReplicated() throws E // Now the switch should succeed alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); - assertEquals("true", getTopicConfig(admin, topic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + waitForTopicDisklessValue(admin, topic, "true"); } } @@ -506,6 +511,77 @@ public void testUncleanLeaderElectionRejectedWhileSwitchPending() throws Excepti } } + @Test + public void testAlterDisklessSwitchOverridesSealOffset() throws Exception { + final String topic = "switch-seal-override-" + UUID.randomUUID().toString().substring(0, 8); + + try (Admin admin = AdminClient.create(baseClientConfigs())) { + // Create a classic topic; partition 0 starts out un-switched (-1). + admin.createTopics(List.of( + new NewTopic(topic, 1, (short) 3) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")) + )).all().get(30, TimeUnit.SECONDS); + waitForSealOffset(admin, topic, 0, + offset -> offset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET); + + // Produce some records so the partition has a non-empty log; a concrete seal must point at + // or below the log end offset, otherwise the broker marks the partition offline. + final TopicPartition tp = new TopicPartition(topic, 0); + try (Producer producer = new KafkaProducer<>(producerConfigs())) { + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(topic, 0, null, + ("value-" + i).getBytes(StandardCharsets.UTF_8))).get(10, TimeUnit.SECONDS); + } + } + final long endOffset = admin.listOffsets(Map.of(tp, OffsetSpec.latest())) + .all().get(20, TimeUnit.SECONDS).get(tp).offset(); + log.warn("[stage=produced] topic={} endOffset={}", topic, endOffset); + + // Switch to diskless + alterTopicConfigWithIncrementalAlterConfigs( + admin, topic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); + waitForSealOffset(admin, topic, 0, offset -> offset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET); + + // Force a concrete seal at the end offset via the operator admin API. + admin.alterDisklessSwitch(topic, 0, endOffset).all().get(20, TimeUnit.SECONDS); + waitForSealOffset(admin, topic, 0, offset -> offset == endOffset); + log.warn("[stage=sealed] topic={} forced seal offset={}", topic, endOffset); + } + } + + private long waitForSealOffset(final Admin admin, + final String topic, + final int partition, + final java.util.function.LongPredicate condition) throws Exception { + final long deadline = System.currentTimeMillis() + Duration.ofSeconds(60).toMillis(); + long last = PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET; + while (System.currentTimeMillis() < deadline) { + last = readSealOffset(admin, topic, partition); + if (condition.test(last)) { + return last; + } + Thread.sleep(500); + } + throw new AssertionError("Seal offset for " + topic + "-" + partition + + " did not reach the expected state within timeout; last observed=" + last); + } + + private long readSealOffset(final Admin admin, final String topic, final int partition) throws Exception { + final DescribeTopicPartitionsResponseData responseData = + admin.describeTopicPartitions(List.of(topic)).rawResponse().get(10, TimeUnit.SECONDS); + final DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic topicData = + responseData.topics().find(topic); + if (topicData == null) { + // Topic metadata may not have propagated to this broker yet; treat as not-yet-switched. + return PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET; + } + return topicData.partitions().stream() + .filter(p -> p.partitionIndex() == partition) + .findFirst() + .map(p -> InitDisklessLogFields.decodeClassicToDisklessStartOffset(p.unknownTaggedFields())) + .orElse(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET); + } + private void alterTopicConfigWithLegacyAlterConfigs(final String topic, final Map newConfigs) throws Exception { final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 60c56d22e6..6ecd9bcc36 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -4141,6 +4141,23 @@ class PartitionTest extends AbstractPartitionTest { partition.appendRecordsToLeader(newRecords, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)) } + @Test + def testUnsealPartitionAllowsAppends(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val requestLocal = RequestLocal.withThreadConfinedCaching + + partition.seal() + assertTrue(partition.isSealed) + + partition.unseal() + assertFalse(partition.isSealed) + + val records = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes))) + partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal) + } + @Test def testSealedPartitionStabilizesLeo(): Unit = { val leaderEpoch = 1 diff --git a/docs/inkless/CLASSIC_TO_DISKLESS_SWITCH.md b/docs/inkless/CLASSIC_TO_DISKLESS_SWITCH.md index 728e6cd688..1d4a20e9d1 100644 --- a/docs/inkless/CLASSIC_TO_DISKLESS_SWITCH.md +++ b/docs/inkless/CLASSIC_TO_DISKLESS_SWITCH.md @@ -30,8 +30,8 @@ The switch state lives in the KRaft metadata log, but `classicToDisklessStartOff * **High tag numbers (100+).** `classicToDisklessStartOffset` uses tag `100` (`CLASSIC_TO_DISKLESS_START_OFFSET_TAG`). The 100+ range is reserved to avoid colliding with upstream tagged fields. Because the tag is unknown to vanilla Kafka, it is **silently skipped** when read by an upstream broker, which keeps the on-disk metadata format compatible in both directions. Two sibling fields ride along on the same record using the same mechanism: producer states (tag `101`) and the diskless leader epoch (tag `102`). * **Custom serde in `InitDisklessLogFields`.** The value is encoded by hand into the tagged field's raw byte payload: a single big-endian `int64` (8 bytes) written via `ByteBuffer.putLong`, and read back via `ByteBuffer.getLong`. There is no generated message class for it. -* **Sentinel is never serialized.** When `classicToDisklessStartOffset == -1`, the tag is omitted entirely to save space; on read, an absent tag decodes back to `-1` (`NO_CLASSIC_TO_DISKLESS_START_OFFSET`). Only `-2` and committed `>= 0` values are actually written. -* **Replay and merge.** On load, `PartitionRegistration` decodes the tag from `PartitionRecord.unknownTaggedFields()`; `merge(PartitionChangeRecord)` applies updates from change records and carries the existing value forward when a change record omits the tag; `toRecord()` re-encodes it. This is how the value survives metadata-log replay and propagates to brokers via metadata deltas. +* **Sentinel is normally not serialized.** When `classicToDisklessStartOffset == -1`, automatic paths (`toRecord()` snapshots and ordinary change records) omit the tag entirely to save space; on read, an absent tag decodes back to `-1` (`NO_CLASSIC_TO_DISKLESS_START_OFFSET`). The one exception is the operator seal/abort path, which encodes an explicit `-1` to actively reset a sealed partition back to classic (see [Operator override](#operator-override-of-the-seal-offset)). +* **Replay and merge.** On load, `PartitionRegistration` decodes the tag from `PartitionRecord.unknownTaggedFields()`; `merge(PartitionChangeRecord)` applies updates from change records and carries the existing value forward when a change record **omits** the tag; `toRecord()` re-encodes it. Crucially, "tag omitted" and "tag explicitly set to `-1`" are distinguished at decode time (`decodeClassicToDisklessStartOffsetIfPresent` returns an empty `OptionalLong` only when the tag is absent): an omitted tag keeps the existing value, while a present tag — including an explicit `-1` — is applied verbatim. This is how the value survives metadata-log replay and propagates to brokers via metadata deltas, and how an operator can abort a switch. ## Protocol Steps @@ -137,4 +137,18 @@ When a broker applies a newly committed seal offset, it reconciles the local cla * if local LEO is above the seal, it truncates to the seal because offsets at and above the seal belong to diskless storage * if a leader's local LEO is below the seal, it marks the partition offline because the classic prefix is incomplete -If leadership changes while the switch is pending, the new leader seals and re-drives the `InitDisklessLog` controller request. If leadership changes after the final offset is committed, the new leader skips the controller step and initializes the control plane directly from metadata. \ No newline at end of file +If leadership changes while the switch is pending, the new leader seals and re-drives the `InitDisklessLog` controller request. If leadership changes after the final offset is committed, the new leader skips the controller step and initializes the control plane directly from metadata. + +## Operator override of the seal offset + +The normal flow above is leader-driven: the broker proposes the seal offset and the controller validates it. For recovery and manual intervention, the `kafka-topic-switch.sh seal` operator command can override the seal state of a single partition directly, without being the partition leader. It is backed by the `AlterDisklessSwitch` controller RPC (admin → controller, authorized as `ALTER` on the cluster), which writes a `PartitionChangeRecord` carrying the requested `classicToDisklessStartOffset`. It is a recovery tool for a topic that is being switched, not a way to start a switch, so the controller rejects it unless the topic already has `diskless.enable=true`. + +* **`>= 0`** forces (re-)sealing at that offset. The controller also captures the current leader epoch as the diskless leader epoch, exactly as the leader-driven commit does, so consolidation truncation stays correct. Unlike `InitDisklessLog`, this is accepted even when the partition has already committed a seal offset, so an operator can correct a bad seal — but only *downward*: a re-seal may not exceed the current committed seal, since that is the classic log end offset and offsets beyond it exist only in diskless storage (routing them to the classic log would break consumption). Committed producer states are left unchanged unless `--clear-producer-states` is passed, which writes an explicit empty producer-states tag to reset them. +* **`-1`** aborts the switch and reverts the partition to classic. The tag is written explicitly (see [How it is stored](#how-it-is-stored)) so that `merge` resets the value rather than treating the record as "unchanged". The controller also emits a `ConfigRecord` turning `diskless.enable` back off; since that config is topic-level, aborting one partition reverts the whole topic to classic. +* **`-2`** re-arms the switch as pending and re-sets the leader to bump the leader epoch, forcing the broker to seal again — the same mechanism as the initial switch-pending mark. + +The negative (abort / re-arm) offsets are only accepted while the switch is still pending — i.e. the partition has not yet committed a seal (`classicToDisklessStartOffset < 0`). Once a seal is committed, diskless data may have been written past it, and reverting to classic would strand that data (and can move consumers backward), so the controller rejects `-1`/`-2` in that state. + +Because a negative offset means the partition is not switched, `merge` also drops the dependent switch metadata (producer states + the diskless leader epoch) whenever an explicit negative `classicToDisklessStartOffset` is applied. This prevents stale state from a previous switch attempt surviving an abort or re-arm; the next `InitDisklessLog` flow re-captures both. + +The command validates before committing: for a concrete (`>= 0`) seal it requires the seal offset to be at or below the partition end offset, as a sanity bound so a seal cannot point past the end of the log. The end offset is read via `listOffsets(LATEST)`, which returns the leader's log end offset, not the replication high watermark — it does not by itself prove every in-sync replica has replicated up to the seal. The `--dry-run` flag performs this validation and reports the action without writing anything. diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 6374784ff1..0c848293a1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.AlterPartitionRequestData; @@ -435,6 +437,22 @@ CompletableFuture initDisklessLog( InitDisklessLogRequestData request ); + /** + * Override the classic-to-diskless switch state of a single partition. Unlike + * {@link #initDisklessLog}, this is an operator-driven write that does not require the caller + * to be the partition leader: a non-negative seal offset forces (re-)sealing, {@code -1} aborts + * the switch and reverts the partition to classic, and {@code -2} re-arms the switch as pending. + * + * @param context The controller request context. + * @param request The AlterDisklessSwitch request data. + * + * @return A future yielding the response with the top-level error code. + */ + CompletableFuture alterDisklessSwitch( + ControllerRequestContext context, + AlterDisklessSwitchRequestData request + ); + /** * Begin shutting down, but don't block. You must still call close to clean up all * resources. diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 7351dd0549..a3dc61c1f1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.AlterPartitionRequestData; @@ -1808,6 +1810,15 @@ public CompletableFuture initDisklessLog( () -> replicationControl.initDisklessLog(context, request)); } + @Override + public CompletableFuture alterDisklessSwitch( + ControllerRequestContext context, + AlterDisklessSwitchRequestData request + ) { + return appendWriteEvent("alterDisklessSwitch", context.deadlineNs(), + () -> replicationControl.alterDisklessSwitch(request)); + } + @Override public CompletableFuture alterUserScramCredentials( ControllerRequestContext context, diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index e0077438b4..ad8701ddc6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -38,6 +38,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; @@ -1667,6 +1669,96 @@ ControllerResult initDisklessLog( return ControllerResult.of(records, new InitDisklessLogResponseData().setTopics(topicResponses)); } + /** + * Overrides a single partition's classic-to-diskless switch state. Unlike {@link #initDisklessLog}, + * the caller is an administrator rather than the partition leader, so this performs no leader/epoch + * fencing. The requested seal offset is written verbatim as the {@code classicToDisklessStartOffset} + * tagged field on a {@link PartitionChangeRecord}: + * + *

+ */ + ControllerResult alterDisklessSwitch( + AlterDisklessSwitchRequestData request + ) { + long sealOffset = request.sealOffset(); + if (sealOffset < PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING) { + throw new InvalidRequestException("Invalid seal offset " + sealOffset + + "; must be >= -2 (-2 re-arms, -1 aborts, >= 0 seals at that offset)."); + } + + Uuid topicId = topicsByName.get(request.topicName()); + if (topicId == null) { + throw new UnknownTopicOrPartitionException("Topic not found: " + request.topicName()); + } + if (!isDisklessTopic(request.topicName())) { + throw new InvalidRequestException("Topic " + request.topicName() + + " does not have " + DISKLESS_ENABLE_CONFIG + " enabled. AlterDisklessSwitch only operates " + + "on topics that are being switched to diskless."); + } + TopicControlInfo topic = topics.get(topicId); + PartitionRegistration partition = topic.parts.get(request.partitionIndex()); + if (partition == null) { + throw new UnknownTopicOrPartitionException("Partition not found: " + + request.topicName() + "-" + request.partitionIndex()); + } + if (partition.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { + throw new InvalidRequestException("Partition " + request.topicName() + "-" + + request.partitionIndex() + " is not part of a classic-to-diskless switch; there is " + + "nothing to override."); + } + if (sealOffset < 0 && partition.classicToDisklessStartOffset >= 0) { + throw new InvalidRequestException("Cannot abort or re-arm the classic-to-diskless switch for " + + request.topicName() + "-" + request.partitionIndex() + ": it has already committed a seal " + + "offset (" + partition.classicToDisklessStartOffset + "), and diskless data may exist past it."); + } + if (sealOffset > 0 && partition.classicToDisklessStartOffset >= 0 + && sealOffset > partition.classicToDisklessStartOffset) { + throw new InvalidRequestException("Cannot seal " + request.topicName() + "-" + + request.partitionIndex() + " at offset " + sealOffset + ": it exceeds the committed seal " + + "offset (" + partition.classicToDisklessStartOffset + "), beyond which no classic data exists."); + } + + PartitionChangeRecord record = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(request.partitionIndex()); + record.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset(sealOffset)); + + List records = new ArrayList<>(); + if (sealOffset >= 0) { + record.unknownTaggedFields().add( + InitDisklessLogFields.encodeDisklessLeaderEpoch(partition.leaderEpoch)); + if (request.clearProducerStates()) { + // Write an explicit empty producer-states tag; without it merge() leaves them unchanged. + record.unknownTaggedFields().add( + InitDisklessLogFields.encodeProducerStates(List.of())); + } + } else if (sealOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING) { + // Bump the leader epoch to force the broker to seal again, as the switch-pending mark does. + record.setLeader(partition.leader); + } else { + // Aborting reverts the topic to classic + records.add(new ApiMessageAndVersion(new ConfigRecord() + .setResourceType(ResourceType.TOPIC.code()) + .setResourceName(topic.name) + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("false"), (short) 0)); + } + records.add(new ApiMessageAndVersion(record, (short) 0)); + + log.info("AlterDisklessSwitch for {}-{}: classicToDisklessStartOffset={}", + topic.name, request.partitionIndex(), sealOffset); + + return ControllerResult.of(records, new AlterDisklessSwitchResponseData()); + } + /** * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory. diff --git a/metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java b/metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java index d039395ac8..a4686d5b3e 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; /** * Helpers for encoding/decoding Inkless-specific fields for classic-diskless migrations @@ -46,12 +48,17 @@ public static RawTaggedField encodeClassicToDisklessStartOffset(long classicToDi } public static long decodeClassicToDisklessStartOffset(List taggedFields) { + return decodeClassicToDisklessStartOffsetIfPresent(taggedFields) + .orElse(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET); + } + + public static OptionalLong decodeClassicToDisklessStartOffsetIfPresent(List taggedFields) { for (RawTaggedField field : taggedFields) { if (field.tag() == CLASSIC_TO_DISKLESS_START_OFFSET_TAG) { - return ByteBuffer.wrap(field.data()).getLong(); + return OptionalLong.of(ByteBuffer.wrap(field.data()).getLong()); } } - return PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET; + return OptionalLong.empty(); } // --- disklessLeaderEpoch (tag 102): a single int32 --- @@ -103,6 +110,10 @@ public static RawTaggedField encodeProducerStates(List state } public static List decodeProducerStates(List taggedFields) { + return decodeProducerStatesIfPresent(taggedFields).orElse(List.of()); + } + + public static Optional> decodeProducerStatesIfPresent(List taggedFields) { for (RawTaggedField field : taggedFields) { if (field.tag() == PRODUCER_STATES_TAG) { ByteBuffer buf = ByteBuffer.wrap(field.data()); @@ -118,9 +129,9 @@ public static List decodeProducerStates(List buf.getLong() )); } - return entries; + return Optional.of(entries); } } - return List.of(); + return Optional.empty(); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index d4bd312c23..3338700f46 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.OptionalLong; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; @@ -303,18 +304,23 @@ public PartitionRegistration merge(PartitionChangeRecord record) { int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas()); int[] newLastKnownElr = (record.lastKnownElr() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownElr()); - long newClassicToDisklessStartOffset = InitDisklessLogFields.decodeClassicToDisklessStartOffset(record.unknownTaggedFields()); - List newDisklessProducerStates = - InitDisklessLogFields.decodeProducerStates(record.unknownTaggedFields()); - if (newClassicToDisklessStartOffset == NO_CLASSIC_TO_DISKLESS_START_OFFSET) { - newClassicToDisklessStartOffset = classicToDisklessStartOffset; - newDisklessProducerStates = disklessProducerStates; - } - // The diskless leader epoch is captured at the classic-to-diskless switch and only carried by that - // single change record; every other change record omits the tag, so keep the existing value. - int newDisklessLeaderEpoch = InitDisklessLogFields.decodeDisklessLeaderEpoch(record.unknownTaggedFields()); - if (newDisklessLeaderEpoch == NO_DISKLESS_LEADER_EPOCH) { - newDisklessLeaderEpoch = disklessLeaderEpoch; + OptionalLong recordStartOffset = + InitDisklessLogFields.decodeClassicToDisklessStartOffsetIfPresent(record.unknownTaggedFields()); + long newClassicToDisklessStartOffset = recordStartOffset.orElse(classicToDisklessStartOffset); + + List newDisklessProducerStates; + int newDisklessLeaderEpoch; + // A negative start offset is a switch-undo, so we must drop stale diskless metadata + if (recordStartOffset.isPresent() && recordStartOffset.getAsLong() < 0) { + newDisklessProducerStates = List.of(); + newDisklessLeaderEpoch = NO_DISKLESS_LEADER_EPOCH; + } else { + newDisklessProducerStates = InitDisklessLogFields.decodeProducerStatesIfPresent(record.unknownTaggedFields()) + .orElse(disklessProducerStates); + newDisklessLeaderEpoch = InitDisklessLogFields.decodeDisklessLeaderEpoch(record.unknownTaggedFields()); + if (newDisklessLeaderEpoch == NO_DISKLESS_LEADER_EPOCH) { + newDisklessLeaderEpoch = disklessLeaderEpoch; + } } return new PartitionRegistration(newReplicas, newDirectories, diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 12b7ffaf3e..9e40bbad6f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -25,11 +25,15 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestData; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; @@ -7680,4 +7684,204 @@ public void testLegacySwitchAllowedWhenUncleanLeaderElectionDisabled() { } + @Test + public void testAlterDisklessSwitchForcesSeal() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + Uuid topicId = createSwitchingTestTopic(ctx); + + ControllerResult result = + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L)); + assertEquals((short) 0, result.response().errorCode()); + ctx.replay(result.records()); + + PartitionRegistration partition = replicationControl.getPartition(topicId, 0); + assertEquals(100L, partition.classicToDisklessStartOffset); + // The current leader epoch is captured as the diskless leader epoch. + assertEquals(partition.leaderEpoch, partition.disklessLeaderEpoch); + // By default a forced seal leaves producer states untouched: no producer-states tag is written. + PartitionChangeRecord record = (PartitionChangeRecord) result.records().get(0).message(); + assertTrue(InitDisklessLogFields.decodeProducerStatesIfPresent(record.unknownTaggedFields()).isEmpty()); + } + + @Test + public void testAlterDisklessSwitchForcesSealClearingProducerStates() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + createSwitchingTestTopic(ctx); + + ControllerResult result = + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L).setClearProducerStates(true)); + assertEquals((short) 0, result.response().errorCode()); + + // With clearProducerStates the record carries an explicit empty producer-states tag so merge() clears them. + PartitionChangeRecord record = (PartitionChangeRecord) result.records().get(0).message(); + assertEquals(List.of(), InitDisklessLogFields.decodeProducerStatesIfPresent(record.unknownTaggedFields()) + .orElseThrow(() -> new AssertionError("expected an explicit producer-states tag"))); + } + + @Test + public void testAlterDisklessSwitchAbortsSwitch() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + Uuid topicId = createSwitchingTestTopic(ctx); + + // Abort the pending switch back to classic. + ctx.replay(replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(-1L)).records()); + PartitionRegistration aborted = replicationControl.getPartition(topicId, 0); + assertEquals(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET, aborted.classicToDisklessStartOffset); + assertEquals(List.of(), aborted.disklessProducerStates); + assertEquals(PartitionRegistration.NO_DISKLESS_LEADER_EPOCH, aborted.disklessLeaderEpoch); + assertEquals("false", ctx.configurationControl.currentTopicConfig("foo").get(DISKLESS_ENABLE_CONFIG)); + } + + @Test + public void testAlterDisklessSwitchReArms() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + Uuid topicId = createSwitchingTestTopic(ctx); + int leaderEpochBefore = replicationControl.getPartition(topicId, 0).leaderEpoch; + + // Re-arm the pending switch. + ctx.replay(replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(-2L)).records()); + + PartitionRegistration partition = replicationControl.getPartition(topicId, 0); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING, partition.classicToDisklessStartOffset); + // Re-arming bumps the leader epoch to force the broker to seal again. + assertEquals(leaderEpochBefore + 1, partition.leaderEpoch); + } + + @Test + public void testAlterDisklessSwitchCannotAbortCommittedSeal() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + createSwitchingTestTopic(ctx); + + // Commit a seal; diskless data may now exist past it, so abort/re-arm must be rejected. + ctx.replay(replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L)).records()); + + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(-1L))); + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(-2L))); + } + + @Test + public void testAlterDisklessSwitchCannotReSealBeyondCommittedSeal() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + Uuid topicId = createSwitchingTestTopic(ctx); + + // Commit a seal at 100. The classic log is truncated to the seal, so 100 is its end offset. + ctx.replay(replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L)).records()); + + // Re-sealing beyond the committed seal would route non-existent classic offsets and is rejected. + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(150L))); + + // Re-sealing at or below the committed seal is allowed (correcting a bad seal downward). + ctx.replay(replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(50L)).records()); + assertEquals(50L, replicationControl.getPartition(topicId, 0).classicToDisklessStartOffset); + } + + @Test + public void testAlterDisklessSwitchRejectsInvalidOffset() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + createSwitchingTestTopic(ctx); + + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(-3L))); + } + + @Test + public void testAlterDisklessSwitchRejectsClassicTopics() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L))); + } + + @Test + public void testAlterDisklessSwitchRejectsPartitionNotInSwitch() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // Born-diskless topic: diskless.enable=true but never part of a switch (classicToDisklessStartOffset=-1). + CreateTopicsRequestData.CreatableTopicConfigCollection configs = + new CreateTopicsRequestData.CreatableTopicConfigCollection(); + configs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG).setValue("true")); + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic() + .setName("foo").setNumPartitions(-1).setReplicationFactor((short) -1).setConfigs(configs)); + ctx.replay(replicationControl.createTopics( + anonymousContextFor(ApiKeys.CREATE_TOPICS), request, Set.of("foo")).records()); + + assertThrows(InvalidRequestException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(0).setSealOffset(100L))); + } + + @Test + public void testAlterDisklessSwitchRejectsUnknownTopic() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + + assertThrows(UnknownTopicOrPartitionException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("nonexistent").setPartitionIndex(0).setSealOffset(100L))); + } + + @Test + public void testAlterDisklessSwitchRejectsUnknownPartition() { + ReplicationControlTestContext ctx = disklessSwitchTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + createSwitchingTestTopic(ctx); + + assertThrows(UnknownTopicOrPartitionException.class, () -> + replicationControl.alterDisklessSwitch(new AlterDisklessSwitchRequestData() + .setTopicName("foo").setPartitionIndex(5).setSealOffset(100L))); + } + + private static ReplicationControlTestContext disklessSwitchTestContext() { + return new ReplicationControlTestContext.Builder() + .setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true) + .setDisklessStorageSystemEnabled(true) + .build(); + } + + private static Uuid createSwitchingTestTopic(ReplicationControlTestContext ctx) { + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0).topicId(); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + List switchRecords = ctx.replicationControl.markClassicToDisklessSwitchStarted( + configChanges, Map.of(resource, ApiError.NONE)); + ctx.replay(ctx.configurationControl.incrementalAlterConfigs(configChanges, true).records()); + ctx.replay(switchRecords); + return topicId; + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 8bb507c660..0026eea50b 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -468,6 +468,45 @@ public void testMergePreservesDisklessFields() { assertEquals(producerStates, merged.disklessProducerStates); } + @Test + public void testMergeStartOffsetWithoutProducerStatesTagPreservesProducerStates() { + List producerStates = List.of( + new InitDisklessLogFields.ProducerStateEntry(1L, (short) 0, 0, 10, 100L, 5000L) + ); + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0).setClassicToDisklessStartOffset(42L). + setDisklessProducerStates(producerStates).build(); + + PartitionChangeRecord changeRecord = new PartitionChangeRecord(); + changeRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset(99L)); + + PartitionRegistration merged = original.merge(changeRecord); + assertEquals(99L, merged.classicToDisklessStartOffset); + assertEquals(producerStates, merged.disklessProducerStates); + } + + @Test + public void testMergeExplicitEmptyProducerStatesClearsThem() { + List producerStates = List.of( + new InitDisklessLogFields.ProducerStateEntry(1L, (short) 0, 0, 10, 100L, 5000L) + ); + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0).setClassicToDisklessStartOffset(42L). + setDisklessProducerStates(producerStates).build(); + + PartitionChangeRecord changeRecord = new PartitionChangeRecord(); + changeRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeProducerStates(List.of())); + + PartitionRegistration merged = original.merge(changeRecord); + assertEquals(List.of(), merged.disklessProducerStates); + } + @Test public void testDisklessFieldsInEqualsAndHashCode() { List states = List.of( @@ -532,6 +571,50 @@ public void testMergeFromSwitchPendingToActualOffset() { assertEquals(100L, merged.classicToDisklessStartOffset); } + @Test + public void testMergeAbortsSwitchWithExplicitMinusOne() { + List producerStates = List.of( + new InitDisklessLogFields.ProducerStateEntry(1L, (short) 0, 0, 10, 100L, 5000L)); + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0).setClassicToDisklessStartOffset(100L). + setDisklessProducerStates(producerStates).setDisklessLeaderEpoch(7).build(); + + PartitionChangeRecord changeRecord = new PartitionChangeRecord(); + changeRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset( + PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)); + + PartitionRegistration merged = original.merge(changeRecord); + assertEquals(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET, merged.classicToDisklessStartOffset); + // Aborting must drop the dependent switch metadata from the previous completed switch. + assertEquals(List.of(), merged.disklessProducerStates); + assertEquals(PartitionRegistration.NO_DISKLESS_LEADER_EPOCH, merged.disklessLeaderEpoch); + } + + @Test + public void testMergeReArmsSwitchWithExplicitMinusTwo() { + List producerStates = List.of( + new InitDisklessLogFields.ProducerStateEntry(1L, (short) 0, 0, 10, 100L, 5000L)); + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0).setClassicToDisklessStartOffset(100L). + setDisklessProducerStates(producerStates).setDisklessLeaderEpoch(7).build(); + + PartitionChangeRecord changeRecord = new PartitionChangeRecord(); + changeRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset( + PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING)); + + PartitionRegistration merged = original.merge(changeRecord); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING, merged.classicToDisklessStartOffset); + // Re-arming must drop the dependent switch metadata so the next initDisklessLog re-captures it. + assertEquals(List.of(), merged.disklessProducerStates); + assertEquals(PartitionRegistration.NO_DISKLESS_LEADER_EPOCH, merged.disklessLeaderEpoch); + } + @Test public void testMergePreservesSwitchPending() { PartitionRegistration original = new PartitionRegistration.Builder(). diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 4f9042d9b0..a79b88c940 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.message.AlterClientQuotasResponseDataJsonConverter; import org.apache.kafka.common.message.AlterConfigsRequestDataJsonConverter; import org.apache.kafka.common.message.AlterConfigsResponseDataJsonConverter; +import org.apache.kafka.common.message.AlterDisklessSwitchRequestDataJsonConverter; +import org.apache.kafka.common.message.AlterDisklessSwitchResponseDataJsonConverter; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestDataJsonConverter; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseDataJsonConverter; import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter; @@ -212,6 +214,8 @@ import org.apache.kafka.common.requests.AlterClientQuotasResponse; import org.apache.kafka.common.requests.AlterConfigsRequest; import org.apache.kafka.common.requests.AlterConfigsResponse; +import org.apache.kafka.common.requests.AlterDisklessSwitchRequest; +import org.apache.kafka.common.requests.AlterDisklessSwitchResponse; import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest; import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.AlterPartitionRequest; @@ -503,6 +507,8 @@ public static JsonNode request(AbstractRequest request) { IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version()); case INIT_DISKLESS_LOG -> InitDisklessLogRequestDataJsonConverter.write(((InitDisklessLogRequest) request).data(), request.version()); + case ALTER_DISKLESS_SWITCH -> + AlterDisklessSwitchRequestDataJsonConverter.write(((AlterDisklessSwitchRequest) request).data(), request.version()); case INITIALIZE_SHARE_GROUP_STATE -> InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version()); case INIT_PRODUCER_ID -> @@ -686,6 +692,8 @@ public static JsonNode response(AbstractResponse response, short version) { IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version); case INIT_DISKLESS_LOG -> InitDisklessLogResponseDataJsonConverter.write(((InitDisklessLogResponse) response).data(), version); + case ALTER_DISKLESS_SWITCH -> + AlterDisklessSwitchResponseDataJsonConverter.write(((AlterDisklessSwitchResponse) response).data(), version); case INITIALIZE_SHARE_GROUP_STATE -> InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version); case INIT_PRODUCER_ID -> diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index ef865f89a7..74dd72e71a 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -31,6 +31,8 @@ import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.AlterDisklessSwitchOptions; +import org.apache.kafka.clients.admin.AlterDisklessSwitchResult; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions; @@ -404,6 +406,11 @@ public DescribeTopicPartitionsResult describeTopicPartitions(final Collection initDisklessLog( throw new UnsupportedOperationException("not implemented"); } + @Override + public CompletableFuture alterDisklessSwitch( + ControllerRequestContext context, + AlterDisklessSwitchRequestData request + ) { + throw new UnsupportedOperationException("not implemented"); + } + public void beginShutdown() { this.active = false; } diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicSwitchCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicSwitchCommand.java index ed7364592a..c178cebb0d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicSwitchCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicSwitchCommand.java @@ -18,6 +18,7 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterDisklessSwitchOptions; import org.apache.kafka.clients.admin.DescribeTopicPartitionsResult; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -85,6 +86,14 @@ static void execute(String... args) throws Exception { } break; case "seal": + try (Admin adminClient = Admin.create(properties)) { + sealCommand(System.out, adminClient, topic, + namespace.getInt("partition"), + Optional.ofNullable(namespace.getLong("offset")), + namespace.getBoolean("clear_producer_states"), + namespace.getBoolean("dry_run")); + } + break; case "repair": throw new RuntimeException("Command \"" + command + "\" not implemented"); default: @@ -123,6 +132,23 @@ private static ArgumentParser argumentParser() { .help("Topic name for the specified topic."); } + sealParser.addArgument("--partition", "-p") + .action(store()) + .type(Integer.class) + .required(true) + .help("The partition index to seal."); + sealParser.addArgument("--offset", "-o") + .action(store()) + .type(Long.class) + .help("The seal offset to commit: >= 0 forces (re-)sealing at that offset, -1 aborts the " + + "switch and reverts the partition to classic, and -2 re-arms the switch as pending. " + + "-1 and -2 are only allowed while the switch is still pending (no seal committed yet). " + + "If omitted, the partition's current end offset is used; omitting it is only allowed " + + "before a seal is committed, otherwise an explicit offset is required."); + sealParser.addArgument("--clear-producer-states") + .action(storeTrue()) + .help("When forcing a seal (offset >= 0), clear the committed producer states. Ignored for " + + "negative offsets, which always clear them."); sealParser.addArgument("--dry-run", "-d") .action(storeTrue()) .help("Whether to only perform validation when adjusting the seal offset."); @@ -196,6 +222,67 @@ static void stateCommand(PrintStream stream, Admin adminClient, String topic) th } } + static void sealCommand(PrintStream stream, Admin adminClient, String topic, int partition, + Optional offset, boolean clearProducerStates, boolean dryRun) throws Exception { + if (offset.isPresent() && offset.get() < PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING) { + throw new RuntimeException("Invalid seal offset " + offset.get() + + "; must be >= -2 (-2 re-arms, -1 aborts, >= 0 seals at that offset)."); + } + + long sealOffset; + if (offset.isEmpty()) { + long committedSeal = readClassicToDisklessStartOffset(adminClient, topic, partition); + if (committedSeal >= 0) { + throw new RuntimeException(String.format( + "%s-%d already has a committed seal offset (%d); pass an explicit --offset to re-seal.", + topic, partition, committedSeal)); + } + TopicPartition tp = new TopicPartition(topic, partition); + sealOffset = adminClient.listOffsets(Map.of(tp, OffsetSpec.latest())) + .all().get().get(tp).offset(); + stream.printf("Validated %s-%d: sealing at end offset %d.%n", topic, partition, sealOffset); + } else { + sealOffset = offset.get(); + } + + if (dryRun) { + stream.printf("[dry-run] Would set %s-%d classicToDisklessStartOffset to %s.%n", + topic, partition, describeSealOffset(sealOffset)); + return; + } + + AlterDisklessSwitchOptions options = new AlterDisklessSwitchOptions() + .clearProducerStates(clearProducerStates); + adminClient.alterDisklessSwitch(topic, partition, sealOffset, options).all().get(); + stream.printf("Set %s-%d classicToDisklessStartOffset to %s.%n", + topic, partition, describeSealOffset(sealOffset)); + } + + private static long readClassicToDisklessStartOffset(Admin adminClient, String topic, int partition) + throws Exception { + DescribeTopicPartitionsResponseData responseData = adminClient + .describeTopicPartitions(List.of(topic), new DescribeTopicsOptions()).rawResponse().get(); + DescribeTopicPartitionsResponseTopic topicData = responseData.topics().find(topic); + if (topicData == null) { + throw new RuntimeException("Topic not found: " + topic); + } + return topicData.partitions().stream() + .filter(p -> p.partitionIndex() == partition) + .findFirst() + .map(p -> InitDisklessLogFields.decodeClassicToDisklessStartOffset(p.unknownTaggedFields())) + .orElseThrow(() -> new RuntimeException("Partition not found: " + topic + "-" + partition)); + } + + private static String describeSealOffset(long sealOffset) { + if (sealOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { + return "-1 (abort switch, revert to classic)"; + } else if (sealOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING) { + return "-2 (re-arm switch)"; + } else { + return String.valueOf(sealOffset); + } + } + private static String formatStartOffset(long offset) { if (offset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { return "-1 (not switched)"; diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicSwitchCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicSwitchCommandTest.java index 4a6b37247e..3f4c87ddc1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicSwitchCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicSwitchCommandTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterDisklessSwitchOptions; +import org.apache.kafka.clients.admin.AlterDisklessSwitchResult; import org.apache.kafka.clients.admin.DescribeTopicPartitionsResult; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -40,11 +42,19 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Optional; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -170,6 +180,110 @@ public void testStateCommandMixedPartitions() throws Exception { assertTrue(output.contains("classicToDisklessStartOffset=-1 (not switched)")); } + @Test + public void testSealCommitsExplicitOffset() throws Exception { + mockAlterDisklessSwitch(); + + String output = runSealCommand(0, Optional.of(100L), false); + + verify(adminClient).alterDisklessSwitch(eq(TOPIC), eq(0), eq(100L), any(AlterDisklessSwitchOptions.class)); + assertTrue(output.contains("Set test-topic-0 classicToDisklessStartOffset to 100")); + } + + @Test + public void testSealDefaultsToEndOffsetWhenPending() throws Exception { + // Pending switch (offset -2): default seals at the current end offset. + mockDescribeSeal(0, -2L); + mockEndOffset(0, 150); + mockAlterDisklessSwitch(); + + String output = runSealCommand(0, Optional.empty(), false); + + verify(adminClient).alterDisklessSwitch(eq(TOPIC), eq(0), eq(150L), any(AlterDisklessSwitchOptions.class)); + assertTrue(output.contains("Set test-topic-0 classicToDisklessStartOffset to 150")); + } + + @Test + public void testSealDefaultRejectedWhenAlreadyCommitted() throws Exception { + // Already committed (offset 100): defaulting would use the diskless end offset, so require --offset. + mockDescribeSeal(0, 100L); + + RuntimeException ex = assertThrows(RuntimeException.class, + () -> runSealCommand(0, Optional.empty(), false)); + assertTrue(ex.getMessage().contains("already has a committed seal offset (100)")); + verify(adminClient, never()).alterDisklessSwitch(any(), anyInt(), anyLong(), any()); + } + + @Test + public void testSealDryRunDoesNotCommit() throws Exception { + String output = runSealCommand(0, Optional.of(100L), true); + + verify(adminClient, never()).alterDisklessSwitch(any(), anyInt(), anyLong(), any()); + assertTrue(output.contains("[dry-run]")); + assertTrue(output.contains("Would set test-topic-0 classicToDisklessStartOffset to 100")); + } + + @Test + public void testSealAbortDoesNotCheckEndOffset() throws Exception { + mockAlterDisklessSwitch(); + + String output = runSealCommand(0, Optional.of(-1L), false); + + verify(adminClient).alterDisklessSwitch(eq(TOPIC), eq(0), eq(-1L), any(AlterDisklessSwitchOptions.class)); + assertTrue(output.contains("-1 (abort switch, revert to classic)")); + assertFalse(output.contains("end offset")); + } + + @Test + public void testSealReArm() throws Exception { + mockAlterDisklessSwitch(); + + String output = runSealCommand(0, Optional.of(-2L), false); + + verify(adminClient).alterDisklessSwitch(eq(TOPIC), eq(0), eq(-2L), any(AlterDisklessSwitchOptions.class)); + assertTrue(output.contains("-2 (re-arm switch)")); + } + + @Test + public void testSealRejectsInvalidOffset() { + RuntimeException ex = assertThrows(RuntimeException.class, + () -> runSealCommand(0, Optional.of(-3L), false)); + assertTrue(ex.getMessage().contains("Invalid seal offset -3")); + } + + private void mockEndOffset(int partition, long offset) { + when(adminClient.listOffsets(anyMap())) + .thenAnswer(invocation -> { + TopicPartition tp = new TopicPartition(TOPIC, partition); + Map> futures = Map.of( + tp, KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo(offset, -1, Optional.empty()))); + return new ListOffsetsResult(futures); + }); + } + + // Mocks describeTopicPartitions so the seal command can read the current committed offset. + private void mockDescribeSeal(int partition, long classicToDisklessStartOffset) { + DescribeTopicPartitionsResponseData responseData = buildResponseData(List.of( + buildPartition(partition, 1, 0, classicToDisklessStartOffset, -1))); + when(adminClient.describeTopicPartitions(ArgumentMatchers.any(), any(DescribeTopicsOptions.class))) + .thenReturn(new DescribeTopicPartitionsResult(KafkaFuture.completedFuture(responseData))); + } + + private void mockAlterDisklessSwitch() { + AlterDisklessSwitchResult result = mock(AlterDisklessSwitchResult.class); + when(result.all()).thenReturn(KafkaFuture.completedFuture(null)); + when(adminClient.alterDisklessSwitch(any(), anyInt(), anyLong(), any(AlterDisklessSwitchOptions.class))) + .thenReturn(result); + } + + private String runSealCommand(int partition, Optional offset, boolean dryRun) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(out, false, StandardCharsets.UTF_8); + TopicSwitchCommand.sealCommand(stream, adminClient, TOPIC, partition, offset, false, dryRun); + return out.toString(StandardCharsets.UTF_8); + } + private String runStateCommand() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); PrintStream stream = new PrintStream(out, false, StandardCharsets.UTF_8);