44 changes: 44 additions & 0 deletions docs/cn/auto_concurrency_limiter.md
@@ -154,3 +154,47 @@ The gradient algorithm in netflix uses the formula: max_concurrency = min_latency / latency *
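* In the gradient algorithm, max_concurrency / latency is conceptually related to qps (by Little's law), but the two can diverge badly. For example, before min_latency is re-measured, if all latency values are smaller than min_latency, max_concurrency keeps decreasing, possibly down to 0; under this algorithm, however, max_qps and min_latency remain stable, so the max_concurrency computed from them does not swing sharply. At root, when the gradient algorithm iterates on max_concurrency, latency does not represent the latency that would be observed at an actual concurrency of max_concurrency. The two are disconnected, so max_concurrency / latency has no clear physical meaning and may differ greatly from qps, which ultimately causes large deviations.
* The gradient algorithm recommends queue_size = sqrt(max_concurrency), which is unreasonable. netflix seems to treat queue_size as the buffering in various uncontrollable stages, such as inside sockets, so a positive relationship with max_concurrency is understandable. In our understanding, that kind of queue_size matters very little; omitting it or using a constant is enough. The queue_size we care about is the exploration room left for concurrency to rise: updates to max_concurrency are delayed, so while concurrency grows from low to high, queue_size keeps qps from being capped before max_concurrency is updated. When concurrency is already high, the service may be overloaded, so queue_size should be smaller to avoid making latency worse. Here queue_size and concurrency are inversely related.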

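## Parameter configuration

### Punishing failed requests

When computing the average latency, adaptive limiting by default also counts the latency of failed requests, so that max_concurrency is not inflated excessively when a downstream service misbehaves. The relevant flags are:

| GFlag | Default | Description |
|-------|---------|-------------|
| auto_cl_enable_error_punish | true | Whether failed requests are punished. When disabled, failed requests are excluded from the latency statistics |
| auto_cl_fail_punish_ratio | 1.0 | Punishment ratio. The larger the value, the more aggressive the punishment and the more failed requests affect the average latency |
| auto_cl_error_rate_punish_threshold | 0 | Error-rate punishment threshold. See the detailed description below |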
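
To make the effect of `auto_cl_enable_error_punish` and `auto_cl_fail_punish_ratio` concrete, here is a minimal standalone sketch of the average-latency expression. `WindowStats` and `PunishedAvgLatencyUs` are hypothetical names used only for illustration; the arithmetic follows the `std::ceil((failed_punish + total_succ_us) / succ_count)` expression in `UpdateMaxConcurrency`, and modelling the disabled case as a zero punishment term is an assumption of this sketch.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical stand-in for the limiter's per-window counters.
struct WindowStats {
    int succ_count;           // number of successful requests in the window
    int failed_count;         // number of failed requests in the window
    int64_t total_succ_us;    // summed latency of successful requests
    int64_t total_failed_us;  // summed latency of failed requests
};

// Average latency with failed-request punishment.
// Assumption: disabling the punishment is modelled as a zero punishment term.
inline int64_t PunishedAvgLatencyUs(const WindowStats& w,
                                    bool enable_error_punish,
                                    double fail_punish_ratio) {
    assert(w.succ_count > 0);  // the division below needs at least one success
    const double failed_punish =
        enable_error_punish ? w.total_failed_us * fail_punish_ratio : 0.0;
    return static_cast<int64_t>(
        std::ceil((failed_punish + w.total_succ_us) / w.succ_count));
}
```

With 90 successes at 100us and 10 failures at 1000us, a ratio of 1.0 gives (10000 + 9000) / 90 ≈ 211.1us, rounded up to 212us; with punishment disabled the result is 100us.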

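#### Error-rate punishment threshold

`auto_cl_error_rate_punish_threshold` sets an error-rate "dead zone": error rates at or below the threshold produce no punishment, which keeps a small number of failed requests from affecting max_concurrency too much.

- **Default value 0**: keeps the original behavior; every failed request is punished
- **Positive value (e.g. 0.1)**:
  - Error rate ≤ threshold: the punishment is 0 and the average latency is determined by successful requests only
  - Error rate > threshold: the punishment grows linearly from 0 and reaches the full punishment at a 100% error rate

Linear attenuation formula: `punish_factor = (error_rate - threshold) / (1.0 - threshold)`. The factor multiplies the failed-request punishment term.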
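
The dead zone and the linear ramp can be sketched as a single pure function. `PunishFactor` is a hypothetical helper written for illustration (the patch computes the factor inline); a threshold of 0 or less is treated as "feature disabled", matching the documented default behavior.

```cpp
#include <cassert>

// Returns the multiplier applied to the failed-request punishment, in [0, 1].
// Hypothetical helper: the names and the standalone form are assumptions.
inline double PunishFactor(double error_rate, double threshold) {
    assert(error_rate >= 0.0 && error_rate <= 1.0);
    if (threshold <= 0.0) {
        return 1.0;  // threshold disabled: full punishment, original behavior
    }
    if (error_rate <= threshold) {
        return 0.0;  // inside the dead zone: no punishment
    }
    // Linear ramp: 0 at the threshold, 1.0 at a 100% error rate.
    // A threshold >= 1.0 never reaches this line, so the divisor is non-zero.
    return (error_rate - threshold) / (1.0 - threshold);
}
```

For a threshold of 0.1, an error rate of 0.5 yields (0.5 - 0.1) / 0.9 ≈ 0.444, so fewer than half of the failed-request latencies are counted toward the average.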

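**Use case**: when a service has a small, inherent error rate (for example, occasional requests with invalid parameters), those errors should not influence the estimate of the service's processing capacity. Setting a reasonable threshold (such as 0.05 or 0.1) filters out this noise.

**Example**:
```
# No punishment while the error rate is at or below 10%; above 10% the punishment increases linearly
--auto_cl_error_rate_punish_threshold=0.1
```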

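### Other parameters

| GFlag | Default | Description |
|-------|---------|-------------|
| auto_cl_sample_window_size_ms | 1000 | Duration of the sampling window (milliseconds) |
| auto_cl_min_sample_count | 100 | Minimum number of samples in a sampling window; a window with fewer samples is discarded |
| auto_cl_max_sample_count | 200 | Maximum number of samples in a sampling window; once exceeded, the window is submitted early |
| auto_cl_initial_max_concurrency | 40 | Initial maximum concurrency |
| auto_cl_alpha_factor_for_ema | 0.1 | EMA smoothing factor; the smaller the value, the less a single sampling window affects the result |
| auto_cl_max_explore_ratio | 0.3 | Maximum exploration ratio; the larger the value, the higher the tolerance for latency fluctuation |
| auto_cl_min_explore_ratio | 0.06 | Minimum exploration ratio, used to judge the load of the service |
| auto_cl_noload_latency_remeasure_interval_ms | 50000 | Interval between re-measurements of noload_latency (milliseconds) |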
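
As a rough illustration of why a smaller `auto_cl_alpha_factor_for_ema` dampens the effect of a single window, the sketch below uses the textbook exponential moving average update. `EmaUpdate` is a hypothetical helper, and the exact way the limiter applies the factor is not shown in this change, so treat the form as an assumption.

```cpp
#include <cassert>

// Textbook EMA update: the new sample contributes a share of `alpha`,
// the previous estimate keeps the remaining `1 - alpha`.
// Hypothetical helper; not taken from the limiter's source.
inline double EmaUpdate(double previous, double sample, double alpha) {
    assert(alpha >= 0.0 && alpha <= 1.0);
    return previous * (1.0 - alpha) + sample * alpha;
}
```

With the default factor of 0.1, a previous estimate of 100 and a new window value of 200 move the estimate to about 110; with a factor of 0.5 the same window would move it to 150.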
30 changes: 29 additions & 1 deletion src/brpc/policy/auto_concurrency_limiter.cpp
@@ -77,6 +77,14 @@ DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
              "the value, the higher the tolerance for the fluctuation of the "
              "latency. If the value is too large, the latency will be higher "
              "when the server is overloaded.");
+DEFINE_double(auto_cl_error_rate_punish_threshold, 0,
+              "Threshold for error-rate-based punishment attenuation. "
+              "0 (default): no effect, original punishment logic is used. "
+              "> 0 (e.g. 0.1): error rates below this threshold produce zero "
+              "punishment; above it the punishment scales linearly from 0 to "
+              "full strength. Only effective when auto_cl_enable_error_punish "
+              "is true. Example: 0.1 means error rates below 10%% are not "
+              "punished.");
 
 AutoConcurrencyLimiter::AutoConcurrencyLimiter()
     : _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
@@ -236,7 +244,27 @@ void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
     int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
     double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
-    int64_t avg_latency =
+
+    // Threshold-based attenuation: when auto_cl_error_rate_punish_threshold > 0,
+    // attenuate punishment based on error rate. Inspired by Sentinel's threshold-
+    // based circuit breaker: low error rates should not inflate avg_latency.
+    // Above threshold, punishment scales linearly from 0 to full strength.
+    // When threshold is 0 (default), this block is skipped entirely.
+    if (FLAGS_auto_cl_error_rate_punish_threshold > 0 && _sw.failed_count > 0) {
+        double threshold = FLAGS_auto_cl_error_rate_punish_threshold;
+        double error_rate = static_cast<double>(_sw.failed_count) /
+                            (_sw.succ_count + _sw.failed_count);
+        if (error_rate <= threshold) {
+            // Error rate within dead zone, cancel punishment.
+            failed_punish = 0;
+        } else {
+            // Linear ramp: 0 at threshold, 1.0 at 100% error rate.
+            double punish_factor = (error_rate - threshold) / (1.0 - threshold);
+            failed_punish *= punish_factor;
+        }
+    }
+
+    int64_t avg_latency =
         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
     double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
     UpdateMinLatency(avg_latency);
169 changes: 169 additions & 0 deletions test/brpc_auto_concurrency_limiter_unittest.cpp
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/policy/auto_concurrency_limiter.h"
#include "butil/time.h"
#include "bthread/bthread.h"
#include <gtest/gtest.h>

namespace brpc {
namespace policy {

DECLARE_int32(auto_cl_sample_window_size_ms);
DECLARE_int32(auto_cl_min_sample_count);
DECLARE_int32(auto_cl_max_sample_count);
DECLARE_bool(auto_cl_enable_error_punish);
DECLARE_double(auto_cl_fail_punish_ratio);
DECLARE_double(auto_cl_error_rate_punish_threshold);

} // namespace policy
} // namespace brpc

class AutoConcurrencyLimiterTest : public ::testing::Test {
protected:
    void SetUp() override {
        // Save original values
        orig_sample_window_size_ms_ = brpc::policy::FLAGS_auto_cl_sample_window_size_ms;
        orig_min_sample_count_ = brpc::policy::FLAGS_auto_cl_min_sample_count;
        orig_max_sample_count_ = brpc::policy::FLAGS_auto_cl_max_sample_count;
        orig_enable_error_punish_ = brpc::policy::FLAGS_auto_cl_enable_error_punish;
        orig_fail_punish_ratio_ = brpc::policy::FLAGS_auto_cl_fail_punish_ratio;
        orig_error_rate_threshold_ = brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold;

        // Set test-friendly values
        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 1000;
        brpc::policy::FLAGS_auto_cl_min_sample_count = 5;
        brpc::policy::FLAGS_auto_cl_max_sample_count = 200;
        brpc::policy::FLAGS_auto_cl_enable_error_punish = true;
        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = 1.0;
    }

    void TearDown() override {
        // Restore original values
        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = orig_sample_window_size_ms_;
        brpc::policy::FLAGS_auto_cl_min_sample_count = orig_min_sample_count_;
        brpc::policy::FLAGS_auto_cl_max_sample_count = orig_max_sample_count_;
        brpc::policy::FLAGS_auto_cl_enable_error_punish = orig_enable_error_punish_;
        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = orig_fail_punish_ratio_;
        brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = orig_error_rate_threshold_;
    }

private:
    int32_t orig_sample_window_size_ms_;
    int32_t orig_min_sample_count_;
    int32_t orig_max_sample_count_;
    bool orig_enable_error_punish_;
    double orig_fail_punish_ratio_;
    double orig_error_rate_threshold_;
};

// Helper function to add samples and trigger window completion.
// Uses synthetic timestamps instead of sleeping for faster, deterministic tests.
// The final successful sample is used as the trigger, so actual counts match
// succ_count/fail_count exactly (preserving intended error rates).
void AddSamplesAndTriggerWindow(brpc::policy::AutoConcurrencyLimiter& limiter,
                                int succ_count, int64_t succ_latency,
                                int fail_count, int64_t fail_latency) {
    ASSERT_GT(succ_count, 0) << "Need at least 1 success to trigger window";
    int64_t now = butil::gettimeofday_us();

    // Add successful samples (reserve one for the trigger)
    for (int i = 0; i < succ_count - 1; ++i) {
        limiter.AddSample(0, succ_latency, now);
    }
    // Add failed samples
    for (int i = 0; i < fail_count; ++i) {
        limiter.AddSample(1, fail_latency, now);
    }

    // Advance timestamp past window expiry instead of sleeping
    int64_t after_window = now + brpc::policy::FLAGS_auto_cl_sample_window_size_ms * 1000 + 1000;

    // Use the final success sample to trigger window submission
    limiter.AddSample(0, succ_latency, after_window);
}

// Test 1: Backward compatibility - threshold=0 preserves original punishment behavior
TEST_F(AutoConcurrencyLimiterTest, ThresholdZeroPreservesOriginalBehavior) {
    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0;
    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

    brpc::policy::AutoConcurrencyLimiter limiter;
    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

    // 10% error rate, threshold=0 means full punishment applied
    // avg_latency = (10*1000 + 90*100) / 90 = 211.1us, rounded up to 212us by std::ceil
    ASSERT_GT(limiter._min_latency_us, 180);
    ASSERT_LT(limiter._min_latency_us, 250);
}

// Test 2: Dead zone - error rate below threshold produces zero punishment
TEST_F(AutoConcurrencyLimiterTest, BelowThresholdZeroPunishment) {
    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.2;  // 20% threshold
    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

    brpc::policy::AutoConcurrencyLimiter limiter;
    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

    // 10% error rate < 20% threshold, punishment should be zero
    // avg_latency = 90*100 / 90 = 100us (no inflation)
    ASSERT_GT(limiter._min_latency_us, 80);
    ASSERT_LT(limiter._min_latency_us, 130);
}

// Test 3: Boundary - error rate exactly at threshold produces zero punishment
TEST_F(AutoConcurrencyLimiterTest, ExactlyAtThresholdZeroPunishment) {
    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1;  // 10% threshold
    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

    brpc::policy::AutoConcurrencyLimiter limiter;
    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

    // 10% error rate == 10% threshold, punishment should be zero
    // avg_latency = 90*100 / 90 = 100us
    ASSERT_GT(limiter._min_latency_us, 80);
    ASSERT_LT(limiter._min_latency_us, 130);
}

// Test 4: Linear scaling - above threshold, punishment scales proportionally
TEST_F(AutoConcurrencyLimiterTest, AboveThresholdLinearScaling) {
    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1;  // 10% threshold
    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

    // Case A: 50% error rate
    // punish_factor = (0.5 - 0.1) / (1.0 - 0.1) = 0.444
    // failed_punish = 50 * 1000 * 0.444 = 22222us
    // avg_latency = (22222 + 50*100) / 50 = 544.4us, rounded up to 545us by std::ceil
    {
        brpc::policy::AutoConcurrencyLimiter limiter;
        AddSamplesAndTriggerWindow(limiter, 50, 100, 50, 1000);
        ASSERT_GT(limiter._min_latency_us, 450);
        ASSERT_LT(limiter._min_latency_us, 650);
    }

    // Case B: 90% error rate (near full punishment)
    // punish_factor = (0.9 - 0.1) / (1.0 - 0.1) = 0.889
    // failed_punish = 90 * 1000 * 0.889 = 80000us
    // avg_latency = (80000 + 10*100) / 10 = 8100us
    {
        brpc::policy::AutoConcurrencyLimiter limiter;
        AddSamplesAndTriggerWindow(limiter, 10, 100, 90, 1000);
        ASSERT_GT(limiter._min_latency_us, 7000);
        ASSERT_LT(limiter._min_latency_us, 9000);
    }
}
