diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index e3f2e34863..f1d25896a3 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -391,8 +391,10 @@ void canEnableDisklessOnClassicTopicWhenAllowFromClassicEnabled() throws Excepti createTopic(admin, topic, Map.of()); assertEquals("false", getTopicConfig(admin, topic).get(DISKLESS_ENABLE_CONFIG)); - alterTopicConfig(admin, topic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); - assertEquals("true", getTopicConfig(admin, topic).get(DISKLESS_ENABLE_CONFIG)); + alterTopicConfig(admin, topic, Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true", DISKLESS_ENABLE_CONFIG, "true")); + Map topicConfig = getTopicConfig(admin, topic); + assertEquals("true", topicConfig.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", topicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); } finally { cluster.close(); } diff --git a/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java index 0520b4bcc3..939c90f2cf 100644 --- a/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java +++ b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java @@ -221,7 +221,9 @@ public void switchClassicTopicToDiskless(final AlterConfigsMode alterConfigsMode try { log.warn("[stage=switch-start] Enabling diskless for topic={} via {}", classicToDisklessTopic, alterConfigsMode); - alterTopicConfig(admin, classicToDisklessTopic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"), alterConfigsMode); + alterTopicConfig(admin, classicToDisklessTopic, Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), alterConfigsMode); log.warn("[stage=await-switch] Waiting for diskless.enable=true on topic={}", classicToDisklessTopic); waitForTopicDisklessValue(admin, classicToDisklessTopic, "true"); @@ -404,6 +406,27 @@ private void alterTopicConfigWithIncrementalAlterConfigs(final Admin admin, admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(20, TimeUnit.SECONDS); } + @Test + public void testSwitchRejectedWhenRemoteStorageDisabled() throws Exception { + final String topic = "switch-remote-disabled-" + UUID.randomUUID().toString().substring(0, 8); + + try (Admin admin = AdminClient.create(baseClientConfigs())) { + // Create a classic topic with RF=3 + admin.createTopics(List.of( + new NewTopic(topic, 1, (short) 3) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")) + )).all().get(30, TimeUnit.SECONDS); + + // Attempt to switch to diskless + final ExecutionException ex = assertThrows(ExecutionException.class, () -> + alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true"))); + assertInstanceOf(InvalidConfigurationException.class, ex.getCause()); + assertTrue(ex.getCause().getMessage().contains("remote storage must be enabled"), + "Expected 'remote storage must be enabled' in: " + ex.getCause().getMessage()); + } + } + @Test public void testSwitchRejectedWhenPartitionIsOfflineOrUnderReplicated() throws Exception { final String topic = "switch-unhealthy-" + UUID.randomUUID().toString().substring(0, 8); @@ -422,7 +445,9 @@ public void testSwitchRejectedWhenPartitionIsOfflineOrUnderReplicated() throws E // Attempt to switch to diskless (should fail with INVALID_CONFIG due to under-replication) final ExecutionException ex = assertThrows(ExecutionException.class, () -> - alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"))); + alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"))); assertInstanceOf(InvalidConfigurationException.class, ex.getCause()); assertTrue(ex.getCause().getMessage().contains("under-replicated"), "Expected 'under-replicated' in: " + ex.getCause().getMessage()); @@ -432,8 +457,12 @@ public void testSwitchRejectedWhenPartitionIsOfflineOrUnderReplicated() throws E waitForIsrRecovery(admin, topic, 0, 3); // Now the switch should succeed - alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); - assertEquals("true", getTopicConfig(admin, topic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + alterTopicConfigWithIncrementalAlterConfigs(admin, topic, Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); + Map topicConfig = getTopicConfig(admin, topic); + assertEquals("true", topicConfig.get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + assertEquals("true", topicConfig.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)); } } @@ -486,7 +515,9 @@ public void testUncleanLeaderElectionRejectedWhileSwitchPending() throws Excepti // Enable diskless to mark partition as switch pending alterTopicConfigWithIncrementalAlterConfigs( - admin, topic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); + admin, topic, Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); // Should fail trying to enable unclean leader election final ExecutionException ex = assertThrows(ExecutionException.class, () -> diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 31adfb9bf5..0ce5131416 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -677,7 +677,8 @@ class LogConfigTest { // Case 7: diskless is enabled and remote storage becomes enabled assertValid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) assertValid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) - assertInvalid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + // Enabling remote storage on a diskless topic is a valid classic-to-diskless switch + assertValid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, kafkaConfig, disklessAllowFromClassic = true) assertInvalid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, mutualExclusionError, kafkaConfig) // Case 8: if diskless and remote is enabled, can't disable remote storage @@ -718,8 +719,8 @@ class LogConfigTest { assertValid(existingWithRemoteTrue, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) // CLASSIC→DISKLESS direct switch: both diskless.enable=true and remote.storage.enable=true on a topic with neither config assertValid(existingWithoutDisklessOrRemote, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) - // Same switch rejected without consolidation gate - assertInvalid(existingWithoutDisklessOrRemote, setDisklessTrueWithExistingRemoteTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = false) + // Same switch is allowed without consolidation: the switch only requires allow-from-classic + assertValid(existingWithoutDisklessOrRemote, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = false) // Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index e0077438b4..0387aff3c6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -3090,8 +3090,16 @@ private ApiError validateDisklessSwitchInvariants( } } - // All partitions must be healthy to initiate a switch if (initiatingSwitch) { + ConfigEntry remoteStorageEntry = effectiveTopicConfigs.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG); + boolean remoteStorageEnabled = remoteStorageEntry != null + && Boolean.parseBoolean(remoteStorageEntry.value()); + if (!remoteStorageEnabled && !isDisklessRemoteStorageConsolidationEnabled) { + return new ApiError(INVALID_CONFIG, + "Cannot switch topic " + topicName + " to diskless: " + + "remote storage must be enabled."); + } + // All partitions must be healthy to initiate a switch return validatePartitionsForSwitch(topicInfo); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 12b7ffaf3e..f55f7b7ae7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -6760,7 +6760,8 @@ public void testSwitchRejectedWhenPartitionIsOffline() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0); // Fence all brokers to make the partition offline ctx.fenceBrokers(0, 1, 2); @@ -6784,7 +6785,8 @@ public void testSwitchRejectedWhenReassignmentInProgress() { .build(); ctx.registerBrokers(0, 1, 2, 3); ctx.unfenceBrokers(0, 1, 2, 3); - ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0); // Start a reassignment ControllerResult alterResult = @@ -6814,7 +6816,8 @@ public void testSwitchRejectedWhenUnderReplicated() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId(); + Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0).topicId(); // Shrink ISR to make it under-replicated (fence one broker then unfence it without rejoining ISR) ctx.fenceBrokers(2); @@ -6844,7 +6847,8 @@ public void testSwitchRejectedWhenElrIsNonEmpty() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId(); + Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0).topicId(); // Fence broker 2 — ISR drops below minISR (3), so broker 2 goes to ELR ctx.fenceBrokers(2); @@ -6875,7 +6879,8 @@ public void testSwitchRejectedWhenLastKnownElrIsNonEmpty() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId(); + Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0).topicId(); // Fence broker 2 — ISR drops below minISR (3), so broker 2 goes to ELR ctx.fenceBrokers(2); @@ -6905,7 +6910,8 @@ public void testSwitchRejectedWhenRecovering() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId(); + Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0).topicId(); // Fence brokers 1, 2 to shrink ISR to [0] ctx.fenceBrokers(1, 2); @@ -7042,15 +7048,18 @@ public void testLegacyAlterConfigsRejectsImplicitSwitchWhenUnderReplicated() { PartitionRegistration partition = ctx.replicationControl.getPartition(fooId, 0); assertTrue(partition.isr.length < partition.replicas.length); - // Legacy AlterConfigs with only retention.ms (omits diskless.enable). - // Since this would implicitly switch via broker default and the partition - // is under-replicated, it must be rejected. + // Legacy AlterConfigs with retention.ms and remote.storage.enable=true + // (omits diskless.enable). Since this would implicitly switch via broker + // default and the partition is under-replicated, it must be rejected. + Map legacyConfigs = Map.of( + "retention.ms", "86400000", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"); ControllerResult> legacyResult = ctx.configurationControl.legacyAlterConfigs( - Map.of(resource, Map.of("retention.ms", "86400000")), + Map.of(resource, legacyConfigs), false, r -> ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy( - r, Map.of(resource, Map.of("retention.ms", "86400000")))); + r, Map.of(resource, legacyConfigs))); assertEquals(Errors.INVALID_CONFIG, legacyResult.response().get(resource).error(), "Legacy AlterConfigs should reject implicit diskless switch when under-replicated"); @@ -7074,11 +7083,12 @@ public void testLegacyAlterConfigsEmitsSwitchRecordsForImplicitSwitch() { ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); - // Legacy AlterConfigs with only retention.ms (omits diskless.enable). - // Partitions are healthy, so the implicit switch should succeed and produce - // switch-pending records. + // Legacy AlterConfigs with retention.ms and remote.storage.enable=true + // (omits diskless.enable). Partitions are healthy, so the implicit switch + // should succeed and produce switch-pending records. Map> newConfigs = - Map.of(resource, Map.of("retention.ms", "86400000")); + Map.of(resource, Map.of("retention.ms", "86400000", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); ControllerResult> legacyResult = ctx.configurationControl.legacyAlterConfigs(newConfigs, false, r -> ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy( @@ -7149,6 +7159,96 @@ public void testElectLeadersRejectsUncleanElectionForPendingSwitchPartition() { "Expected pending switch message in: " + partitionResult.errorMessage()); } + @Test + public void testSwitchRejectedWhenRemoteStorageNotEnabled() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + + ApiError error = + ctx.replicationControl.validateClassicToDisklessSwitchPrecondition(resource, configChanges); + + assertEquals(Errors.INVALID_CONFIG, error.error()); + assertEquals("Cannot switch topic foo to diskless: " + + "remote storage must be enabled.", error.message()); + } + + @Test + public void testSwitchAllowedWhenRemoteStorageEnabledOnTopic() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + + ApiError error = + ctx.replicationControl.validateClassicToDisklessSwitchPrecondition(resource, configChanges); + + assertEquals(ApiError.NONE, error); + } + + @Test + public void testSwitchAllowedWhenRemoteStorageEnabledInSameBatch() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of( + DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"), + REMOTE_LOG_STORAGE_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + + ApiError error = + ctx.replicationControl.validateClassicToDisklessSwitchPrecondition(resource, configChanges); + + assertEquals(ApiError.NONE, error); + } + + @Test + public void testLegacySwitchRejectedWhenRemoteStorageNotEnabled() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true) + .setDisklessStorageSystemEnabled(true) + .setDefaultDisklessEnable(true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map> newConfigs = Map.of(resource, Map.of( + DISKLESS_ENABLE_CONFIG, "true")); + + ApiError error = + ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy(resource, newConfigs); + + assertEquals(Errors.INVALID_CONFIG, error.error()); + assertEquals("Cannot switch topic foo to diskless: " + + "remote storage must be enabled.", error.message()); + } + @Test public void testSwitchRejectedWhenUncleanLeaderElectionAlreadyEnabled() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() @@ -7233,7 +7333,8 @@ public void testSwitchAllowedWhenUncleanLeaderElectionDisabled() { .build(); ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); - ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); Map>> configChanges = Map.of( @@ -7369,7 +7470,8 @@ public void testIncrementalDeleteDisklessEnableRejectedWhenUnderReplicated() { ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, - Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0).topicId(); + Map.of(DISKLESS_ENABLE_CONFIG, "false", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0).topicId(); // Make the partition under-replicated ctx.fenceBrokers(2); @@ -7666,11 +7768,13 @@ public void testLegacySwitchAllowedWhenUncleanLeaderElectionDisabled() { ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}, - Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0); + Map.of(DISKLESS_ENABLE_CONFIG, "false", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), (short) 0); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); Map> newConfigs = Map.of(resource, Map.of( - DISKLESS_ENABLE_CONFIG, "true")); + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); ApiError error = ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy(resource, newConfigs); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/FakeKafkaConfigSchema.java b/metadata/src/test/java/org/apache/kafka/metadata/FakeKafkaConfigSchema.java index e1e315e7de..babbbd1c33 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/FakeKafkaConfigSchema.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/FakeKafkaConfigSchema.java @@ -40,10 +40,12 @@ public class FakeKafkaConfigSchema { static { CONFIGS.put(BROKER, new ConfigDef(). define("unclean.leader.election.enable", BOOLEAN, "false", HIGH, ""). - define("min.insync.replicas", INT, "1", HIGH, "")); + define("min.insync.replicas", INT, "1", HIGH, ""). + define("remote.storage.enable", BOOLEAN, "false", HIGH, "")); CONFIGS.put(TOPIC, new ConfigDef(). define("unclean.leader.election.enable", BOOLEAN, "false", HIGH, ""). - define("min.insync.replicas", INT, "1", HIGH, "")); + define("min.insync.replicas", INT, "1", HIGH, ""). + define("remote.storage.enable", BOOLEAN, "false", HIGH, "")); } public static final Map> SYNONYMS = new HashMap<>(); @@ -53,6 +55,8 @@ public class FakeKafkaConfigSchema { List.of(new ConfigSynonym("unclean.leader.election.enable"))); SYNONYMS.put("min.insync.replicas", List.of(new ConfigSynonym("min.insync.replicas"))); + SYNONYMS.put("remote.storage.enable", + List.of(new ConfigSynonym("remote.storage.enable"))); } public static final KafkaConfigSchema INSTANCE = new KafkaConfigSchema(CONFIGS, SYNONYMS); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index d0d22a9856..e27e96dd36 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -595,8 +595,8 @@ public boolean isSwitchedFromClassicWithRemoteStorage() { if (wasRemoteStorageExplicitlySet() && wasRemoteStorageEnabled()) { return true; } - // CLASSIC→DISKLESS (single request): Remote-Storage is being newly enabled, requires consolidation gate - return isRemoteStorageConsolidationEnabled && isRemoteStorageBecomesEnabled(); + // CLASSIC→DISKLESS (single request): Remote-Storage is being newly enabled in the same request + return isRemoteStorageBecomesEnabled(); } /** Both overrides were already present and remain off; used to skip mutual exclusion without consolidation. */