Skip to content

Commit 6c5b9ce

Browse files
committed
[Fix](pyudf) only select alive python servers in get_process
1 parent 203372f commit 6c5b9ce

8 files changed

Lines changed: 236 additions & 10 deletions

File tree

be/src/udf/python/python_server.cpp

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,53 @@ Status PythonServerManager::get_process(const PythonVersion& version, ProcessPtr
125125
version.to_string());
126126
}
127127

128-
// Find process with minimum load (use_count - 1 gives active client count)
129-
auto min_iter = std::min_element(
130-
pool.begin(), pool.end(),
131-
[](const ProcessPtr& a, const ProcessPtr& b) { return a.use_count() < b.use_count(); });
128+
// Prefer an already-alive process and only use load balancing inside that alive subset.
129+
// Dead entries stay in the pool for the background health checker
130+
// unless there is no alive process left for the current request.
131+
auto min_alive_iter =
132+
std::min_element(pool.begin(), pool.end(), [](const ProcessPtr& a, const ProcessPtr& b) {
133+
const bool a_alive = a && a->is_alive();
134+
const bool b_alive = b && b->is_alive();
135+
if (a_alive != b_alive) {
136+
return a_alive > b_alive;
137+
}
138+
if (!a_alive) {
139+
return false;
140+
}
141+
return a.use_count() < b.use_count();
142+
});
132143

133-
// Return process with minimum load
134-
*process = *min_iter;
135-
return Status::OK();
144+
if (min_alive_iter != pool.end() && *min_alive_iter && (*min_alive_iter)->is_alive()) {
145+
*process = *min_alive_iter;
146+
return Status::OK();
147+
}
148+
149+
// Only reach here when the pool has no alive process at all. In that fallback path we
150+
// rebuild one slot synchronously so the caller can still make progress instead of waiting
151+
// for the next health-check round.
152+
for (size_t i = 0; i < pool.size(); ++i) {
153+
auto& candidate = pool[i];
154+
ProcessPtr replacement;
155+
Status status = fork(version, &replacement);
156+
if (!status.ok()) {
157+
if (candidate) {
158+
LOG(WARNING) << "Failed to recreate unavailable Python process (pid="
159+
<< candidate->get_child_pid() << ", version="
160+
<< version.to_string() << "): " << status.to_string();
161+
} else {
162+
LOG(WARNING) << "Failed to create Python process for empty slot, version="
163+
<< version.to_string() << ": " << status.to_string();
164+
}
165+
continue;
166+
}
167+
168+
pool[i] = replacement;
169+
*process = std::move(replacement);
170+
return Status::OK();
171+
}
172+
173+
return Status::InternalError("Python process pool has no available process for version {}",
174+
version.to_string());
136175
}
137176

138177
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {

be/test/udf/python/python_server_test.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,69 @@ TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
417417
mgr.shutdown();
418418
}
419419

420+
TEST_F(PythonServerTest, GetProcessRecreatesDeadProcessWhenNoAliveProcess) {
421+
setup_doris_home();
422+
std::string python_path = create_fake_python_with_socket_creation("3.9.16");
423+
424+
config::max_python_process_num = 1;
425+
426+
PythonServerManager mgr;
427+
PythonVersion version("3.9.16", test_dir_, python_path);
428+
429+
ASSERT_TRUE(mgr.ensure_pool_initialized(version).ok());
430+
431+
ProcessPtr first_process;
432+
ASSERT_TRUE(mgr.get_process(version, &first_process).ok());
433+
ASSERT_NE(first_process, nullptr);
434+
ASSERT_TRUE(first_process->is_alive());
435+
pid_t first_pid = first_process->get_child_pid();
436+
437+
first_process->shutdown();
438+
ASSERT_FALSE(first_process->is_alive());
439+
440+
ProcessPtr replacement;
441+
Status status = mgr.get_process(version, &replacement);
442+
443+
EXPECT_TRUE(status.ok()) << status.to_string();
444+
ASSERT_NE(replacement, nullptr);
445+
EXPECT_TRUE(replacement->is_alive());
446+
EXPECT_NE(replacement->get_child_pid(), first_pid);
447+
448+
mgr.shutdown();
449+
}
450+
451+
TEST_F(PythonServerTest, GetProcessSkipsDeadProcessWhenAliveProcessExists) {
452+
setup_doris_home();
453+
std::string python_path = create_fake_python_with_socket_creation("3.9.16");
454+
455+
PythonServerManager mgr;
456+
PythonVersion version("3.9.16", test_dir_, python_path);
457+
458+
ProcessPtr alive_process;
459+
ASSERT_TRUE(mgr.fork(version, &alive_process).ok());
460+
ASSERT_NE(alive_process, nullptr);
461+
ASSERT_TRUE(alive_process->is_alive());
462+
463+
ProcessPtr dead_process;
464+
ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
465+
ASSERT_NE(dead_process, nullptr);
466+
pid_t dead_pid = dead_process->get_child_pid();
467+
dead_process->shutdown();
468+
ASSERT_FALSE(dead_process->is_alive());
469+
470+
mgr.process_pools_for_test()[version] = {alive_process, dead_process};
471+
472+
ProcessPtr selected;
473+
Status status = mgr.get_process(version, &selected);
474+
475+
EXPECT_TRUE(status.ok()) << status.to_string();
476+
EXPECT_EQ(selected, alive_process);
477+
EXPECT_FALSE(mgr.process_pools_for_test()[version][1]->is_alive());
478+
EXPECT_EQ(mgr.process_pools_for_test()[version][1]->get_child_pid(), dead_pid);
479+
480+
mgr.shutdown();
481+
}
482+
420483
TEST_F(PythonServerTest, GetProcessLoadBalancing) {
421484
setup_doris_home();
422485
std::string python_path = create_fake_python_with_socket_creation("3.9.16");

regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@
88
-- !py_udaf_drop_3 --
99
6
1010

11+
-- !py_udaf_drop_4 --
12+
6
13+
14+
-- !py_udaf_drop_5 --
15+
6

regression-test/data/pythonudf_p0/test_pythonudf_drop.out

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,8 @@
2020
-- !py_udf_drop_7 --
2121
23
2222

23+
-- !py_udf_drop_8 --
24+
32
25+
26+
-- !py_udf_drop_9 --
27+
33

regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@
1111
1
1212
2
1313

14+
-- !py_udtf_drop_4 --
15+
1
16+
2
17+
18+
-- !py_udtf_drop_5 --
19+
1
20+
2

regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,21 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
suite('test_pythonudaf_drop') {
18+
suite('test_pythonudaf_drop', 'nonConcurrent') {
1919
def runtime_version = '3.8.10'
2020
def zipA = """${context.file.parent}/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip"""
2121
def zipB = """${context.file.parent}/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip"""
22+
def backendId_to_backendIP = [:]
23+
def backendId_to_backendHttpPort = [:]
24+
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
25+
26+
def execOnBackend = { be_ip, localCmd, remoteCmd ->
27+
if (be_ip == "127.0.0.1" || be_ip == "localhost") {
28+
cmd(localCmd)
29+
} else {
30+
sshExec("root", be_ip, remoteCmd, false)
31+
}
32+
}
2233

2334
scp_udf_file_to_all_be(zipA)
2435
scp_udf_file_to_all_be(zipB)
@@ -88,9 +99,33 @@ suite('test_pythonudaf_drop') {
8899
sql '''SELECT py_drop_sum_a(v) FROM py_udaf_drop_tbl;'''
89100
exception 'Can not found function'
90101
}
102+
103+
// Case 3: kill Python servers between two aggregate queries, next CREATE handshake should recover
104+
sql '''DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT)'''
105+
sql """
106+
CREATE AGGREGATE FUNCTION py_drop_sum_reconnect(INT) RETURNS BIGINT PROPERTIES (
107+
"type" = "PYTHON_UDF",
108+
"file" = "file://${zipA}",
109+
"symbol" = "drop_udaf.SumAgg",
110+
"runtime_version" = "${runtime_version}"
111+
)
112+
"""
113+
114+
qt_py_udaf_drop_4 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;'''
115+
116+
backendId_to_backendIP.values().each { be_ip ->
117+
execOnBackend(
118+
be_ip,
119+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
120+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
121+
}
122+
123+
qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;'''
124+
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
91125
} finally {
92126
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
93127
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
94128
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
129+
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
95130
}
96131
}

regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
suite("test_pythonudf_drop") {
18+
suite("test_pythonudf_drop", 'nonConcurrent') {
1919
def runtime_version = "3.8.10"
2020
def zipA = """${context.file.parent}/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip"""
2121
def zipB = """${context.file.parent}/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip"""
@@ -149,11 +149,35 @@ suite("test_pythonudf_drop") {
149149

150150
qt_py_udf_drop_7 """SELECT py_drop_recover_required(22);"""
151151
try_sql("DROP FUNCTION IF EXISTS py_drop_recover_required(INT);")
152+
153+
// Case 5: kill Python servers between two queries, next client handshake should recover
154+
sql """DROP FUNCTION IF EXISTS py_drop_reconnect(INT)"""
155+
sql """
156+
CREATE FUNCTION py_drop_reconnect(INT) RETURNS INT PROPERTIES (
157+
"type" = "PYTHON_UDF",
158+
"file" = "file://${zipA}",
159+
"symbol" = "drop_udf.evaluate",
160+
"runtime_version" = "${runtime_version}"
161+
)
162+
"""
163+
164+
qt_py_udf_drop_8 """SELECT py_drop_reconnect(31);"""
165+
166+
backendId_to_backendIP.values().each { be_ip ->
167+
execOnBackend(
168+
be_ip,
169+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
170+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
171+
}
172+
173+
qt_py_udf_drop_9 """SELECT py_drop_reconnect(32);"""
174+
try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
152175
} finally {
153176
try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
154177
try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
155178
try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
156179
try_sql("DROP FUNCTION IF EXISTS py_drop_recover_non_required(INT);")
157180
try_sql("DROP FUNCTION IF EXISTS py_drop_recover_required(INT);")
181+
try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
158182
}
159183
}

regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,21 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
suite("test_pythonudtf_drop") {
18+
suite("test_pythonudtf_drop", 'nonConcurrent') {
1919
def runtime_version = "3.8.10"
2020
def zipA = """${context.file.parent}/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip"""
2121
def zipB = """${context.file.parent}/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip"""
22+
def backendId_to_backendIP = [:]
23+
def backendId_to_backendHttpPort = [:]
24+
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
25+
26+
def execOnBackend = { be_ip, localCmd, remoteCmd ->
27+
if (be_ip == "127.0.0.1" || be_ip == "localhost") {
28+
cmd(localCmd)
29+
} else {
30+
sshExec("root", be_ip, remoteCmd, false)
31+
}
32+
}
2233

2334
scp_udf_file_to_all_be(zipA)
2435
scp_udf_file_to_all_be(zipB)
@@ -122,9 +133,46 @@ suite("test_pythonudtf_drop") {
122133
"""
123134
exception "Can not found function"
124135
}
136+
137+
// Case 3: kill Python servers between two table-function queries, next handshake should recover
138+
sql """DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT)"""
139+
sql """
140+
CREATE TABLES FUNCTION py_drop_t_reconnect(INT)
141+
RETURNS ARRAY<INT>
142+
PROPERTIES (
143+
"type" = "PYTHON_UDF",
144+
"file" = "file://${zipA}",
145+
"symbol" = "drop_udtf.process",
146+
"runtime_version" = "${runtime_version}"
147+
)
148+
"""
149+
150+
qt_py_udtf_drop_4 """
151+
SELECT c
152+
FROM py_udtf_drop_tbl
153+
LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
154+
ORDER BY c;
155+
"""
156+
157+
backendId_to_backendIP.values().each { be_ip ->
158+
execOnBackend(
159+
be_ip,
160+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
161+
"pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
162+
}
163+
sleep(2000)
164+
165+
qt_py_udtf_drop_5 """
166+
SELECT c
167+
FROM py_udtf_drop_tbl
168+
LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
169+
ORDER BY c;
170+
"""
171+
try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
125172
} finally {
126173
try_sql("DROP FUNCTION IF EXISTS py_drop_t_once(INT);")
127174
try_sql("DROP FUNCTION IF EXISTS py_drop_t_a(INT);")
128175
try_sql("DROP FUNCTION IF EXISTS py_drop_t_b(INT);")
176+
try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
129177
}
130178
}

0 commit comments

Comments
 (0)