Skip to content
Open
9 changes: 9 additions & 0 deletions src/prefect/settings/models/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ class WorkerSettings(PrefectBaseSettings):
),
)

process_command_wrappers: list[str] = Field(
default=["opentelemetry-instrument", "ddtrace-run"],
description=(
"List of wrapper commands to detect and propagate when spawning "
"flow run subprocesses. If a wrapper is detected as active in the "
"current environment, it will be prepended to the subprocess command."
),
)

webserver: WorkerWebserverSettings = Field(
default_factory=WorkerWebserverSettings,
description="Settings for a worker's webserver",
Expand Down
31 changes: 24 additions & 7 deletions src/prefect/workers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Module containing the Process worker used for executing flow runs as subprocesses.

To start a Process worker, run the following command:

```bash
prefect worker start --pool 'my-work-pool' --type process
```
Expand Down Expand Up @@ -32,6 +31,7 @@
from prefect.client.schemas.objects import Flow as APIFlow
from prefect.runner.runner import Runner
from prefect.settings import PREFECT_WORKER_QUERY_SECONDS
from prefect.settings.context import get_current_settings
from prefect.states import Pending
from prefect.utilities.processutils import get_sys_executable
from prefect.utilities.services import (
Expand All @@ -56,6 +56,17 @@
FR = TypeVar("FR") # used to capture the return type of a flow


def _detect_active_wrappers(wrappers: list[str]) -> list[str]:
"""Detect which known wrappers are active in the current environment."""
wrapper_env_signals: dict[str, Any] = {
"opentelemetry-instrument": lambda: any(
k.startswith("OTEL_") for k in os.environ
),
"ddtrace-run": lambda: any(k.startswith("DD_") for k in os.environ),
}
return [w for w in wrappers if (check := wrapper_env_signals.get(w)) and check()]


class ProcessJobConfiguration(BaseJobConfiguration):
stream_output: bool = Field(default=True)
working_dir: Optional[Path] = Field(default=None)
Expand Down Expand Up @@ -86,11 +97,17 @@ def prepare_for_flow_run(
)

self.env: dict[str, str | None] = {**os.environ, **self.env}
self.command: str | None = (
f"{get_sys_executable()} -m prefect.engine"
if self.command == self._base_flow_run_command()
else self.command
)

if self.command == self._base_flow_run_command():
executable = get_sys_executable()
wrappers = _detect_active_wrappers(
get_current_settings().worker.process_command_wrappers
)
if wrappers:
prefix = " ".join(wrappers)
self.command: str | None = f"{prefix} {executable} -m prefect.engine"
else:
self.command: str | None = f"{executable} -m prefect.engine"

@staticmethod
def _base_flow_run_command() -> str:
Expand Down Expand Up @@ -235,7 +252,7 @@ async def start(
async def run(
self,
flow_run: "FlowRun",
configuration: ProcessJobConfiguration,
configuration: "ProcessJobConfiguration",
task_status: Optional[anyio.abc.TaskStatus[int]] = None,
) -> ProcessWorkerResult:
if task_status is None:
Expand Down
135 changes: 117 additions & 18 deletions tests/workers/test_process_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from datetime import timedelta
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import anyio.abc
Expand Down Expand Up @@ -539,7 +539,6 @@ async def test_submit_adhoc_run_with_existing_flow_run_reuses_id(
monkeypatch: pytest.MonkeyPatch,
):
"""Test that _submit_adhoc_run with flow_run parameter reuses the flow run ID."""
# Mock execute_bundle to prevent actual execution
mock_execute_bundle = AsyncMock()
monkeypatch.setattr(
"prefect.runner.runner.Runner.execute_bundle", mock_execute_bundle
Expand All @@ -550,24 +549,20 @@ def test_flow():
return "success"

async with ProcessWorker(work_pool_name=process_work_pool.name) as worker:
# Create an initial flow run
initial_flow_run = await prefect_client.create_flow_run(
test_flow,
parameters={},
state=client_schemas.State(type=client_schemas.StateType.FAILED),
)

# Call _submit_adhoc_run with the existing flow run to retry it
await worker._submit_adhoc_run(
flow=test_flow,
parameters={},
flow_run=initial_flow_run,
)

# The flow run should have been reused (same ID) and state set to Pending
retried_flow_run = await prefect_client.read_flow_run(initial_flow_run.id)

# State should be Pending (set before retry execution)
assert retried_flow_run.state is not None
assert retried_flow_run.state.type == client_schemas.StateType.PENDING

Expand All @@ -578,7 +573,6 @@ async def test_submit_adhoc_run_with_existing_flow_run_sets_pending_state(
monkeypatch: pytest.MonkeyPatch,
):
"""Test that _submit_adhoc_run sets the state to Pending when retrying."""
# Mock execute_bundle to prevent actual execution
mock_execute_bundle = AsyncMock()
monkeypatch.setattr(
"prefect.runner.runner.Runner.execute_bundle", mock_execute_bundle
Expand All @@ -589,31 +583,24 @@ def test_flow():
return "done"

async with ProcessWorker(work_pool_name=process_work_pool.name) as worker:
# Create an initial flow run with FAILED state
initial_flow_run = await prefect_client.create_flow_run(
test_flow,
parameters={},
state=client_schemas.State(type=client_schemas.StateType.FAILED),
)

# Verify initial state is FAILED
assert initial_flow_run.state is not None
assert initial_flow_run.state.type == client_schemas.StateType.FAILED

# Call _submit_adhoc_run with the existing flow run
await worker._submit_adhoc_run(
flow=test_flow,
parameters={},
flow_run=initial_flow_run,
)

# execute_bundle should have been called (which means state was set to Pending first)
mock_execute_bundle.assert_called()

# Verify the flow run state was set to Pending before execution
retried_flow_run = await prefect_client.read_flow_run(initial_flow_run.id)
# The state should have been set to Pending (or a subsequent state after execution)
# Since we mocked execute_bundle, the state should still be Pending
assert retried_flow_run.state is not None
assert retried_flow_run.state.type == client_schemas.StateType.PENDING

Expand All @@ -624,7 +611,6 @@ async def test_submit_adhoc_run_without_flow_run_creates_new_run(
monkeypatch: pytest.MonkeyPatch,
):
"""Test that _submit_adhoc_run creates a new flow run when flow_run is None."""
# Mock execute_bundle to prevent actual execution
mock_execute_bundle = AsyncMock()
monkeypatch.setattr(
"prefect.runner.runner.Runner.execute_bundle", mock_execute_bundle
Expand All @@ -634,17 +620,130 @@ async def test_submit_adhoc_run_without_flow_run_creates_new_run(
def test_flow():
return "new run"

# Get initial count of flow runs
initial_flow_runs = await prefect_client.read_flow_runs()
initial_count = len(initial_flow_runs)

async with ProcessWorker(work_pool_name=process_work_pool.name) as worker:
# Call _submit_adhoc_run without flow_run parameter
await worker._submit_adhoc_run(
flow=test_flow,
parameters={},
)

# A new flow run should have been created
final_flow_runs = await prefect_client.read_flow_runs()
assert len(final_flow_runs) > initial_count


class TestDetectActiveWrappers:
def test_no_env_vars_returns_empty(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv("OTEL_SERVICE_NAME", raising=False)
monkeypatch.delenv("DD_SERVICE", raising=False)
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers(["opentelemetry-instrument", "ddtrace-run"])
assert result == []

def test_otel_env_var_detects_otel_wrapper(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
monkeypatch.delenv("DD_SERVICE", raising=False)
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers(["opentelemetry-instrument", "ddtrace-run"])
assert result == ["opentelemetry-instrument"]

def test_dd_env_var_detects_ddtrace_wrapper(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv("OTEL_SERVICE_NAME", raising=False)
monkeypatch.setenv("DD_SERVICE", "my-service")
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers(["opentelemetry-instrument", "ddtrace-run"])
assert result == ["ddtrace-run"]

def test_both_env_vars_detects_both_wrappers(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
monkeypatch.setenv("DD_SERVICE", "my-service")
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers(["opentelemetry-instrument", "ddtrace-run"])
assert result == ["opentelemetry-instrument", "ddtrace-run"]

def test_empty_wrappers_list_returns_empty(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers([])
assert result == []

def test_unknown_wrapper_is_ignored(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
from prefect.workers.process import _detect_active_wrappers

result = _detect_active_wrappers(["some-unknown-wrapper"])
assert result == []


class TestPrepareForFlowRunCommand:
async def test_no_wrappers_uses_plain_executable(
self, monkeypatch: pytest.MonkeyPatch, flow_run: FlowRun
):
monkeypatch.delenv("OTEL_SERVICE_NAME", raising=False)
monkeypatch.delenv("DD_SERVICE", raising=False)
from prefect.workers.process import ProcessJobConfiguration

config = ProcessJobConfiguration()
config.prepare_for_flow_run(flow_run)
assert "opentelemetry-instrument" not in (config.command or "")
assert "ddtrace-run" not in (config.command or "")
assert "prefect.engine" in (config.command or "")

async def test_otel_env_prepends_otel_wrapper(
self, monkeypatch: pytest.MonkeyPatch, flow_run: FlowRun
):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
monkeypatch.delenv("DD_SERVICE", raising=False)
from prefect.workers.process import ProcessJobConfiguration

config = ProcessJobConfiguration()
config.prepare_for_flow_run(flow_run)
assert (config.command or "").startswith("opentelemetry-instrument")
assert "prefect.engine" in (config.command or "")

async def test_dd_env_prepends_ddtrace_wrapper(
self, monkeypatch: pytest.MonkeyPatch, flow_run: FlowRun
):
monkeypatch.delenv("OTEL_SERVICE_NAME", raising=False)
monkeypatch.setenv("DD_SERVICE", "my-service")
from prefect.workers.process import ProcessJobConfiguration

config = ProcessJobConfiguration()
config.prepare_for_flow_run(flow_run)
assert (config.command or "").startswith("ddtrace-run")
assert "prefect.engine" in (config.command or "")

async def test_custom_command_is_not_modified(
self, monkeypatch: pytest.MonkeyPatch, flow_run: FlowRun
):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
from prefect.workers.process import ProcessJobConfiguration

config = ProcessJobConfiguration(command="echo hello")
config.prepare_for_flow_run(flow_run)
assert config.command == "echo hello"

async def test_empty_wrappers_setting_uses_plain_executable(
self, monkeypatch: pytest.MonkeyPatch, flow_run: FlowRun
):
monkeypatch.setenv("OTEL_SERVICE_NAME", "my-service")
from prefect.settings.context import get_current_settings
from prefect.workers.process import ProcessJobConfiguration

mock_settings = get_current_settings()
mock_settings.worker.process_command_wrappers = []

with patch(
"prefect.workers.process.get_current_settings",
return_value=mock_settings,
):
config = ProcessJobConfiguration()
config.prepare_for_flow_run(flow_run)
assert "opentelemetry-instrument" not in (config.command or "")
assert "prefect.engine" in (config.command or "")
Loading