Skip to content

Commit c1c8896

Browse files
authored
KAFKA-19984 Throttle-related dynamic configurations should not have the isReadOnly flag (#21131)
The following configs are weird because those values are definitely not READONLY - leader.replication.throttled.rate - follower.replication.throttled.rate - replica.alter.log.dirs.io.max.bytes.per.second Add them into the `DynamicBrokerConfig` to ensure when we query it's not READONLY Reviewers: Lan Ding <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 63226df commit c1c8896

3 files changed

Lines changed: 31 additions & 1 deletion

File tree

server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ public static ConfigDef brokerQuotaConfigs() {
225225
ConfigDef.Importance.MEDIUM, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
226226
}
227227

228+
public static final Set<String> BROKER_QUOTA_CONFIGS = Set.copyOf(brokerQuotaConfigs().names());
229+
228230
public static ConfigDef userAndClientQuotaConfigs() {
229231
ConfigDef configDef = new ConfigDef();
230232
buildUserClientQuotaConfigDef(configDef);

server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public class DynamicBrokerConfig {
7979
DynamicReplicationConfig.RECONFIGURABLE_CONFIGS,
8080
List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG),
8181
GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS,
82-
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS)
82+
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS,
83+
QuotaConfig.BROKER_QUOTA_CONFIGS)
8384
.flatMap(Collection::stream)
8485
.collect(Collectors.toUnmodifiableSet());
8586

server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
6262
import org.apache.kafka.network.SocketServerConfigs;
6363
import org.apache.kafka.server.common.MetadataVersion;
64+
import org.apache.kafka.server.config.QuotaConfig;
6465
import org.apache.kafka.test.TestUtils;
6566

6667
import org.junit.jupiter.api.Timeout;
@@ -72,6 +73,7 @@
7273
import java.util.Set;
7374
import java.util.concurrent.ExecutionException;
7475
import java.util.concurrent.TimeUnit;
76+
import java.util.stream.Collectors;
7577

7678
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
7779
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -81,6 +83,7 @@
8183
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
8284
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
8385
import static org.junit.jupiter.api.Assertions.assertEquals;
86+
import static org.junit.jupiter.api.Assertions.assertFalse;
8487
import static org.junit.jupiter.api.Assertions.assertNotNull;
8588
import static org.junit.jupiter.api.Assertions.assertThrows;
8689
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -429,4 +432,28 @@ public void testIncrementalAlterConfigsBySingleControllerWithDynamicQuorum(Clust
429432
public void testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception {
430433
testIncrementalAlterConfigs(clusterInstance, true);
431434
}
435+
436+
@ClusterTest
437+
public void testQuotaConfigsIsReadOnlyShouldBeFalse(ClusterInstance clusterInstance) throws Exception {
438+
try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
439+
int nodeId = clusterInstance.controllers().values().iterator().next().config().nodeId();
440+
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
441+
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
442+
nodeResource, List.of(
443+
new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
444+
new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
445+
new AlterConfigOp(new ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, "16800"), AlterConfigOp.OpType.SET)
446+
));
447+
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
448+
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
449+
Config config = admin.describeConfigs(List.of(nodeResource)).
450+
all().get(1, TimeUnit.MINUTES).get(nodeResource);
451+
Map<String, ConfigEntry> configEntries = config.entries().stream()
452+
.collect(Collectors.toMap(ConfigEntry::name, e -> e));
453+
assertFalse(configEntries.get(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
454+
assertFalse(configEntries.get(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
455+
assertFalse(configEntries.get(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).isReadOnly());
456+
});
457+
}
458+
}
432459
}

0 commit comments

Comments
 (0)