Skip to content

Commit 0ec948b

Browse files
authored
Merge pull request apache#3222 from wayslog/feat/redis-cluster-channel
feat(redis): add native Redis Cluster channel support
2 parents 6a12915 + 7363d31 commit 0ec948b

File tree

9 files changed

+3246
-2
lines changed

9 files changed

+3246
-2
lines changed

docs/cn/redis_client.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,55 @@ response中的所有reply的ownership属于response。当response析构时,rep
161161
162162
或者你可以沿用常见的[twemproxy](https://github.com/twitter/twemproxy)方案。这个方案虽然需要额外部署proxy,还增加了延时,但client端仍可以像访问单点一样的访问它。
163163
164+
如果你要直接访问原生 Redis Cluster(按 slot 路由、自动处理 MOVED/ASK,并通过 `CLUSTER SLOTS`/`CLUSTER NODES` 刷新拓扑),可以使用 `brpc::RedisClusterChannel`:
165+
166+
```c++
167+
#include <brpc/redis_cluster.h>
168+
169+
brpc::RedisClusterChannel channel;
170+
brpc::RedisClusterChannelOptions options;
171+
options.max_redirect = 5;
172+
if (channel.Init("127.0.0.1:7000,127.0.0.1:7001", &options) != 0) {
173+
LOG(ERROR) << "Fail to init redis cluster channel";
174+
}
175+
```
176+
177+
`RedisClusterChannel` 支持同步/异步 `CallMethod`、自动重定向重试和周期性拓扑刷新。多 key 支持 `MGET/MSET/DEL/EXISTS/UNLINK/EVAL/EVALSHA`,暂不支持 `MULTI/EXEC`
178+
179+
## RedisClusterChannel 示例
180+
181+
`example/redis_c++/redis_cluster_client.cpp` 覆盖了以下常见场景:
182+
183+
- 通过多个 seed 节点初始化;
184+
- 自动处理 MOVED/ASK 重定向与重试;
185+
- 先走 `CLUSTER SLOTS`,失败后回退 `CLUSTER NODES`
186+
- 同一个 channel 同时执行同步 pipeline 和异步请求。
187+
188+
构建并运行:
189+
190+
```bash
191+
cd example/redis_c++
192+
make redis_cluster_client
193+
./redis_cluster_client \
194+
--seeds=127.0.0.1:7000,127.0.0.1:7001 \
195+
--max_redirect=5 \
196+
--timeout_ms=1000
197+
```
198+
199+
常用选项:
200+
201+
- `RedisClusterChannelOptions::max_redirect`:单个命令的最大重定向次数;
202+
- `RedisClusterChannelOptions::refresh_interval_s`:周期刷新拓扑的间隔;
203+
- `RedisClusterChannelOptions::topology_refresh_timeout_ms`:拓扑命令超时;
204+
- `RedisClusterChannelOptions::channel_options`:每个 redis 节点的通用 brpc 参数;
205+
- `RedisClusterChannelOptions::enable_periodic_refresh`:是否启用后台周期刷新。
206+
207+
说明:
208+
209+
- `MGET/MSET/DEL/EXISTS/UNLINK` 会按 key 分发后按请求顺序合并返回;
210+
- `EVAL/EVALSHA` 要求声明的 key 位于同一 slot;
211+
- `MULTI/EXEC` 当前会直接返回错误 reply。
212+
164213
# 查看发出的请求和收到的回复
165214

166215
打开[-redis_verbose](http://brpc.baidu.com:8765/flags/redis_verbose)即看到所有的redis request和response,注意这应该只用于线下调试,而不是线上程序。
@@ -242,6 +291,8 @@ TRACE: 02-13 18:07:42: * 0 client.cpp:180] Accessing redis server at qps=75238
242291

243292
[example/redis_c++/redis_cli](https://github.com/apache/brpc/blob/master/example/redis_c%2B%2B/redis_cli.cpp)是一个类似于官方CLI的命令行工具,以展示brpc对redis协议的处理能力。当使用brpc访问redis-server出现不符合预期的行为时,也可以使用这个CLI进行交互式的调试。
244293

294+
如果是原生 Redis Cluster 场景,可直接参考 [example/redis_c++/redis_cluster_client.cpp](https://github.com/apache/brpc/blob/master/example/redis_c%2B%2B/redis_cluster_client.cpp)
295+
245296
和官方CLI类似,`redis_cli <command>`也可以直接运行命令,-server参数可以指定redis-server的地址。
246297

247298
```

docs/en/redis_client.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,55 @@ Create a `Channel` using the consistent hashing as the load balancing algorithm(
162162
163163
Another choice is to use the common [twemproxy](https://github.com/twitter/twemproxy) solution, which makes clients access the cluster just like accessing a single server, although the solution needs to deploy proxies and adds more latency.
164164
165+
For native Redis Cluster (slot based routing, MOVED/ASK redirection and topology refresh from `CLUSTER SLOTS`/`CLUSTER NODES`), use `brpc::RedisClusterChannel`:
166+
167+
```c++
168+
#include <brpc/redis_cluster.h>
169+
170+
brpc::RedisClusterChannel channel;
171+
brpc::RedisClusterChannelOptions options;
172+
options.max_redirect = 5;
173+
if (channel.Init("127.0.0.1:7000,127.0.0.1:7001", &options) != 0) {
174+
LOG(ERROR) << "Fail to init redis cluster channel";
175+
}
176+
```
177+
178+
`RedisClusterChannel` supports synchronous/asynchronous `CallMethod`, automatic redirection retries and periodic topology refresh. Multi-key support includes `MGET/MSET/DEL/EXISTS/UNLINK/EVAL/EVALSHA`. `MULTI/EXEC` is currently not supported.
179+
180+
## RedisClusterChannel example
181+
182+
`example/redis_c++/redis_cluster_client.cpp` demonstrates:
183+
184+
- bootstrap from multiple seed nodes.
185+
- MOVED/ASK auto-redirection and retry.
186+
- topology refresh from `CLUSTER SLOTS` with `CLUSTER NODES` fallback.
187+
- sync pipeline and async calls using one channel.
188+
189+
Build and run:
190+
191+
```bash
192+
cd example/redis_c++
193+
make redis_cluster_client
194+
./redis_cluster_client \
195+
--seeds=127.0.0.1:7000,127.0.0.1:7001 \
196+
--max_redirect=5 \
197+
--timeout_ms=1000
198+
```
199+
200+
Frequently used options:
201+
202+
- `RedisClusterChannelOptions::max_redirect`: max redirects per command.
203+
- `RedisClusterChannelOptions::refresh_interval_s`: interval of periodic topology refresh.
204+
- `RedisClusterChannelOptions::topology_refresh_timeout_ms`: timeout for topology commands.
205+
- `RedisClusterChannelOptions::channel_options`: normal brpc channel options for each redis node.
206+
- `RedisClusterChannelOptions::enable_periodic_refresh`: disable this when your app controls refresh explicitly.
207+
208+
Notes:
209+
210+
- `MGET/MSET/DEL/EXISTS/UNLINK` are executed per key and merged in request order.
211+
- `EVAL/EVALSHA` requires all declared keys to be in one slot.
212+
- `MULTI/EXEC` returns an error reply by design.
213+
165214
# Debug
166215

167216
Turn on [-redis_verbose](http://brpc.baidu.com:8765/flags/redis_verbose) to print contents of all redis requests and responses. Note that this should only be used for debugging rather than online services.
@@ -243,6 +292,8 @@ We can see a tremendous drop of QPS compared to the one using single connection
243292

244293
[example/redis_c++/redis_cli](https://github.com/apache/brpc/blob/master/example/redis_c%2B%2B/redis_cli.cpp) is a command line tool similar to the official CLI, demostrating brpc's capability to talk with redis servers. When unexpected results are got from a redis-server using a brpc client, you can debug with this tool interactively as well.
245294

295+
For native Redis Cluster, you can start from [example/redis_c++/redis_cluster_client.cpp](https://github.com/apache/brpc/blob/master/example/redis_c%2B%2B/redis_cluster_client.cpp).
296+
246297
Like the official CLI, `redis_cli <command>` runs the command directly, and `-server` which is address of the redis-server can be specified.
247298

248299
```

example/redis_c++/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,11 @@ endif()
138138
add_executable(redis_cli redis_cli.cpp)
139139
add_executable(redis_press redis_press.cpp)
140140
add_executable(redis_server redis_server.cpp)
141+
add_executable(redis_cluster_client redis_cluster_client.cpp)
141142

142143
set(AUX_LIB readline ncurses)
143144

144145
target_link_libraries(redis_cli ${BRPC_LIB} ${DYNAMIC_LIB} ${AUX_LIB})
145146
target_link_libraries(redis_press ${BRPC_LIB} ${DYNAMIC_LIB})
146147
target_link_libraries(redis_server ${BRPC_LIB} ${DYNAMIC_LIB})
148+
target_link_libraries(redis_cluster_client ${BRPC_LIB} ${DYNAMIC_LIB})

example/redis_c++/Makefile

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ DYNAMIC_LINKINGS += -lreadline -lncurses
2929
PRESS_SOURCES = redis_press.cpp
3030
CLI_SOURCES = redis_cli.cpp
3131
SERVER_SOURCES = redis_server.cpp
32+
CLUSTER_SOURCES = redis_cluster_client.cpp
3233

3334
PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES)))
3435
CLI_OBJS = $(addsuffix .o, $(basename $(CLI_SOURCES)))
3536
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
37+
CLUSTER_OBJS = $(addsuffix .o, $(basename $(CLUSTER_SOURCES)))
3638

3739
ifeq ($(SYSTEM),Darwin)
3840
ifneq ("$(LINK_SO)", "")
@@ -50,12 +52,13 @@ else ifeq ($(SYSTEM),Linux)
5052
endif
5153

5254
.PHONY:all
53-
all: redis_press redis_cli redis_server
55+
all: redis_press redis_cli redis_server redis_cluster_client
5456

5557
.PHONY:clean
5658
clean:
5759
@echo "> Cleaning"
58-
rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS)
60+
rm -rf redis_press redis_cli redis_server redis_cluster_client \
61+
$(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS) $(CLUSTER_OBJS)
5962

6063
redis_press:$(PRESS_OBJS)
6164
@echo "> Linking $@"
@@ -81,6 +84,14 @@ else
8184
$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
8285
endif
8386

87+
redis_cluster_client:$(CLUSTER_OBJS)
88+
@echo "> Linking $@"
89+
ifneq ("$(LINK_SO)", "")
90+
$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
91+
else
92+
$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
93+
endif
94+
8495
%.o:%.cpp
8596
@echo "> Compiling $@"
8697
$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// A basic client for native Redis Cluster using brpc::RedisClusterChannel.
19+
20+
#include <gflags/gflags.h>
21+
#include <bthread/countdown_event.h>
22+
#include <butil/logging.h>
23+
#include <brpc/controller.h>
24+
#include <brpc/redis.h>
25+
#include <brpc/redis_cluster.h>
26+
27+
DEFINE_string(seeds, "127.0.0.1:7000,127.0.0.1:7001",
28+
"Comma-separated redis cluster seed endpoints");
29+
DEFINE_string(key_prefix, "brpc_cluster_demo", "Prefix for demo keys");
30+
DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds");
31+
DEFINE_int32(rpc_max_retry, 1, "Max retries for a single sub RPC");
32+
DEFINE_int32(max_redirect, 5, "Max MOVED/ASK redirect retries");
33+
DEFINE_int32(refresh_interval_s, 30, "Periodic topology refresh interval");
34+
DEFINE_int32(topology_refresh_timeout_ms, 1000,
35+
"Timeout of CLUSTER SLOTS/NODES request");
36+
DEFINE_bool(disable_periodic_refresh, false, "Disable periodic topology refresh");
37+
38+
namespace {
39+
40+
class Done : public google::protobuf::Closure {
41+
public:
42+
explicit Done(bthread::CountdownEvent* event) : _event(event) {}
43+
void Run() override { _event->signal(); }
44+
45+
private:
46+
bthread::CountdownEvent* _event;
47+
};
48+
49+
int PrintResponse(const brpc::RedisResponse& response) {
50+
for (int i = 0; i < response.reply_size(); ++i) {
51+
const brpc::RedisReply& reply = response.reply(i);
52+
if (reply.is_error()) {
53+
LOG(ERROR) << "reply[" << i << "] error=" << reply.error_message();
54+
return -1;
55+
}
56+
LOG(INFO) << "reply[" << i << "] " << reply;
57+
}
58+
return 0;
59+
}
60+
61+
} // namespace
62+
63+
int main(int argc, char* argv[]) {
64+
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
65+
66+
brpc::RedisClusterChannelOptions options;
67+
options.max_redirect = FLAGS_max_redirect;
68+
options.refresh_interval_s = FLAGS_refresh_interval_s;
69+
options.enable_periodic_refresh = !FLAGS_disable_periodic_refresh;
70+
options.topology_refresh_timeout_ms = FLAGS_topology_refresh_timeout_ms;
71+
options.channel_options.timeout_ms = FLAGS_timeout_ms;
72+
options.channel_options.max_retry = FLAGS_rpc_max_retry;
73+
74+
brpc::RedisClusterChannel channel;
75+
if (channel.Init(FLAGS_seeds, &options) != 0) {
76+
LOG(ERROR) << "Fail to init redis cluster channel, seeds=" << FLAGS_seeds;
77+
return -1;
78+
}
79+
80+
const std::string key1 = FLAGS_key_prefix + "_1";
81+
const std::string key2 = FLAGS_key_prefix + "_2";
82+
83+
// Sync pipeline.
84+
brpc::RedisRequest request;
85+
brpc::RedisResponse response;
86+
brpc::Controller cntl;
87+
CHECK(request.AddCommand("set %s v1", key1.c_str()));
88+
CHECK(request.AddCommand("set %s v2", key2.c_str()));
89+
CHECK(request.AddCommand("mget %s %s", key1.c_str(), key2.c_str()));
90+
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
91+
if (cntl.Failed()) {
92+
LOG(ERROR) << "Sync call failed: " << cntl.ErrorText();
93+
return -1;
94+
}
95+
if (PrintResponse(response) != 0) {
96+
return -1;
97+
}
98+
99+
// Async single request.
100+
brpc::RedisRequest async_request;
101+
brpc::RedisResponse async_response;
102+
brpc::Controller async_cntl;
103+
CHECK(async_request.AddCommand("get %s", key1.c_str()));
104+
105+
bthread::CountdownEvent event(1);
106+
Done done(&event);
107+
channel.CallMethod(NULL, &async_cntl, &async_request, &async_response, &done);
108+
event.wait();
109+
if (async_cntl.Failed()) {
110+
LOG(ERROR) << "Async call failed: " << async_cntl.ErrorText();
111+
return -1;
112+
}
113+
if (PrintResponse(async_response) != 0) {
114+
return -1;
115+
}
116+
117+
LOG(INFO) << "Redis cluster demo finished";
118+
return 0;
119+
}

0 commit comments

Comments
 (0)