Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ public static ConfigDef brokerQuotaConfigs() {
ConfigDef.Importance.MEDIUM, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
}

public static final Set<String> BROKER_QUOTA_CONFIGS = Set.copyOf(brokerQuotaConfigs().names());

public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public class DynamicBrokerConfig {
DynamicReplicationConfig.RECONFIGURABLE_CONFIGS,
List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG),
GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS,
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS)
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS,
QuotaConfig.BROKER_QUOTA_CONFIGS)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.Timeout;
Expand All @@ -72,6 +73,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -81,6 +83,7 @@
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -429,4 +432,28 @@ public void testIncrementalAlterConfigsBySingleControllerWithDynamicQuorum(Clust
public void testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterConfigs(clusterInstance, true);
}

@ClusterTest
public void testQuotaConfigsIsReadOnlyShouldBeFalse(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
int nodeId = clusterInstance.controllers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
nodeResource, List.of(
new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, "16800"), AlterConfigOp.OpType.SET)
));
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
Config config = admin.describeConfigs(List.of(nodeResource)).
all().get(1, TimeUnit.MINUTES).get(nodeResource);
Map<String, ConfigEntry> configEntries = config.entries().stream()
.collect(Collectors.toMap(ConfigEntry::name, e -> e));
assertFalse(configEntries.get(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
assertFalse(configEntries.get(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
assertFalse(configEntries.get(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).isReadOnly());
});
}
}
}