Skip to content

Commit 3d14801

Browse files
committed
[fix](rpc) Fix AutoReleaseClosure ordering: log errors before invoking callback
The callback's call() method may reuse the callback object (e.g., in vdata_stream_sender.h get_send_callback()), triggering a new RPC that mutates response_ and cntl_. If AutoReleaseClosure::Run() invokes call() before checking cntl_->Failed() or response_->status(), it reads the NEW RPC's state instead of the ORIGINAL RPC's result, causing:
1 parent 611cd8e commit 3d14801

File tree

9 files changed

+199
-103
lines changed

9 files changed

+199
-103
lines changed

be/src/exec/exchange/vdata_stream_sender.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class Channel {
164164

165165
std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
166166
bool eos) {
167+
// here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
167168
if (!_send_callback) {
168169
_send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared();
169170
} else {

be/src/exec/operator/exchange_sink_buffer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
347347
}
348348
// The eos here only indicates that the current exchange sink has reached eos.
349349
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
350+
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
350351
s = _send_rpc(ins);
351352
if (!s) {
352353
_failed(ins.id,
@@ -473,9 +474,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
473474
} else if (eos) {
474475
_ended(ins);
475476
}
476-
477477
// The eos here only indicates that the current exchange sink has reached eos.
478478
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
479+
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
479480
s = _send_rpc(ins);
480481
if (!s) {
481482
_failed(ins.id,

be/src/exec/runtime_filter/runtime_filter.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
3838
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
3939
auto merge_filter_closure =
4040
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
41-
create_unique(merge_filter_request, merge_filter_callback,
42-
state->query_options().ignore_runtime_filter_error
43-
? std::weak_ptr<QueryContext> {}
44-
: state->get_query_ctx_weak());
41+
create_unique(merge_filter_request, merge_filter_callback);
4542
void* data = nullptr;
4643
int len = 0;
4744

be/src/exec/runtime_filter/runtime_filter_mgr.cpp

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
241241
Status st = Status::OK();
242242
// After all runtime filters' size are collected, we should send response to all producers.
243243
if (cnt_val.merger->add_rf_size(request->filter_size())) {
244-
auto ctx = query_ctx->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {}
245-
: query_ctx;
246244
for (auto addr : cnt_val.source_addrs) {
247245
std::shared_ptr<PBackendService_Stub> stub(
248246
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
@@ -256,7 +254,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
256254
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
257255
DummyBrpcCallback<PSyncFilterSizeResponse>>::
258256
create_unique(std::make_shared<PSyncFilterSizeRequest>(),
259-
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
257+
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
260258

261259
auto* pquery_id = closure->request_->mutable_query_id();
262260
pquery_id->set_hi(query_ctx->query_id().hi);
@@ -343,17 +341,13 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
343341
}
344342

345343
if (is_ready) {
346-
return _send_rf_to_target(cnt_val,
347-
query_ctx->ignore_runtime_filter_error()
348-
? std::weak_ptr<QueryContext> {}
349-
: query_ctx,
350-
merge_time, request->query_id(), query_ctx->execution_timeout());
344+
return _send_rf_to_target(cnt_val, merge_time, request->query_id(),
345+
query_ctx->execution_timeout());
351346
}
352347
return Status::OK();
353348
}
354349

355350
Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val,
356-
std::weak_ptr<QueryContext> ctx,
357351
int64_t merge_time,
358352
PUniqueId query_id,
359353
int execution_timeout) {
@@ -394,7 +388,7 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
394388
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
395389
DummyBrpcCallback<PPublishFilterResponse>>::
396390
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
397-
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
391+
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
398392

399393
closure->request_->set_merge_time(merge_time);
400394
*closure->request_->mutable_query_id() = query_id;

be/src/exec/runtime_filter/runtime_filter_mgr.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ class RuntimeFilterMergeControllerEntity {
173173
const std::vector<TRuntimeFilterTargetParamsV2>&& target_info,
174174
const int producer_size);
175175

176-
Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr<QueryContext> ctx,
177-
int64_t merge_time, PUniqueId query_id, int execution_timeout);
176+
Status _send_rf_to_target(GlobalMergeContext& cnt_val, int64_t merge_time, PUniqueId query_id,
177+
int execution_timeout);
178178

179179
// protect _filter_map
180180
std::shared_mutex _filter_map_mutex;

be/src/exec/runtime_filter/runtime_filter_producer.cpp

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -94,51 +94,43 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
9494
return Status::OK();
9595
}
9696

97-
class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
98-
DummyBrpcCallback<PSendFilterSizeResponse>> {
99-
std::shared_ptr<Dependency> _dependency;
100-
// Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
101-
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
102-
// after query context because the rpc is not returned.
103-
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
104-
using Base =
105-
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
106-
friend class RuntimeFilterProducer;
107-
ENABLE_FACTORY_CREATOR(SyncSizeClosure);
108-
109-
void _process_if_rpc_failed() override {
110-
Defer defer {[&]() {
111-
Base::_process_if_rpc_failed();
97+
// Callback for sync-size RPCs. Handles errors (disable wrapper + sub dependency) in call().
98+
class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
99+
ENABLE_FACTORY_CREATOR(SyncSizeCallback);
100+
101+
public:
102+
SyncSizeCallback(std::shared_ptr<Dependency> dependency,
103+
std::shared_ptr<RuntimeFilterWrapper> wrapper)
104+
: _dependency(std::move(dependency)), _wrapper(wrapper) {}
105+
106+
void call() override {
107+
// On error: disable the wrapper and sub the dependency here, because set_synced_size()
108+
// will never be called (the merge node won't respond with a sync).
109+
// On success: do NOT sub here. The merge node will respond with sync_filter_size,
110+
// which calls set_synced_size() -> _dependency->sub().
111+
if (cntl_->Failed()) {
112+
if (auto w = _wrapper.lock()) {
113+
w->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
114+
}
112115
((CountedFinishDependency*)_dependency.get())->sub();
113-
}};
114-
auto wrapper = _wrapper.lock();
115-
if (!wrapper) {
116116
return;
117117
}
118118

119-
wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
120-
}
121-
122-
void _process_if_meet_error_status(const Status& status) override {
123-
Defer defer {[&]() {
124-
Base::_process_if_meet_error_status(status);
119+
Status status = Status::create(response_->status());
120+
if (!status.ok()) {
121+
if (auto w = _wrapper.lock()) {
122+
w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
123+
}
125124
((CountedFinishDependency*)_dependency.get())->sub();
126-
}};
127-
auto wrapper = _wrapper.lock();
128-
if (!wrapper) {
129-
return;
130125
}
131-
132-
wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
133126
}
134127

135-
public:
136-
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
137-
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
138-
std::shared_ptr<Dependency> dependency,
139-
std::shared_ptr<RuntimeFilterWrapper> wrapper,
140-
std::weak_ptr<QueryContext> context)
141-
: Base(req, callback, context), _dependency(std::move(dependency)), _wrapper(wrapper) {}
128+
private:
129+
std::shared_ptr<Dependency> _dependency;
130+
// Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
131+
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
132+
// after query context because the rpc is not returned.
133+
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
142134
};
143135

144136
void RuntimeFilterProducer::latch_dependency(
@@ -199,13 +191,15 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
199191
}
200192

201193
auto request = std::make_shared<PSendFilterSizeRequest>();
202-
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
194+
auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
195+
// Store callback in the producer to keep it alive until the RPC completes.
196+
// AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
197+
// would be destroyed when this function returns and error-path sub() would never fire.
198+
_sync_size_callback = callback;
203199
// RuntimeFilter maybe deconstructed before the rpc finished, so that could not use
204200
// a raw pointer in closure. Has to use the context's shared ptr.
205-
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
206-
state->query_options().ignore_runtime_filter_error
207-
? std::weak_ptr<QueryContext> {}
208-
: state->get_query_ctx_weak());
201+
auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
202+
request, callback);
209203
auto* pquery_id = request->mutable_query_id();
210204
pquery_id->set_hi(state->get_query_ctx()->query_id().hi);
211205
pquery_id->set_lo(state->get_query_ctx()->query_id().lo);

be/src/exec/runtime_filter/runtime_filter_producer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ class RuntimeFilterProducer : public RuntimeFilter {
181181

182182
int64_t _synced_size = -1;
183183
std::shared_ptr<CountedFinishDependency> _dependency;
184+
// Holds the SyncSizeCallback alive until the send_filter_size RPC completes.
185+
// AutoReleaseClosure stores callbacks via weak_ptr, so without this the callback
186+
// would be destroyed when send_size() returns, and error-path sub() would never fire.
187+
// Type-erased because the callback type is defined in the .cpp file.
188+
std::shared_ptr<void> _sync_size_callback;
184189

185190
std::atomic<State> _rf_state;
186191
};

be/src/util/brpc_closure.h

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
#include <google/protobuf/stubs/common.h>
2121

22-
#include <atomic>
2322
#include <utility>
2423

25-
#include "runtime/query_context.h"
2624
#include "runtime/thread_context.h"
2725
#include "service/brpc.h"
2826

@@ -84,9 +82,8 @@ class AutoReleaseClosure : public google::protobuf::Closure {
8482
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
8583

8684
public:
87-
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
88-
std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
89-
: request_(req), callback_(callback), context_(std::move(context)) {
85+
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
86+
: request_(req), callback_(callback) {
9087
this->cntl_ = callback->cntl_;
9188
this->response_ = callback->response_;
9289
}
@@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure {
9693
// Will delete itself
9794
void Run() override {
9895
Defer defer {[&]() { delete this; }};
99-
// If lock failed, it means the callback object is deconstructed, then no need
100-
// to deal with the callback any more.
101-
if (auto tmp = callback_.lock()) {
102-
tmp->call();
103-
}
96+
// shouldn't do heavy work here. all heavy work should be done in callback's call() (which means in success/failure handlers)
10497
if (cntl_->Failed()) {
105-
_process_if_rpc_failed();
98+
LOG(WARNING) << "brpc failed: " << cntl_->ErrorText();
10699
} else {
107-
_process_status<ResponseType>(response_.get());
100+
_log_error_status<ResponseType>(response_.get());
101+
}
102+
// this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
103+
if (auto tmp = callback_.lock()) {
104+
tmp->call();
108105
}
109106
}
110107

@@ -116,45 +113,18 @@ class AutoReleaseClosure : public google::protobuf::Closure {
116113
// at any stage.
117114
std::shared_ptr<Request> request_;
118115
std::shared_ptr<ResponseType> response_;
119-
std::string error_msg_;
120-
121-
protected:
122-
virtual void _process_if_rpc_failed() {
123-
std::string error_msg =
124-
fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
125-
if (auto ctx = context_.lock(); ctx) {
126-
ctx->cancel(Status::NetworkError(error_msg));
127-
} else {
128-
LOG(WARNING) << error_msg;
129-
}
130-
}
131-
132-
virtual void _process_if_meet_error_status(const Status& status) {
133-
if (status.is<ErrorCode::END_OF_FILE>()) {
134-
// no need to log END_OF_FILE, reduce the unlessful log
135-
return;
136-
}
137-
if (auto ctx = context_.lock(); ctx) {
138-
ctx->cancel(status);
139-
} else {
140-
LOG(WARNING) << "RPC meet error status: " << status;
141-
}
142-
}
143116

144117
private:
145-
template <typename Response>
146-
void _process_status(Response* response) {}
147-
148118
template <HasStatus Response>
149-
void _process_status(Response* response) {
119+
void _log_error_status(Response* response) {
150120
if (Status status = Status::create(response->status()); !status.ok()) {
151-
_process_if_meet_error_status(status);
121+
if (!status.is<ErrorCode::END_OF_FILE>()) {
122+
LOG(WARNING) << "RPC meet error status: " << status;
123+
}
152124
}
153125
}
154-
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
155-
// thread is freed.
126+
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main thread is freed.
156127
Weak callback_;
157-
std::weak_ptr<QueryContext> context_;
158128
};
159129

160130
} // namespace doris

0 commit comments

Comments
 (0)