Skip to content

Commit 0787271

Browse files
ctruedenclaude
andcommitted
Add Service.init() for early worker initialization
Enables execution of scripts before the worker's I/O loop starts, providing a general solution for imports that interfere with I/O operations (e.g., the NumPy import hang on Windows, numpy/numpy#24290).

The init script is passed to workers via a temporary file referenced by the APPOSE_INIT_SCRIPT environment variable. GroovyWorker and python_worker both support this mechanism.

As per:
* apposed/appose-java@fb6b861
* apposed/appose-java@5e3eb7f

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 9dea3eb commit 0787271

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

src/appose/service.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from __future__ import annotations
1010

1111
import subprocess
12+
import tempfile
1213
import threading
1314
from enum import Enum
1415
from pathlib import Path
@@ -56,6 +57,7 @@ def __init__(self, cwd: str | Path, env_vars: dict[str, str] | None = None, *arg
5657
self._stderr_thread: threading.Thread | None = None
5758
self._monitor_thread: threading.Thread | None = None
5859
self._debug_callback: Callable[[Any], Any] | None = None
60+
self._init_script: str | None = None
5961
self._syntax: ScriptSyntax | None = None
6062

6163
def debug(self, debug_callback: Callable[[Any], Any]) -> None:
@@ -68,6 +70,32 @@ def debug(self, debug_callback: Callable[[Any], Any]) -> None:
6870
"""
6971
self._debug_callback = debug_callback
7072

73+
def init(self, script: str) -> "Service":
    """
    Set the script the worker should run once at startup, ahead of any task.

    Use this for setup that has to finish before the worker enters its
    main loop, for instance importing a library that misbehaves while the
    worker's I/O is already active.

    Example: importing numpy on Windows can hang while stdin is open for
    reading (see https://github.com/numpy/numpy/issues/24290). Calling
    service.init("import numpy") sidesteps the problem, because the import
    then happens before the worker's I/O loop begins.

    Args:
        script: Source code to run during worker initialization.

    Returns:
        This same service, so that calls can be chained.

    Raises:
        RuntimeError: If the worker process has already been launched.
    """
    # Registering an init script only makes sense before the worker exists.
    already_started = self._process is not None
    if already_started:
        raise RuntimeError("Service already started")

    self._init_script = script
    return self
98+
7199
def start(self) -> None:
72100
"""
73101
Explicitly launch the worker process associated with this service.
@@ -84,6 +112,19 @@ def start(self) -> None:
84112

85113
prefix = f"Appose-Service-{self._service_id}"
86114

115+
# If an init script is provided, write it to a temporary file
116+
# and pass its path via environment variable.
117+
if self._init_script:
118+
with tempfile.NamedTemporaryFile(
119+
mode="w",
120+
encoding="utf-8",
121+
prefix="appose-init-",
122+
suffix=".txt",
123+
delete=False,
124+
) as init_file:
125+
init_file.write(self._init_script)
126+
self._env_vars["APPOSE_INIT_SCRIPT"] = init_file.name
127+
87128
self._process = process.builder(self._cwd, self._env_vars, *self._args)
88129
self._stdout_thread = threading.Thread(
89130
target=self._stdout_loop, name=f"{prefix}-Stdout"

tests/test_service.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,62 @@ def test_main_thread_queue_python():
131131
assert TaskStatus.COMPLETE == task.status
132132
thread = task.outputs.get("thread")
133133
assert thread != "MainThread"
134+
135+
136+
def test_init():
    """Checks that a registered init script has run by the time a task executes."""
    system_env = appose.system()
    python_service = system_env.python().init("init_value = 'initialized'")
    with python_service as service:
        maybe_debug(service)

        # A task that merely evaluates the variable proves the init script ran first.
        probe = service.task("init_value").wait_for()
        assert TaskStatus.COMPLETE == probe.status

        value = probe.result()
        assert value == "initialized", "Init script should set init_value variable"
148+
149+
150+
def test_init_numpy():
    """
    Tests that NumPy works on every platform, even Windows.

    Builds a pixi environment containing numpy, imports numpy through the
    service's init script, then runs a task that draws a seeded 3x5 random
    array and compares the flattened values against known-good numbers.
    """
    env = (
        appose.pixi()
        .base("target/envs/test-init-numpy")
        .conda("numpy=2.3.4")
        .pypi("appose==0.7.2")
        .log_debug()
        .build()
    )
    with env.python().init("import numpy") as service:
        maybe_debug(service)

        task = service.task(
            "import numpy\n"
            "narr = numpy.random.default_rng(seed=1337).random([3, 5])\n"
            "[float(v) for v in narr.flatten()]"
        ).wait_for()
        assert TaskStatus.COMPLETE == task.status

        result = task.outputs.get("result")
        assert isinstance(result, list)
        expected = [
            0.8781019003,
            0.1855279616,
            0.9209004548,
            0.9465658637,
            0.8745080903,
            0.1157427629,
            0.1937316623,
            0.3417371975,
            0.4957909002,
            0.8983712328,
            0.0064586191,
            0.2274114670,
            0.7936549524,
            0.4142867178,
            0.0838144031,
        ]
        # Check the length explicitly: a longer result would otherwise pass
        # unnoticed, and a shorter one would fail with an opaque IndexError.
        assert len(result) == len(expected)
        for actual_val, expected_val in zip(result, expected):
            assert isinstance(actual_val, (int, float))
            # Expected values are given to 10 decimal places.
            assert abs(actual_val - expected_val) < 1e-10

0 commit comments

Comments
 (0)