diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index a1d075ae05118..24c192cf9476a 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -107,20 +107,22 @@ public class TopicConfig {
"(i.e. retention.ms/bytes).";
public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
- public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
- "When set to 0, immediate upload without any delay check. " +
- "When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
- "The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
- "When set to -1, resolves to the real local retention ms as maximum delay. " +
- "For how the real local retention time is computed, see local.retention.ms.";
+ public static final String REMOTE_COPY_LAG_MS_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
+ "A non-active segment is upload-eligible when either this time-based check or remote.copy.lag.bytes is satisfied. " +
+ "When set to 0, uploads are immediately eligible regardless of lag checks. " +
+ "When set to a positive value (ms), the segment is time-eligible once elapsed time since its latest record reaches this value. " +
+ "When set to -1, this value is derived from effective local retention time (local.retention.ms). " +
+ "If that effective local retention time is unlimited (-1), this time-based check is not applied. " +
+ "A positive value should not exceed effective local retention time unless local retention is unlimited (-1).";
public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
- public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
- "When set to 0, immediate upload without any delay check. " +
- "When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
- "The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
- "When set to -1, resolves to the real local retention bytes as maximum delay. " +
- "For how the real local retention size is computed, see local.retention.bytes.";
+ public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
+ "A non-active segment is upload-eligible when either this size-based check or remote.copy.lag.ms is satisfied. " +
+ "When set to 0, uploads are immediately eligible regardless of lag checks. " +
+ "When set to a positive value (bytes), the segment is size-eligible once bytes of newer local log data after that segment reaches this value. " +
+ "When set to -1, this value is derived from effective local retention size (local.retention.bytes). " +
+ "If that effective local retention size is unlimited (-1), this size-based check is not applied. " +
+ "A positive value should not exceed effective local retention size unless local retention is unlimited (-1).";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 71ab195ed9757..96d865f8531f6 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -770,6 +770,32 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testDynamicLogRemoteCopyLagConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, port = 8181)
+ val config = KafkaConfig(props)
+ val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+ assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, config.remoteLogManagerConfig.logRemoteCopyLagMs)
+ assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, config.remoteLogManagerConfig.logRemoteCopyLagBytes)
+
+ // update default config
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, "100")
+ newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, "200")
+ config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(100L, config.remoteLogManagerConfig.logRemoteCopyLagMs())
+ assertEquals(200L, config.remoteLogManagerConfig.logRemoteCopyLagBytes())
+
+ // update per broker config
+ config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+ newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, "300")
+ config.dynamicConfig.updateBrokerConfig(0, newProps)
+ assertEquals(300L, config.remoteLogManagerConfig.logRemoteCopyLagBytes())
+ }
+
@Test
def testDynamicLogRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
// log remote copy lag ms cannot exceed effective log local retention ms
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 0ca80703e0d54..93fa68e06ecf0 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -169,22 +169,26 @@ public final class RemoteLogManagerConfig {
public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
public static final String LOG_REMOTE_COPY_LAG_MS_PROP = "log.remote.copy.lag.ms";
- public static final String LOG_REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
- "When set to 0, immediate upload without any delay check. " +
- "When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
- "The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
- "When set to -1, resolves to the real local retention ms as maximum delay. " +
- "For how the real local retention time is computed, see log.local.retention.ms.";
+ public static final String LOG_REMOTE_COPY_LAG_MS_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
+ "A non-active segment is upload-eligible when either this time-based check or log.remote.copy.lag.bytes is satisfied. " +
+ "When set to 0, uploads are immediately eligible regardless of lag checks. " +
+ "When set to a positive value (ms), the segment is time-eligible once elapsed time since its latest record reaches this value. " +
+ "When set to -1, this value is derived from effective local retention time (log.local.retention.ms). " +
+ "If that effective local retention time is unlimited (-1), this time-based check is not applied. " +
+ "A positive value should not exceed effective local retention time unless local retention is unlimited (-1).";
+ public static final Long MAX_LOG_REMOTE_COPY_LAG_MS = -1L; // It indicates the value depends on log.local.retention.ms
public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_MS = 0L;
public static final String LOG_REMOTE_COPY_LAG_BYTES_PROP = "log.remote.copy.lag.bytes";
- public static final String LOG_REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
- "When set to 0, immediate upload without any delay check. " +
- "When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
- "The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
- "When set to -1, resolves to the real local retention bytes as maximum delay. " +
- "For how the real local retention size is computed, see log.local.retention.bytes.";
- public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_BYTES = 0L;
+ public static final String LOG_REMOTE_COPY_LAG_BYTES_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
+ "A non-active segment is upload-eligible when either this size-based check or log.remote.copy.lag.ms is satisfied. " +
+ "When set to 0, uploads are immediately eligible regardless of lag checks. " +
+ "When set to a positive value (bytes), the segment is size-eligible once bytes of newer local log data after that segment reaches this value. " +
+ "When set to -1, this value is derived from effective local retention size (log.local.retention.bytes). " +
+ "If that effective local retention size is unlimited (-1), this size-based check is not applied. " +
+ "A positive value should not exceed effective local retention size unless local retention is unlimited (-1).";
+ public static final Long MAX_LOG_REMOTE_COPY_LAG_BYTES = -1L; // It indicates the value depends on log.local.retention.bytes
+ public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_BYTES = MAX_LOG_REMOTE_COPY_LAG_BYTES;
public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
@@ -368,13 +372,13 @@ public static ConfigDef configDef() {
.define(LOG_REMOTE_COPY_LAG_MS_PROP,
LONG,
DEFAULT_LOG_REMOTE_COPY_LAG_MS,
- atLeast(-1),
+ atLeast(MAX_LOG_REMOTE_COPY_LAG_MS),
MEDIUM,
LOG_REMOTE_COPY_LAG_MS_DOC)
.define(LOG_REMOTE_COPY_LAG_BYTES_PROP,
LONG,
DEFAULT_LOG_REMOTE_COPY_LAG_BYTES,
- atLeast(-1),
+ atLeast(MAX_LOG_REMOTE_COPY_LAG_BYTES),
MEDIUM,
LOG_REMOTE_COPY_LAG_BYTES_DOC)
.define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index c1a6361e50db1..661df953071b0 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -32,6 +32,7 @@
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.record.BrokerCompressionType;
import java.util.Collections;
@@ -144,10 +145,6 @@ public Optional serverConfigName(String configName) {
public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false;
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
- public static final long DEFAULT_REMOTE_COPY_LAG_MS = 0;
- public static final long DEFAULT_REMOTE_COPY_LAG_BYTES = 0;
- public static final long MAX_REMOTE_COPY_LAG_MS = -1; // It indicates the value depends on local retention ms
- public static final long MAX_REMOTE_COPY_LAG_BYTES = -1; // It indicates the value depends on local retention bytes
public static final String INTERNAL_SEGMENT_BYTES_CONFIG = "internal.segment.bytes";
public static final String INTERNAL_SEGMENT_BYTES_DOC = "The maximum size of a single log file. This should be used for testing only.";
@@ -257,8 +254,8 @@ public Optional serverConfigName(String configName) {
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
- .define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_MS, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC)
- .define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_BYTES, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC)
+ .define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, atLeast(RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_MS), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC)
+ .define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, atLeast(RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_BYTES), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
.define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC)
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
@@ -418,13 +415,12 @@ public Boolean remoteLogCopyDisable() {
return remoteLogConfig.remoteLogCopyDisable;
}
-
public long remoteCopyLagMs() {
- return remoteLogConfig.remoteCopyLagMs == MAX_REMOTE_COPY_LAG_MS ? localRetentionMs() : remoteLogConfig.remoteCopyLagMs;
+ return remoteLogConfig.remoteCopyLagMs == RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_MS ? localRetentionMs() : remoteLogConfig.remoteCopyLagMs;
}
public long remoteCopyLagBytes() {
- return remoteLogConfig.remoteCopyLagBytes == MAX_REMOTE_COPY_LAG_BYTES ? localRetentionBytes() : remoteLogConfig.remoteCopyLagBytes;
+ return remoteLogConfig.remoteCopyLagBytes == RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_BYTES ? localRetentionBytes() : remoteLogConfig.remoteCopyLagBytes;
}
public long localRetentionMs() {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLagCopyTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLagCopyTest.java
index 63d6e004fe2c5..f0941f894d803 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLagCopyTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLagCopyTest.java
@@ -288,9 +288,9 @@ public void testCandidateLogSegmentsUploadWhenBothRemoteCopyLagConfigsAreDefault
Map logProps = new HashMap<>();
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 10_000L);
- logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LogConfig.DEFAULT_REMOTE_COPY_LAG_MS);
+ logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS);
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, 10_000L);
- logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LogConfig.DEFAULT_REMOTE_COPY_LAG_BYTES);
+ logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES);
LogConfig logConfig = new LogConfig(logProps);
when(log.config()).thenReturn(logConfig);
when(log.logSegments(5L, Long.MAX_VALUE)).thenReturn(List.of(segment1, segment2, activeSegment));
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index 70ca4333cce24..a3b7bd0761798 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -29,6 +29,7 @@
@SuppressWarnings("removal")
public class RemoteLogManagerConfigTest {
+
@Test
public void testValidConfigs() {
String rsmPrefix = "__custom.rsm.";
@@ -53,6 +54,8 @@ public void testDefaultConfigs() {
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
+ assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, remoteLogManagerConfigEmptyConfig.logRemoteCopyLagMs());
+ assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, remoteLogManagerConfigEmptyConfig.logRemoteCopyLagBytes());
}
@Test