Commit bb081be

feng-y and claude authored
feat(auto_cl): add error rate threshold for punishment attenuation (#3219)
* feat(auto_cl): add error rate threshold for punishment attenuation

Add new GFlag `auto_cl_error_rate_punish_threshold` to enable error-rate-based punishment attenuation in AutoConcurrencyLimiter.

Problem: Low error rates (e.g., 1.3% sporadic timeouts) cause disproportionate avg_latency inflation (+31%), leading the limiter to mistakenly shrink max_concurrency and trigger ELIMIT rejections.

Solution: Inspired by Alibaba Sentinel's threshold-based approach:
- threshold=0 (default): Original behavior preserved (backward compat)
- threshold>0 (e.g., 0.1): Error rates below threshold produce zero punishment; above it, punishment scales linearly from 0 to full

Example: With threshold=0.1, a 5% error rate produces no punishment, while a 50% error rate produces 44% of the original punishment.

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent d22fa17 commit bb081be
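
The linear ramp described in the commit message can be written out explicitly. This is a sketch inferred from the message and the diff; the symbols $e$ (error rate of the sampling window), $t$ (threshold) and $f$ (factor multiplied into the failure punishment) are notation introduced here for illustration and do not appear in the commit.

```latex
f(e, t) =
\begin{cases}
1, & t \le 0 \ \text{or}\ t \ge 1 \quad \text{(feature disabled)} \\
0, & 0 < t < 1,\ e \le t \\
\dfrac{e - t}{1 - t}, & 0 < t < 1,\ e > t
\end{cases}
\qquad
f(0.05,\ 0.1) = 0,
\qquad
f(0.5,\ 0.1) = \frac{0.5 - 0.1}{1 - 0.1} = \frac{4}{9} \approx 0.44
```

The two evaluations on the right are the 5% and 50% cases quoted in the commit message.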

4 files changed: +229 -1 lines changed

docs/cn/auto_concurrency_limiter.md

Lines changed: 18 additions & 0 deletions
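@@ -154,3 +154,21 @@ The gradient algorithm in netflix is: max_concurrency = min_latency / latency *
 * In the gradient algorithm, max_concurrency / latency is conceptually related to qps (by Little's law), but the two can diverge badly. For example, before
 min_latency is re-measured, if every latency is smaller than min_latency, max_concurrency keeps decreasing, possibly down to 0; under this algorithm, however, max_qps and min_latency remain stable, so the max_concurrency computed from them does not swing wildly. Fundamentally, when the gradient algorithm iterates max_concurrency, latency does not represent the latency at an actual concurrency of max_concurrency. The two are disconnected, so max_concurrency / latency has no clear physical meaning and may differ greatly from qps, which ultimately causes large deviations.
 * The gradient algorithm recommends queue_size = sqrt(max_concurrency), which is unreasonable. Netflix apparently regards queue_size as the buffering in various uncontrollable stages, such as inside sockets, so a positive correlation with max_concurrency is understandable. In our view, though, that part of queue_size matters very little and can be omitted or set to a constant. The queue_size we care about is the exploration room left for concurrency to rise: max_concurrency is updated with a delay, so while concurrency grows from low to high, queue_size keeps qps from being capped before max_concurrency is updated. When concurrency is high, the service may already be overloaded, so queue_size should be smaller to avoid making latency worse. Here queue_size is inversely related to concurrency.
+
+## Error rate punishment threshold
+
+`auto_cl_error_rate_punish_threshold` sets an error-rate "dead zone": error rates at or below this threshold produce no punishment, which keeps a small number of failed requests from having an outsized effect on max_concurrency.
+
+| GFlag | Default | Valid range | Description |
+|-------|---------|-------------|-------------|
+| auto_cl_error_rate_punish_threshold | 0 | [0, 1) | Error rate punishment threshold; 0 disables it |
+
+- **Default is 0**: the feature is disabled and the original behavior is kept
+- **Set to a valid value (e.g. 0.1)**: punishment is 0 when error rate ≤ threshold; punishment grows linearly when error rate > threshold
+- **Invalid values**: values ≥ 1 are ignored and treated as 0
+
+**Example**
+```
+# No punishment when the error rate is at or below 10%; above 10% the punishment increases linearly
+--auto_cl_error_rate_punish_threshold=0.1
+```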

src/brpc/policy/auto_concurrency_limiter.cpp

Lines changed: 30 additions & 1 deletion
@@ -77,6 +77,13 @@ DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
              "the value, the higher the tolerance for the fluctuation of the "
              "latency. If the value is too large, the latency will be higher "
              "when the server is overloaded.");
+DEFINE_double(auto_cl_error_rate_punish_threshold, 0,
+              "Threshold for error-rate-based punishment attenuation. "
+              "Valid range: [0, 1). 0 (default) disables the feature. "
+              "Values >= 1 are ignored and treated as 0. "
+              "e.g. 0.1: error rates below 10%% produce zero punishment; "
+              "above it the punishment scales linearly from 0 to full strength. "
+              "Only effective when auto_cl_enable_error_punish is true.");
 
 AutoConcurrencyLimiter::AutoConcurrencyLimiter()
     : _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
@@ -236,7 +243,29 @@ void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
     int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
     double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
-    int64_t avg_latency =
+
+    // Threshold-based attenuation: when 0 < threshold < 1, attenuate punishment
+    // based on error rate. Inspired by Sentinel's threshold-based circuit breaker:
+    // low error rates should not inflate avg_latency. Above threshold, punishment
+    // scales linearly from 0 to full strength.
+    // Invalid values (<=0 or >=1) skip this block entirely, preserving original behavior.
+    if (FLAGS_auto_cl_error_rate_punish_threshold > 0 &&
+        FLAGS_auto_cl_error_rate_punish_threshold < 1.0 &&
+        _sw.failed_count > 0) {
+        double threshold = FLAGS_auto_cl_error_rate_punish_threshold;
+        double error_rate = static_cast<double>(_sw.failed_count) /
+                            (_sw.succ_count + _sw.failed_count);
+        if (error_rate <= threshold) {
+            // Error rate within dead zone, cancel punishment.
+            failed_punish = 0;
+        } else {
+            // Linear ramp: 0 at threshold, 1.0 at 100% error rate.
+            double punish_factor = (error_rate - threshold) / (1.0 - threshold);
+            failed_punish *= punish_factor;
+        }
+    }
+
+    int64_t avg_latency =
         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
     double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
     UpdateMinLatency(avg_latency);
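
To see how the attenuation changes the value passed to `UpdateMinLatency`, the hunk above can be lifted into a standalone function. This is a sketch under stated assumptions: `WindowStats` and `AvgLatencyUs` are hypothetical names that stand in for the limiter's `_sw` counters and the inline computation, and the two gflags are passed as plain parameters so the logic runs without brpc.

```cpp
#include <cmath>
#include <cstdint>

// Hypothetical stand-in for the per-window counters the diff reads from _sw.
struct WindowStats {
    int64_t succ_count;
    int64_t failed_count;
    int64_t total_succ_us;
    int64_t total_failed_us;
};

// Sketch of the avg_latency computation after this change. fail_punish_ratio
// and threshold replace FLAGS_auto_cl_fail_punish_ratio and
// FLAGS_auto_cl_error_rate_punish_threshold. Requires succ_count > 0, like the
// expression in the diff.
int64_t AvgLatencyUs(const WindowStats& sw, double fail_punish_ratio,
                     double threshold) {
    double failed_punish = sw.total_failed_us * fail_punish_ratio;
    if (threshold > 0 && threshold < 1.0 && sw.failed_count > 0) {
        double error_rate = static_cast<double>(sw.failed_count) /
                            (sw.succ_count + sw.failed_count);
        if (error_rate <= threshold) {
            // Dead zone: failures do not inflate the average.
            failed_punish = 0;
        } else {
            // Linear ramp: 0 at the threshold, 1.0 at a 100% error rate.
            failed_punish *= (error_rate - threshold) / (1.0 - threshold);
        }
    }
    return static_cast<int64_t>(
        std::ceil((failed_punish + sw.total_succ_us) / sw.succ_count));
}
```

For a window of 90 successes at 100 us and 10 failures at 1000 us with a punish ratio of 1.0, this gives 212 us at threshold 0 and 100 us at threshold 0.2; for 50 successes and 50 failures at threshold 0.1 it gives 545 us. These are the same figures worked out in the comments of the unit test in this commit.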

test/BUILD.bazel

Lines changed: 13 additions & 0 deletions
@@ -269,6 +269,19 @@ cc_test(
     ],
 )
 
+cc_test(
+    name = "brpc_auto_concurrency_limiter_test",
+    srcs = [
+        "brpc_auto_concurrency_limiter_unittest.cpp",
+    ],
+    copts = COPTS,
+    deps = [
+        "//:brpc",
+        "@com_google_googletest//:gtest",
+        "@com_google_googletest//:gtest_main",
+    ],
+)
+
 refresh_compile_commands(
     name = "brpc_test_compdb",
     # Specify the targets of interest.

test/brpc_auto_concurrency_limiter_unittest.cpp

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/auto_concurrency_limiter.h"
+#include "butil/time.h"
+#include <gtest/gtest.h>
+
+namespace brpc {
+namespace policy {
+
+DECLARE_int32(auto_cl_sample_window_size_ms);
+DECLARE_int32(auto_cl_min_sample_count);
+DECLARE_int32(auto_cl_max_sample_count);
+DECLARE_bool(auto_cl_enable_error_punish);
+DECLARE_double(auto_cl_fail_punish_ratio);
+DECLARE_double(auto_cl_error_rate_punish_threshold);
+
+} // namespace policy
+} // namespace brpc
+
+class AutoConcurrencyLimiterTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Save original values
+        orig_sample_window_size_ms_ = brpc::policy::FLAGS_auto_cl_sample_window_size_ms;
+        orig_min_sample_count_ = brpc::policy::FLAGS_auto_cl_min_sample_count;
+        orig_max_sample_count_ = brpc::policy::FLAGS_auto_cl_max_sample_count;
+        orig_enable_error_punish_ = brpc::policy::FLAGS_auto_cl_enable_error_punish;
+        orig_fail_punish_ratio_ = brpc::policy::FLAGS_auto_cl_fail_punish_ratio;
+        orig_error_rate_threshold_ = brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold;
+
+        // Set test-friendly values
+        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 1000;
+        brpc::policy::FLAGS_auto_cl_min_sample_count = 5;
+        brpc::policy::FLAGS_auto_cl_max_sample_count = 200;
+        brpc::policy::FLAGS_auto_cl_enable_error_punish = true;
+        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = 1.0;
+    }
+
+    void TearDown() override {
+        // Restore original values
+        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = orig_sample_window_size_ms_;
+        brpc::policy::FLAGS_auto_cl_min_sample_count = orig_min_sample_count_;
+        brpc::policy::FLAGS_auto_cl_max_sample_count = orig_max_sample_count_;
+        brpc::policy::FLAGS_auto_cl_enable_error_punish = orig_enable_error_punish_;
+        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = orig_fail_punish_ratio_;
+        brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = orig_error_rate_threshold_;
+    }
+
+private:
+    int32_t orig_sample_window_size_ms_;
+    int32_t orig_min_sample_count_;
+    int32_t orig_max_sample_count_;
+    bool orig_enable_error_punish_;
+    double orig_fail_punish_ratio_;
+    double orig_error_rate_threshold_;
+};
+
+// Helper function to add samples and trigger window completion
+// Uses synthetic timestamps instead of sleeping for faster, deterministic tests.
+// The final successful sample is used as the trigger, so actual counts match
+// succ_count/fail_count exactly (preserving intended error rates).
+void AddSamplesAndTriggerWindow(brpc::policy::AutoConcurrencyLimiter& limiter,
+                                int succ_count, int64_t succ_latency,
+                                int fail_count, int64_t fail_latency) {
+    ASSERT_GT(succ_count, 0) << "Need at least 1 success to trigger window";
+    int64_t now = butil::gettimeofday_us();
+
+    // Add successful samples (reserve one for the trigger)
+    for (int i = 0; i < succ_count - 1; ++i) {
+        limiter.AddSample(0, succ_latency, now);
+    }
+    // Add failed samples
+    for (int i = 0; i < fail_count; ++i) {
+        limiter.AddSample(1, fail_latency, now);
+    }
+
+    // Advance timestamp past window expiry instead of sleeping
+    int64_t after_window = now + brpc::policy::FLAGS_auto_cl_sample_window_size_ms * 1000 + 1000;
+
+    // Use the final success sample to trigger window submission
+    limiter.AddSample(0, succ_latency, after_window);
+}
+
+// Test 1: Backward compatibility - threshold=0 preserves original punishment behavior
+TEST_F(AutoConcurrencyLimiterTest, ThresholdZeroPreservesOriginalBehavior) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0;
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate, threshold=0 means full punishment applied
+    // avg_latency = ceil((10*1000 + 90*100) / 90) = ceil(211.1) = 212us
+    ASSERT_GT(limiter._min_latency_us, 180);
+    ASSERT_LT(limiter._min_latency_us, 250);
+}
+
+// Test 2: Dead zone - error rate below threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, BelowThresholdZeroPunishment) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.2; // 20% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate < 20% threshold, punishment should be zero
+    // avg_latency = 90*100 / 90 = 100us (no inflation)
+    ASSERT_GT(limiter._min_latency_us, 80);
+    ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 3: Boundary - error rate exactly at threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, ExactlyAtThresholdZeroPunishment) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate == 10% threshold, punishment should be zero
+    // avg_latency = 90*100 / 90 = 100us
+    ASSERT_GT(limiter._min_latency_us, 80);
+    ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 4: Linear scaling - above threshold, punishment scales proportionally
+TEST_F(AutoConcurrencyLimiterTest, AboveThresholdLinearScaling) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    // Case A: 50% error rate
+    // punish_factor = (0.5 - 0.1) / (1.0 - 0.1) = 4/9 ≈ 0.444
+    // failed_punish = 50 * 1000 * (4/9) = 22222.2us
+    // avg_latency = ceil((22222.2 + 50*100) / 50) = ceil(544.4) = 545us
+    {
+        brpc::policy::AutoConcurrencyLimiter limiter;
+        AddSamplesAndTriggerWindow(limiter, 50, 100, 50, 1000);
+        ASSERT_GT(limiter._min_latency_us, 450);
+        ASSERT_LT(limiter._min_latency_us, 650);
+    }
+
+    // Case B: 90% error rate (near full punishment)
+    // punish_factor = (0.9 - 0.1) / (1.0 - 0.1) = 8/9 ≈ 0.889
+    // failed_punish = 90 * 1000 * (8/9) = 80000us
+    // avg_latency = ceil((80000 + 10*100) / 10) = ceil(8100) = 8100us
+    {
+        brpc::policy::AutoConcurrencyLimiter limiter;
+        AddSamplesAndTriggerWindow(limiter, 10, 100, 90, 1000);
+        ASSERT_GT(limiter._min_latency_us, 7000);
+        ASSERT_LT(limiter._min_latency_us, 9000);
+    }
+}
+