Skip to content

Commit 2b9a1a5

Browse files
authored
[Bug](pipeline) fix wake up early without terminate call (#61679)
``` Thread A (正在执行 HashJoin Build Task) Thread B (下游 pipeline 全部完成) ──────────────────────────────────────── ────────────────────────────────── Defer 开始执行: line 475: 读取 _wake_up_early → false decrement_running_task() 触发 make_all_runnable(): line 127: set_wake_up_early() → true line 132: terminate() → finish_dep.set_always_ready() line 481: else if (_eos && !_spilling && !_is_pending_finish()) _is_pending_finish() = false ← 因为 always_ready! line 483: *done = true ← 注意: _sink->terminate() 从未被调用! close_task(): task->close(OK): ``` This pull request addresses a subtle race condition in the pipeline task execution logic and adds a targeted test to verify the fix. The main improvement ensures that operator termination is reliably triggered even in the presence of concurrent state changes, preventing operators from being left in an inconsistent state. Additionally, the pull request introduces a debug point for precise testing and includes minor test code cleanups. **Race condition fix and test coverage:** * Fixed a race condition in `PipelineTask::execute()` by reordering the logic to ensure `terminate()` is always called if required, even when another thread updates task state between checks. Added a debug point to simulate the race for testing. * Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp` that uses the debug point to reliably reproduce and verify the race condition fix, ensuring operator termination is not skipped. **Test infrastructure and cleanup:** * Included `debug_points.h` and `common/config.h` in `pipeline_task_test.cpp` to support debug point injection and configuration toggling for the new test. [[1]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR21) [[2]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR36) * Minor formatting cleanup in an existing test case for readability.
1 parent 9d31993 commit 2b9a1a5

File tree

2 files changed

+120
-3
lines changed

2 files changed

+120
-3
lines changed

be/src/exec/pipeline/pipeline_task.cpp

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
473473

474474
// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
475475
if (_wake_up_early) {
476-
terminate();
477-
THROW_IF_ERROR(_root->terminate(_state));
478-
THROW_IF_ERROR(_sink->terminate(_state));
479476
_eos = true;
480477
*done = true;
481478
} else if (_eos && !_spilling &&
482479
(fragment_context->is_canceled() || !_is_pending_finish())) {
480+
// Debug point for testing the race condition fix: inject set_wake_up_early() +
481+
// terminate() here to simulate Thread B writing A then B between Thread A's two
482+
// reads of _wake_up_early.
483+
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
484+
set_wake_up_early();
485+
terminate();
486+
});
483487
*done = true;
484488
}
489+
490+
// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
491+
// above, not before. This ordering is critical to avoid a race condition:
492+
//
493+
// Pipeline::make_all_runnable() writes in this order:
494+
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
495+
//
496+
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
497+
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
498+
// then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
499+
// then set *done=true without ever calling operator terminate(), causing close() to run
500+
// on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
501+
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
502+
//
503+
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
504+
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
505+
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
506+
if (_wake_up_early) {
507+
terminate();
508+
THROW_IF_ERROR(_root->terminate(_state));
509+
THROW_IF_ERROR(_sink->terminate(_state));
510+
}
485511
}};
486512
const auto query_id = _state->query_id();
487513
// If this task is already EOS and block is empty (which means we already output all blocks),

be/test/exec/pipeline/pipeline_task_test.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <glog/logging.h>
1919
#include <gtest/gtest.h>
2020

21+
#include "common/config.h"
2122
#include "common/status.h"
2223
#include "exec/operator/operator.h"
2324
#include "exec/operator/spill_utils.h"
@@ -32,6 +33,7 @@
3233
#include "testutil/mock/mock_runtime_state.h"
3334
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
3435
#include "testutil/mock/mock_workload_group_mgr.h"
36+
#include "util/debug_points.h"
3537

3638
namespace doris {
3739

@@ -1534,4 +1536,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
15341536
}
15351537
}
15361538

1539+
// Test for the race condition fix between _wake_up_early and _is_pending_finish().
1540+
//
1541+
// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
1542+
// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
1543+
// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
1544+
// _always_ready from B), Thread A would set *done=true without calling operator terminate().
1545+
//
1546+
// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
1547+
// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
1548+
// read, ensuring terminate() is always called.
1549+
//
1550+
// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
1551+
// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
1552+
// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
1553+
TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
1554+
auto num_instances = 1;
1555+
auto pip_id = 0;
1556+
auto task_id = 0;
1557+
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
1558+
{
1559+
OperatorPtr source_op;
1560+
source_op.reset(new DummyOperator());
1561+
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
1562+
1563+
int op_id = 1;
1564+
int node_id = 2;
1565+
int dest_id = 3;
1566+
DataSinkOperatorPtr sink_op;
1567+
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
1568+
EXPECT_TRUE(pip->set_sink(sink_op).ok());
1569+
}
1570+
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
1571+
std::map<int,
1572+
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
1573+
shared_state_map;
1574+
_runtime_state->resize_op_id_to_local_state(-1);
1575+
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
1576+
profile.get(), shared_state_map, task_id);
1577+
task->_exec_time_slice = 10'000'000'000ULL;
1578+
{
1579+
std::vector<TScanRangeParams> scan_range;
1580+
int sender_id = 0;
1581+
TDataSink tsink;
1582+
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
1583+
}
1584+
_query_ctx->get_execution_dependency()->set_ready();
1585+
1586+
// Get the sink's finish dependency and block it to simulate a pending async operation
1587+
// (e.g. runtime filter size sync RPC in flight).
1588+
auto* sink_finish_dep =
1589+
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
1590+
EXPECT_NE(sink_finish_dep, nullptr);
1591+
sink_finish_dep->block();
1592+
1593+
// Drive the task to EOS so it will enter the Defer's pending-finish check.
1594+
task->_operators.front()->cast<DummyOperator>()._eos = true;
1595+
{
1596+
bool done = false;
1597+
EXPECT_TRUE(task->execute(&done).ok());
1598+
// EOS reached but still blocked on finish dependency: not done yet.
1599+
EXPECT_TRUE(task->_eos);
1600+
EXPECT_FALSE(done);
1601+
EXPECT_FALSE(task->_wake_up_early);
1602+
}
1603+
1604+
// Now unblock the finish dependency (simulates the async op completing) and activate the
1605+
// debug point. The debug point fires inside the else-if branch — after _is_pending_finish()
1606+
// returns false but before the second _wake_up_early read — and calls set_wake_up_early() +
1607+
// terminate(). This precisely reproduces the race where Thread B's writes land between
1608+
// Thread A's two reads of _wake_up_early.
1609+
sink_finish_dep->set_ready();
1610+
config::enable_debug_points = true;
1611+
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
1612+
{
1613+
bool done = false;
1614+
EXPECT_TRUE(task->execute(&done).ok());
1615+
EXPECT_TRUE(task->_eos);
1616+
EXPECT_TRUE(done);
1617+
// The key assertion: even though the task took the else-if path (not the
1618+
// if(_wake_up_early) path), operator terminate() must have been called because the
1619+
// second read of _wake_up_early correctly observed the value set by the debug point.
1620+
EXPECT_TRUE(task->_wake_up_early);
1621+
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
1622+
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
1623+
}
1624+
DebugPoints::instance()->clear();
1625+
config::enable_debug_points = false;
1626+
}
1627+
15371628
} // namespace doris

0 commit comments

Comments
 (0)