Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 76 additions & 35 deletions src/prefect/logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,19 @@
import uuid
import warnings
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Callable, Dict, TextIO, Type

from rich.console import Console
from rich.highlighter import Highlighter, NullHighlighter
from rich.theme import Theme
from typing_extensions import Self

import prefect.context
from prefect._internal.concurrency.api import create_call, from_sync
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect._internal.concurrency.services import BatchedQueueService
from prefect._internal.concurrency.threads import in_global_loop
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import LogCreate
from prefect.exceptions import MissingContextError
from prefect.logging.highlighters import PrefectConsoleHighlighter
from prefect.settings import (
PREFECT_API_URL,
PREFECT_LOGGING_COLORS,
PREFECT_LOGGING_INTERNAL_LEVEL,
PREFECT_LOGGING_MARKUP,
PREFECT_LOGGING_TO_API_BATCH_INTERVAL,
PREFECT_LOGGING_TO_API_BATCH_SIZE,
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
)
from prefect.types._datetime import from_timestamp

if sys.version_info >= (3, 12):
StreamHandler = logging.StreamHandler[TextIO]
Expand All @@ -52,6 +38,54 @@
_api_log_sink: Callable[[Dict[str, Any]], None] | None = None


@lru_cache(maxsize=1)
def _get_prefect_context_module():
    """Import and memoize the ``prefect.context`` module.

    The import is deferred until first use to keep this module cheap to
    import; ``lru_cache`` guarantees the import machinery is only consulted
    once per process.
    """
    import prefect.context as context_module

    return context_module


@lru_cache(maxsize=1)
def _get_get_client():
    """Lazily resolve and cache the ``get_client`` factory.

    Returns the ``prefect.client.orchestration.get_client`` callable; the
    deferred import avoids paying for the orchestration client at module
    import time.
    """
    from prefect.client.orchestration import get_client as client_factory

    return client_factory


@lru_cache(maxsize=1)
def _get_log_models():
    """Lazily import the log-related model and helper objects.

    Returns:
        A ``(LogCreate, MissingContextError, from_timestamp)`` tuple, cached
        after the first call so repeated log emission pays no import cost.
    """
    from prefect.client.schemas.actions import LogCreate as log_create_model
    from prefect.exceptions import MissingContextError as missing_context_error
    from prefect.types._datetime import from_timestamp as timestamp_converter

    return log_create_model, missing_context_error, timestamp_converter


@lru_cache(maxsize=1)
def _get_logging_settings():
    """Lazily import and cache the Prefect settings used by the log handlers.

    Returns:
        A dict mapping short names to the Setting objects themselves — callers
        must invoke ``.value()`` on an entry to read the current value, so the
        cache never pins a stale setting value, only the setting objects.
    """
    import prefect.settings as settings_module

    return {
        "api_url": settings_module.PREFECT_API_URL,
        "logging_colors": settings_module.PREFECT_LOGGING_COLORS,
        "logging_internal_level": settings_module.PREFECT_LOGGING_INTERNAL_LEVEL,
        "logging_markup": settings_module.PREFECT_LOGGING_MARKUP,
        "logging_to_api_batch_interval": settings_module.PREFECT_LOGGING_TO_API_BATCH_INTERVAL,
        "logging_to_api_batch_size": settings_module.PREFECT_LOGGING_TO_API_BATCH_SIZE,
        "logging_to_api_max_log_size": settings_module.PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
        "logging_to_api_when_missing_flow": settings_module.PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
    }


def set_api_log_sink(sink: Callable[[Dict[str, Any]], None] | None) -> None:
    """Install (or, with ``None``, clear) the module-level API log sink.

    NOTE(review): viewed through a diff — confirm against the full file that
    no additional body lines follow the assignment.
    """
    global _api_log_sink
    _api_log_sink = sink
Expand All @@ -68,15 +102,16 @@ def emit_api_log(log: Dict[str, Any]) -> None:
class APILogWorker(BatchedQueueService[Dict[str, Any]]):
@property
def max_batch_size(self) -> int:
settings = _get_logging_settings()
return max(
PREFECT_LOGGING_TO_API_BATCH_SIZE.value()
- PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
settings["logging_to_api_batch_size"].value()
- settings["logging_to_api_max_log_size"].value(),
settings["logging_to_api_max_log_size"].value(),
)

@property
def min_interval(self) -> float | None:
return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value()
return _get_logging_settings()["logging_to_api_batch_interval"].value()

async def _handle_batch(self, items: list[dict[str, Any]]):
try:
Expand All @@ -85,23 +120,25 @@ async def _handle_batch(self, items: list[dict[str, Any]]):
# Roughly replicate the behavior of the stdlib logger error handling
if logging.raiseExceptions and sys.stderr:
sys.stderr.write("--- Error logging to API ---\n")
if PREFECT_LOGGING_INTERNAL_LEVEL.value() == "DEBUG":
if _get_logging_settings()["logging_internal_level"].value() == "DEBUG":
traceback.print_exc(file=sys.stderr)
else:
# Only log the exception message in non-DEBUG mode
sys.stderr.write(str(e))

@asynccontextmanager
async def _lifespan(self):
get_client = _get_get_client()
async with get_client() as self._client:
yield

@classmethod
def instance(cls: Type[Self], *args: Any) -> Self:
settings = _get_logging_settings()
settings = (
PREFECT_LOGGING_TO_API_BATCH_SIZE.value(),
PREFECT_API_URL.value(),
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
settings["logging_to_api_batch_size"].value(),
settings["api_url"].value(),
settings["logging_to_api_max_log_size"].value(),
)

# Ensure a unique worker is retrieved per relevant logging settings
Expand Down Expand Up @@ -161,7 +198,7 @@ def emit(self, record: logging.LogRecord) -> None:
Send a log to the `APILogWorker`
"""
try:
profile = prefect.context.get_settings_context()
profile = _get_prefect_context_module().get_settings_context()

if not profile.settings.logging.to_api.enabled:
return # Respect the global settings toggle
Expand All @@ -176,11 +213,12 @@ def emit(self, record: logging.LogRecord) -> None:

def handleError(self, record: logging.LogRecord) -> None:
_, exc, _ = sys.exc_info()
_, MissingContextError, _ = _get_log_models()

if isinstance(exc, MissingContextError):
log_handling_when_missing_flow = (
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW.value()
)
log_handling_when_missing_flow = _get_logging_settings()[
"logging_to_api_when_missing_flow"
].value()
if log_handling_when_missing_flow == "warn":
# Warn when a logger is used outside of a run context, the stack level here
# gets us to the user logging call
Expand Down Expand Up @@ -211,10 +249,11 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
flow_run_id = getattr(record, "flow_run_id", None)
task_run_id = getattr(record, "task_run_id", None)
worker_id = getattr(record, "worker_id", None)
LogCreate, MissingContextError, from_timestamp = _get_log_models()

if not flow_run_id:
try:
context = prefect.context.get_run_context()
context = _get_prefect_context_module().get_run_context()
except MissingContextError:
raise MissingContextError(
f"Logger {record.name!r} attempted to send logs to the API without"
Expand Down Expand Up @@ -259,8 +298,8 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
).model_dump(mode="json")

log_size = log["__payload_size__"] = self._get_payload_size(log)
if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
max_size = PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()
max_size = _get_logging_settings()["logging_to_api_max_log_size"].value()
if log_size > max_size:
oversize = log_size - max_size
BUFFER = 50
truncated_length = max(len(formatted_message) - oversize - BUFFER, 0)
Expand Down Expand Up @@ -305,6 +344,7 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
"""

worker_id = getattr(record, "worker_id", None)
LogCreate, _, from_timestamp = _get_log_models()

log = LogCreate(
worker_id=worker_id,
Expand All @@ -315,10 +355,10 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
).model_dump(mode="json")

log_size = log["__payload_size__"] = self._get_payload_size(log)
if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
max_size = _get_logging_settings()["logging_to_api_max_log_size"].value()
if log_size > max_size:
raise ValueError(
f"Log of size {log_size} is greater than the max size of "
f"{PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()}"
f"Log of size {log_size} is greater than the max size of {max_size}"
)

return log
Expand All @@ -343,8 +383,9 @@ def __init__(
"""
super().__init__(stream=stream)

styled_console = PREFECT_LOGGING_COLORS.value()
markup_console = PREFECT_LOGGING_MARKUP.value()
settings = _get_logging_settings()
styled_console = settings["logging_colors"].value()
markup_console = settings["logging_markup"].value()
if styled_console:
highlighter_instance = highlighter()
theme = Theme(styles, inherit=False)
Expand Down