Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/prefect/events/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ async def _emit(self, event: Event) -> None:


def should_emit_events() -> bool:
from prefect.settings import get_current_settings

if not get_current_settings().client.emit_events:
return False
return (
emit_events_to_cloud()
or should_emit_events_to_running_server()
Expand Down
14 changes: 14 additions & 0 deletions src/prefect/settings/models/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ class ClientSettings(PrefectBaseSettings):
""",
)

emit_events: bool = Field(
default=True,
description="""
Whether the client should emit events to the Prefect server.
When disabled, no events will be sent, which can reduce memory usage
for long-running flows that do not rely on event-driven features
(automations, triggers).
""",
validation_alias=AliasChoices(
AliasPath("emit_events"),
"prefect_client_emit_events",
),
)

metrics: ClientMetricsSettings = Field(
default_factory=ClientMetricsSettings,
description="Settings for controlling metrics reporting from the client",
Expand Down
53 changes: 53 additions & 0 deletions tests/events/client/test_emit_events_setting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Tests for PREFECT_CLIENT_EMIT_EVENTS setting."""

from unittest import mock

from prefect.events.worker import should_emit_events
from prefect.settings import PREFECT_CLIENT_EMIT_EVENTS, temporary_settings


class TestEmitEventsSetting:
"""Tests for the client.emit_events setting."""

def test_default_is_true(self):
"""The setting defaults to True so existing behavior is preserved."""
assert PREFECT_CLIENT_EMIT_EVENTS.default() is True

def test_should_emit_events_respects_setting_when_false(self):
"""When emit_events is False, should_emit_events() returns False
regardless of API URL or key configuration."""
with temporary_settings(updates={PREFECT_CLIENT_EMIT_EVENTS: False}):
assert should_emit_events() is False

def test_should_emit_events_normal_when_true(self):
"""When emit_events is True (default), should_emit_events() delegates
to the existing logic based on API URL/key."""
with (
temporary_settings(updates={PREFECT_CLIENT_EMIT_EVENTS: True}),
mock.patch(
"prefect.events.worker.emit_events_to_cloud", return_value=False
),
mock.patch(
"prefect.events.worker.should_emit_events_to_running_server",
return_value=False,
),
mock.patch(
"prefect.events.worker.should_emit_events_to_ephemeral_server",
return_value=False,
),
):
assert should_emit_events() is False

def test_should_emit_events_true_with_cloud(self):
"""When emit_events is True and cloud is configured, returns True."""
with (
temporary_settings(updates={PREFECT_CLIENT_EMIT_EVENTS: True}),
mock.patch("prefect.events.worker.emit_events_to_cloud", return_value=True),
):
assert should_emit_events() is True

def test_emit_events_false_overrides_cloud(self):
"""Even with cloud configured, emit_events=False disables emission."""
with temporary_settings(updates={PREFECT_CLIENT_EMIT_EVENTS: False}):
# Even if cloud would return True, setting takes precedence
assert should_emit_events() is False
1 change: 1 addition & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
"PREFECT_API_TLS_INSECURE_SKIP_VERIFY": {"test_value": True},
"PREFECT_API_URL": {"test_value": "https://api.prefect.io"},
"PREFECT_CLIENT_CSRF_SUPPORT_ENABLED": {"test_value": True},
"PREFECT_CLIENT_EMIT_EVENTS": {"test_value": False},
"PREFECT_CLIENT_CUSTOM_HEADERS": {"test_value": '{"X-CUSTOM": "foobar"}'},
"PREFECT_CLIENT_ENABLE_METRICS": {"test_value": True, "legacy": True},
"PREFECT_CLIENT_MAX_RETRIES": {"test_value": 3},
Expand Down
Loading