Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1322,9 +1322,19 @@ def kill_procs(
gracefully, they are added here.
"""
process_infos = []
for proc in psutil.process_iter(["name", "cmdline"]):
pre_stopped = []
for proc in psutil.process_iter(["name", "cmdline", "status"]):
try:
process_infos.append((proc, proc.name(), proc.cmdline()))
if proc.status() == psutil.STATUS_ZOMBIE:
name = proc.name()
cmdline = proc.cmdline()
for keyword, filter_by_cmd in processes_to_kill:
corpus = name if filter_by_cmd else subprocess.list2cmdline(cmdline)
if keyword in corpus:
pre_stopped.append(proc)
break
else:
process_infos.append((proc, proc.name(), proc.cmdline()))
Comment thread
cursor[bot] marked this conversation as resolved.
except psutil.Error:
pass
Comment thread
ssam18 marked this conversation as resolved.

Expand Down Expand Up @@ -1387,7 +1397,7 @@ def kill_procs(
# Dedup processes.
stopped, alive = psutil.wait_procs(stopped, timeout=0)
procs_to_kill = stopped + alive
total_found = len(procs_to_kill)
total_found = len(procs_to_kill) + len(pre_stopped)

# Wait for grace period to terminate processes.
gone_procs = set()
Expand All @@ -1399,7 +1409,8 @@ def on_terminate(proc):
stopped, alive = psutil.wait_procs(
procs_to_kill, timeout=grace_period, callback=on_terminate
)
total_stopped = len(stopped)
# Zombies were already dead; count them toward the stopped total.
total_stopped = len(stopped) + len(pre_stopped)

# For processes that are not killed within the grace period,
# we send force termination signals.
Expand Down
9 changes: 7 additions & 2 deletions python/ray/scripts/symmetric_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,13 @@ def symmetric_run(address, min_nodes, ray_args_and_entrypoint):
*ray_start_args,
]

# This command will block until the Ray cluster is stopped.
subprocess.run(ray_start_cmd, check=True)
worker_result = subprocess.run(ray_start_cmd)
if worker_result.returncode != 0:
click.echo(
f"ray start --block exited with code {worker_result.returncode} "
"(expected on normal cluster shutdown).",
err=True,
)

except subprocess.CalledProcessError as e:
click.echo(f"Failed to start Ray: {e}", err=True)
Expand Down
Loading