diff --git a/integration-tests/test_schedule_statefulness.py b/integration-tests/test_schedule_statefulness.py index 674052917606..c277bbc185cb 100644 --- a/integration-tests/test_schedule_statefulness.py +++ b/integration-tests/test_schedule_statefulness.py @@ -25,6 +25,12 @@ from prefect.client.schemas.objects import DeploymentSchedule from prefect.client.schemas.schedules import CronSchedule from prefect.schedules import Cron, Schedule +from prefect.settings import ( + PREFECT_API_KEY, + PREFECT_API_URL, + PREFECT_SERVER_ALLOW_EPHEMERAL_MODE, + temporary_settings, +) @pytest.fixture() @@ -128,3 +134,37 @@ def test_schedule_statefulness(deployment_name: str): f"Expected named schedule to persist: {schedules}" ) print("All tests passed!") + + +def test_schedule_id_stability_for_no_slug_redeploy(deployment_name: str): + # Use an ephemeral local API to ensure this integration test exercises + # branch code and does not depend on external profile configuration. + with temporary_settings( + { + PREFECT_API_URL: "", + PREFECT_API_KEY: "", + PREFECT_SERVER_ALLOW_EPHEMERAL_MODE: True, + } + ): + initial_deployment = my_flow.to_deployment( + name=deployment_name, + schedules=[Cron("0 9 * * *")], + ) + initial_deployment.apply() + + initial_schedules = check_deployment_schedules(f"my-flow/{deployment_name}") + assert len(initial_schedules) == 1 + initial_schedule = initial_schedules[0] + assert initial_schedule.schedule == CronSchedule(cron="0 9 * * *") + + updated_deployment = my_flow.to_deployment( + name=deployment_name, + schedules=[Cron("0 10 * * *")], + ) + updated_deployment.apply() + + updated_schedules = check_deployment_schedules(f"my-flow/{deployment_name}") + assert len(updated_schedules) == 1 + updated_schedule = updated_schedules[0] + assert updated_schedule.schedule == CronSchedule(cron="0 10 * * *") + assert updated_schedule.id == initial_schedule.id diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py index 8341ff5bfc22..a2dfabd5219a 100644 --- a/src/prefect/server/api/deployments.py +++ b/src/prefect/server/api/deployments.py @@ -2,9 +2,12 @@ Routes for interacting with Deployment objects. """ +import asyncio import datetime import logging -from typing import List, Optional +from collections import defaultdict +from contextlib import asynccontextmanager +from typing import Any, AsyncIterator, List, Optional from uuid import UUID import jsonschema.exceptions @@ -32,6 +35,7 @@ FlowRunCreateResult, ) from prefect.server.utilities.server import PrefectRouter +from prefect.settings import get_current_settings from prefect.types import DateTime from prefect.types._datetime import now from prefect.utilities.schema_tools.hydration import ( @@ -49,6 +53,109 @@ router: PrefectRouter = PrefectRouter(prefix="/deployments", tags=["Deployments"]) +# Lock timeout in seconds for deployment schedule updates +DEPLOYMENT_SCHEDULE_LOCK_TIMEOUT = 30 + +# In-memory locks keyed by deployment identifier, used when Redis is not available. +_deployment_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + +try: + from prefect_redis.client import get_async_redis_client as _get_async_redis_client +except Exception: + _get_async_redis_client = None + + +def _get_redis_client() -> "Any | None": + """Try to get a Redis client if prefect-redis is configured as the broker.""" + broker = get_current_settings().server.events.messaging_broker + if "prefect_redis" not in broker: + return None + + if _get_async_redis_client is None: + logger.error( + "Redis messaging broker is configured, but prefect-redis is unavailable." + ) + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=( + "Redis messaging broker is configured but unavailable. " + "Unable to safely coordinate deployment updates." + ), + ) + + try: + return _get_async_redis_client() + except Exception: + logger.exception( + "Redis messaging broker is configured, but Redis client initialization failed." + ) + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=( + "Redis messaging broker is configured but unavailable. " + "Unable to safely coordinate deployment updates." + ), + ) + + +@asynccontextmanager +async def deployment_schedule_lock(lock_key: str) -> AsyncIterator[None]: + """Acquire a lock for modifying a deployment's schedules. + + Uses a Redis distributed lock when ``prefect-redis`` is configured as the + messaging broker (typical for HA / multi-server deployments). Falls back to + a per-key ``asyncio.Lock`` for single-server deployments. + + This prevents race conditions when multiple concurrent requests attempt to + modify the same deployment, which could lead to duplicate scheduled runs. + + Args: + lock_key: A unique key identifying the deployment. This should be + either the ``deployment_id`` (for updates) or a combination of + ``flow_id`` + ``name`` (for creates / upserts). + """ + redis_client = _get_redis_client() + if redis_client is not None: + import redis.exceptions + + try: + async with redis_client.lock( + name=f"deployment-schedule-update:{lock_key}", + blocking=False, + timeout=DEPLOYMENT_SCHEDULE_LOCK_TIMEOUT, + ): + yield + except redis.exceptions.LockError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=( + "Another update to this deployment is in progress. " + "Please try again." + ), + ) + else: + # Single-server fallback: per-deployment asyncio lock. + lock = _deployment_locks[lock_key] + try: + acquired = lock.locked() + if acquired: + # Non-blocking: if the lock is already held, fail fast like + # the Redis path to match Nebula's ``blocking=False`` behavior. + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=( + "Another update to this deployment is in progress. " + "Please try again." + ), + ) + async with lock: + yield + finally: + # Clean up locks for deployment ids that are no longer contended + # to avoid unbounded growth of _deployment_locks. + if not lock.locked(): + _deployment_locks.pop(lock_key, None) + def _multiple_schedules_error(deployment_id) -> HTTPException: return HTTPException( @@ -85,129 +192,133 @@ async def create_deployment( data["created_by"] = created_by.model_dump() if created_by else None data["updated_by"] = updated_by.model_dump() if created_by else None - async with db.session_context(begin_transaction=True) as session: - if ( - deployment.work_pool_name - and deployment.work_pool_name != DEFAULT_AGENT_WORK_POOL_NAME - ): - # Make sure that deployment is valid before beginning creation process - work_pool = await models.workers.read_work_pool_by_name( - session=session, work_pool_name=deployment.work_pool_name - ) - if work_pool is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f'Work pool "{deployment.work_pool_name}" not found.', + # Lock on flow_id + name to prevent concurrent creates/upserts from + # racing and producing duplicate schedules. + lock_key = f"{deployment.flow_id}:{deployment.name}" + async with deployment_schedule_lock(lock_key): + async with db.session_context(begin_transaction=True) as session: + if ( + deployment.work_pool_name + and deployment.work_pool_name != DEFAULT_AGENT_WORK_POOL_NAME + ): + # Make sure that deployment is valid before beginning creation process + work_pool = await models.workers.read_work_pool_by_name( + session=session, work_pool_name=deployment.work_pool_name ) + if work_pool is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f'Work pool "{deployment.work_pool_name}" not found.', + ) - await validate_job_variables_for_deployment( - session, - work_pool, - deployment, + await validate_job_variables_for_deployment( + session, + work_pool, + deployment, + ) + + # hydrate the input model into a full model + deployment_dict: dict = deployment.model_dump( + exclude={"work_pool_name"}, + exclude_unset=True, ) - # hydrate the input model into a full model - deployment_dict: dict = deployment.model_dump( - exclude={"work_pool_name"}, - exclude_unset=True, - ) + requested_concurrency_limit = deployment_dict.pop( + "global_concurrency_limit_id", "unset" + ) + if requested_concurrency_limit != "unset": + if requested_concurrency_limit: + concurrency_limit = ( + await models.concurrency_limits_v2.read_concurrency_limit( + session=session, + concurrency_limit_id=requested_concurrency_limit, + ) + ) - requested_concurrency_limit = deployment_dict.pop( - "global_concurrency_limit_id", "unset" - ) - if requested_concurrency_limit != "unset": - if requested_concurrency_limit: - concurrency_limit = ( - await models.concurrency_limits_v2.read_concurrency_limit( + if not concurrency_limit: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Concurrency limit not found", + ) + + deployment_dict["concurrency_limit_id"] = requested_concurrency_limit + + if deployment.work_pool_name and deployment.work_queue_name: + # If a specific pool name/queue name combination was provided, get the + # ID for that work pool queue. + deployment_dict[ + "work_queue_id" + ] = await worker_lookups._get_work_queue_id_from_name( + session=session, + work_pool_name=deployment.work_pool_name, + work_queue_name=deployment.work_queue_name, + create_queue_if_not_found=True, + ) + elif deployment.work_pool_name: + # If just a pool name was provided, get the ID for its default + # work pool queue. + deployment_dict[ + "work_queue_id" + ] = await worker_lookups._get_default_work_queue_id_from_work_pool_name( + session=session, + work_pool_name=deployment.work_pool_name, + ) + elif deployment.work_queue_name: + # If just a queue name was provided, ensure that the queue exists and + # get its ID. + work_queue = await models.work_queues.ensure_work_queue_exists( + session=session, name=deployment.work_queue_name + ) + deployment_dict["work_queue_id"] = work_queue.id + + deployment = schemas.core.Deployment(**deployment_dict) + # check to see if relevant blocks exist, allowing us throw a useful error message + # for debugging + if deployment.infrastructure_document_id is not None: + infrastructure_block = ( + await models.block_documents.read_block_document_by_id( session=session, - concurrency_limit_id=requested_concurrency_limit, + block_document_id=deployment.infrastructure_document_id, ) ) - - if not concurrency_limit: + if not infrastructure_block: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Concurrency limit not found", + status_code=status.HTTP_409_CONFLICT, + detail=( + "Error creating deployment. Could not find infrastructure" + f" block with id: {deployment.infrastructure_document_id}. This" + " usually occurs when applying a deployment specification that" + " was built against a different Prefect database / workspace." + ), ) - deployment_dict["concurrency_limit_id"] = requested_concurrency_limit - - if deployment.work_pool_name and deployment.work_queue_name: - # If a specific pool name/queue name combination was provided, get the - # ID for that work pool queue. - deployment_dict[ - "work_queue_id" - ] = await worker_lookups._get_work_queue_id_from_name( - session=session, - work_pool_name=deployment.work_pool_name, - work_queue_name=deployment.work_queue_name, - create_queue_if_not_found=True, - ) - elif deployment.work_pool_name: - # If just a pool name was provided, get the ID for its default - # work pool queue. - deployment_dict[ - "work_queue_id" - ] = await worker_lookups._get_default_work_queue_id_from_work_pool_name( - session=session, - work_pool_name=deployment.work_pool_name, - ) - elif deployment.work_queue_name: - # If just a queue name was provided, ensure that the queue exists and - # get its ID. - work_queue = await models.work_queues.ensure_work_queue_exists( - session=session, name=deployment.work_queue_name - ) - deployment_dict["work_queue_id"] = work_queue.id - - deployment = schemas.core.Deployment(**deployment_dict) - # check to see if relevant blocks exist, allowing us throw a useful error message - # for debugging - if deployment.infrastructure_document_id is not None: - infrastructure_block = ( - await models.block_documents.read_block_document_by_id( + if deployment.storage_document_id is not None: + storage_block = await models.block_documents.read_block_document_by_id( session=session, - block_document_id=deployment.infrastructure_document_id, - ) - ) - if not infrastructure_block: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=( - "Error creating deployment. Could not find infrastructure" - f" block with id: {deployment.infrastructure_document_id}. This" - " usually occurs when applying a deployment specification that" - " was built against a different Prefect database / workspace." - ), + block_document_id=deployment.storage_document_id, ) + if not storage_block: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=( + "Error creating deployment. Could not find storage block with" + f" id: {deployment.storage_document_id}. This usually occurs" + " when applying a deployment specification that was built" + " against a different Prefect database / workspace." + ), + ) - if deployment.storage_document_id is not None: - storage_block = await models.block_documents.read_block_document_by_id( - session=session, - block_document_id=deployment.storage_document_id, + right_now = now("UTC") + model = await models.deployments.create_deployment( + session=session, deployment=deployment ) - if not storage_block: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=( - "Error creating deployment. Could not find storage block with" - f" id: {deployment.storage_document_id}. This usually occurs" - " when applying a deployment specification that was built" - " against a different Prefect database / workspace." - ), - ) - right_now = now("UTC") - model = await models.deployments.create_deployment( - session=session, deployment=deployment - ) - - if model.created >= right_now: - response.status_code = status.HTTP_201_CREATED + if model.created >= right_now: + response.status_code = status.HTTP_201_CREATED - return schemas.responses.DeploymentResponse.model_validate( - model, from_attributes=True - ) + return schemas.responses.DeploymentResponse.model_validate( + model, from_attributes=True + ) @router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) @@ -216,7 +327,9 @@ async def update_deployment( deployment_id: UUID = Path(..., description="The deployment id", alias="id"), db: PrefectDBInterface = Depends(provide_database_interface), ) -> None: - async with db.session_context(begin_transaction=True) as session: + # Resolve lock key from deployment identity so PATCH and POST/upsert + # contend on the same key. + async with db.session_context() as session: existing_deployment = await models.deployments.read_deployment( session=session, deployment_id=deployment_id ) @@ -224,248 +337,270 @@ async def update_deployment( raise HTTPException( status.HTTP_404_NOT_FOUND, detail="Deployment not found." ) + lock_key = f"{existing_deployment.flow_id}:{existing_deployment.name}" - # Checking how we should handle schedule updates - # If not all existing schedules have slugs then we'll fall back to the existing logic where are schedules are recreated to match the request. - # If the existing schedules have slugs, but not all provided schedules have slugs, then we'll return a 422 to avoid accidentally blowing away schedules. - # Otherwise, we'll use the existing slugs and the provided slugs to make targeted updates to the deployment's schedules. - schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = [] - schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = [] - all_provided_have_slugs = all( - schedule.slug is not None for schedule in deployment.schedules or [] - ) - all_existing_have_slugs = existing_deployment.schedules and all( - schedule.slug is not None for schedule in existing_deployment.schedules - ) - if all_provided_have_slugs and all_existing_have_slugs: - current_slugs = [ - schedule.slug for schedule in existing_deployment.schedules - ] - - # Check for duplicate replaces targets - replaces_targets: dict[str, str] = {} # old_slug -> new_slug - for schedule in deployment.schedules or []: - if schedule.replaces: - if schedule.replaces in replaces_targets: + async with deployment_schedule_lock(lock_key): + async with db.session_context(begin_transaction=True) as session: + existing_deployment = await models.deployments.read_deployment( + session=session, deployment_id=deployment_id + ) + if not existing_deployment: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail="Deployment not found." + ) + + # Checking how we should handle schedule updates + # If not all existing schedules have slugs then we'll fall back to the existing logic where are schedules are recreated to match the request. + # If the existing schedules have slugs, but not all provided schedules have slugs, then we'll return a 422 to avoid accidentally blowing away schedules. + # Otherwise, we'll use the existing slugs and the provided slugs to make targeted updates to the deployment's schedules. + schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = [] + schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = [] + all_provided_have_slugs = all( + schedule.slug is not None for schedule in deployment.schedules or [] + ) + all_existing_have_slugs = existing_deployment.schedules and all( + schedule.slug is not None for schedule in existing_deployment.schedules + ) + if all_provided_have_slugs and all_existing_have_slugs: + current_slugs = [ + schedule.slug for schedule in existing_deployment.schedules + ] + + # Check for duplicate replaces targets + replaces_targets: dict[str, str] = {} # old_slug -> new_slug + for schedule in deployment.schedules or []: + if schedule.replaces: + if schedule.replaces in replaces_targets: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Multiple schedules have 'replaces' targeting the same slug: {schedule.replaces}", + ) + replaces_targets[schedule.replaces] = schedule.slug or "" + + # Check for slug collisions: if a schedule's new slug already + # exists and that existing slug is not being replaced, it's a + # collision + slugs_being_replaced = set(replaces_targets.keys()) + for schedule in deployment.schedules: + if ( + schedule.slug + and schedule.slug in current_slugs + and schedule.slug not in slugs_being_replaced + and schedule.replaces + ): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Multiple schedules have 'replaces' targeting the same slug: {schedule.replaces}", + detail=f"Cannot rename schedule from '{schedule.replaces}' to '{schedule.slug}': " + f"a schedule with slug '{schedule.slug}' already exists.", ) - replaces_targets[schedule.replaces] = schedule.slug or "" - - # Check for slug collisions: if a schedule's new slug already - # exists and that existing slug is not being replaced, it's a - # collision - slugs_being_replaced = set(replaces_targets.keys()) - for schedule in deployment.schedules: - if ( - schedule.slug - and schedule.slug in current_slugs - and schedule.slug not in slugs_being_replaced - and schedule.replaces - ): - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Cannot rename schedule from '{schedule.replaces}' to '{schedule.slug}': " - f"a schedule with slug '{schedule.slug}' already exists.", - ) - for schedule in deployment.schedules: - # Check if this schedule replaces an existing one - target_slug = schedule.replaces if schedule.replaces else schedule.slug - - if target_slug in current_slugs: - schedules_to_patch.append(schedule) - elif schedule.replaces: - # replaces points to a non-existent slug - warn and create new - logger.warning( - f"Schedule with slug '{schedule.slug}' has 'replaces: {schedule.replaces}' " - f"but no schedule with slug '{schedule.replaces}' exists. Creating new schedule." + for schedule in deployment.schedules: + # Check if this schedule replaces an existing one + target_slug = ( + schedule.replaces if schedule.replaces else schedule.slug ) - if schedule.schedule: + + if target_slug in current_slugs: + schedules_to_patch.append(schedule) + elif schedule.replaces: + # replaces points to a non-existent slug - warn and create new + logger.warning( + f"Schedule with slug '{schedule.slug}' has 'replaces: {schedule.replaces}' " + f"but no schedule with slug '{schedule.replaces}' exists. Creating new schedule." + ) + if schedule.schedule: + schedules_to_create.append(schedule) + else: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Unable to create new deployment schedules without a schedule configuration.", + ) + elif schedule.schedule: schedules_to_create.append(schedule) else: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Unable to create new deployment schedules without a schedule configuration.", ) - elif schedule.schedule: - schedules_to_create.append(schedule) - else: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Unable to create new deployment schedules without a schedule configuration.", - ) - # Clear schedules to handle their update/creation separately - deployment.schedules = None - elif not all_provided_have_slugs and all_existing_have_slugs: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Please provide a slug for each schedule in your request to ensure schedules are updated correctly.", - ) - - if deployment.work_pool_name: - # Make sure that deployment is valid before beginning creation process - work_pool = await models.workers.read_work_pool_by_name( - session=session, work_pool_name=deployment.work_pool_name - ) - try: - deployment.check_valid_configuration(work_pool.base_job_template) - except (MissingVariableError, jsonschema.exceptions.ValidationError) as exc: + # Clear schedules to handle their update/creation separately + deployment.schedules = None + elif not all_provided_have_slugs and all_existing_have_slugs: raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"Error creating deployment: {exc!r}", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Please provide a slug for each schedule in your request to ensure schedules are updated correctly.", ) - if deployment.parameters is not None: - try: - dehydrated_params = deployment.parameters - ctx = await HydrationContext.build( - session=session, - raise_on_error=True, - render_jinja=True, - render_workspace_variables=True, - ) - parameters = hydrate(dehydrated_params, ctx) - deployment.parameters = parameters - except HydrationError as exc: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - detail=f"Error hydrating deployment parameters: {exc}", + if deployment.work_pool_name: + # Make sure that deployment is valid before beginning creation process + work_pool = await models.workers.read_work_pool_by_name( + session=session, work_pool_name=deployment.work_pool_name ) - else: - parameters = existing_deployment.parameters + try: + deployment.check_valid_configuration(work_pool.base_job_template) + except ( + MissingVariableError, + jsonschema.exceptions.ValidationError, + ) as exc: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Error creating deployment: {exc!r}", + ) - enforce_parameter_schema = ( - deployment.enforce_parameter_schema - if deployment.enforce_parameter_schema is not None - else existing_deployment.enforce_parameter_schema - ) - if enforce_parameter_schema: - # ensure that the new parameters conform to the proposed schema - if deployment.parameter_openapi_schema: - openapi_schema = deployment.parameter_openapi_schema + if deployment.parameters is not None: + try: + dehydrated_params = deployment.parameters + ctx = await HydrationContext.build( + session=session, + raise_on_error=True, + render_jinja=True, + render_workspace_variables=True, + ) + parameters = hydrate(dehydrated_params, ctx) + deployment.parameters = parameters + except HydrationError as exc: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + detail=f"Error hydrating deployment parameters: {exc}", + ) else: - openapi_schema = existing_deployment.parameter_openapi_schema + parameters = existing_deployment.parameters - if not isinstance(openapi_schema, dict): - raise HTTPException( - status.HTTP_409_CONFLICT, - detail=( - "Error updating deployment: Cannot update parameters because" - " parameter schema enforcement is enabled and the deployment" - " does not have a valid parameter schema." - ), - ) - try: - validate( - parameters, - openapi_schema, - raise_on_error=True, - ignore_required=True, - ) - except ValidationError as exc: - raise HTTPException( - status.HTTP_409_CONFLICT, - detail=f"Error updating deployment: {exc}", - ) - except CircularSchemaRefError: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Invalid schema: Unable to validate schema with circular references.", - ) - - if deployment.global_concurrency_limit_id: - concurrency_limit = ( - await models.concurrency_limits_v2.read_concurrency_limit( - session=session, - concurrency_limit_id=deployment.global_concurrency_limit_id, - ) + enforce_parameter_schema = ( + deployment.enforce_parameter_schema + if deployment.enforce_parameter_schema is not None + else existing_deployment.enforce_parameter_schema ) + if enforce_parameter_schema: + # ensure that the new parameters conform to the proposed schema + if deployment.parameter_openapi_schema: + openapi_schema = deployment.parameter_openapi_schema + else: + openapi_schema = existing_deployment.parameter_openapi_schema - if not concurrency_limit: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Concurrency limit not found", - ) - - result = await models.deployments.update_deployment( - session=session, - deployment_id=deployment_id, - deployment=deployment, - ) - - # Phase 1: For schedules with `replaces`, look up the row ID by - # old slug, then NULL out those slugs so the unique index on - # (deployment_id, slug) won't block the renames. - renamed_schedule_ids: dict[str, UUID] = {} - renamed_old_slugs: list[str] = [] - for schedule in schedules_to_patch: - if schedule.replaces: - row = ( - await session.execute( - sa.select(db.DeploymentSchedule.id).where( - sa.and_( - db.DeploymentSchedule.deployment_id == deployment_id, - db.DeploymentSchedule.slug == schedule.replaces, - ) - ) + if not isinstance(openapi_schema, dict): + raise HTTPException( + status.HTTP_409_CONFLICT, + detail=( + "Error updating deployment: Cannot update parameters" + " because parameter schema enforcement is enabled and" + " the deployment does not have a valid parameter" + " schema." + ), ) - ).scalar_one_or_none() - if row is None: + try: + validate( + parameters, + openapi_schema, + raise_on_error=True, + ignore_required=True, + ) + except ValidationError as exc: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Schedule with slug '{schedule.replaces}' no longer exists.", + status.HTTP_409_CONFLICT, + detail=f"Error updating deployment: {exc}", ) - renamed_schedule_ids[schedule.replaces] = row - renamed_old_slugs.append(schedule.replaces) - - if renamed_old_slugs: - await session.execute( - sa.update(db.DeploymentSchedule) - .where( - sa.and_( - db.DeploymentSchedule.deployment_id == deployment_id, - db.DeploymentSchedule.slug.in_(renamed_old_slugs), + except CircularSchemaRefError: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Invalid schema: Unable to validate schema with circular references.", + ) + + if deployment.global_concurrency_limit_id: + concurrency_limit = ( + await models.concurrency_limits_v2.read_concurrency_limit( + session=session, + concurrency_limit_id=deployment.global_concurrency_limit_id, ) ) - .values(slug=None) + + if not concurrency_limit: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Concurrency limit not found", + ) + + result = await models.deployments.update_deployment( + session=session, + deployment_id=deployment_id, + deployment=deployment, ) - await session.flush() - # Phase 2: Apply the actual updates. Renamed schedules use - # their row ID (slug is now NULL); others use slug as before. - for schedule in schedules_to_patch: - if schedule.replaces: - await models.deployments.update_deployment_schedule( - session=session, - deployment_id=deployment_id, - schedule=schedule, - deployment_schedule_id=renamed_schedule_ids[schedule.replaces], + # Phase 1: For schedules with `replaces`, look up the row ID by + # old slug, then NULL out those slugs so the unique index on + # (deployment_id, slug) won't block the renames. + renamed_schedule_ids: dict[str, UUID] = {} + renamed_old_slugs: list[str] = [] + for schedule in schedules_to_patch: + if schedule.replaces: + row = ( + await session.execute( + sa.select(db.DeploymentSchedule.id).where( + sa.and_( + db.DeploymentSchedule.deployment_id + == deployment_id, + db.DeploymentSchedule.slug == schedule.replaces, + ) + ) + ) + ).scalar_one_or_none() + if row is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Schedule with slug '{schedule.replaces}' no longer exists.", + ) + renamed_schedule_ids[schedule.replaces] = row + renamed_old_slugs.append(schedule.replaces) + + if renamed_old_slugs: + await session.execute( + sa.update(db.DeploymentSchedule) + .where( + sa.and_( + db.DeploymentSchedule.deployment_id == deployment_id, + db.DeploymentSchedule.slug.in_(renamed_old_slugs), + ) + ) + .values(slug=None) ) - else: - await models.deployments.update_deployment_schedule( + await session.flush() + + # Phase 2: Apply the actual updates. Renamed schedules use + # their row ID (slug is now NULL); others use slug as before. + for schedule in schedules_to_patch: + if schedule.replaces: + await models.deployments.update_deployment_schedule( + session=session, + deployment_id=deployment_id, + schedule=schedule, + deployment_schedule_id=renamed_schedule_ids[schedule.replaces], + ) + else: + await models.deployments.update_deployment_schedule( + session=session, + deployment_id=deployment_id, + schedule=schedule, + deployment_schedule_slug=schedule.slug, + ) + if schedules_to_create: + await models.deployments.create_deployment_schedules( session=session, deployment_id=deployment_id, - schedule=schedule, - deployment_schedule_slug=schedule.slug, + schedules=[ + schemas.actions.DeploymentScheduleCreate( + schedule=schedule.schedule, # type: ignore We will raise above if schedule is not provided + active=schedule.active + if schedule.active is not None + else True, + slug=schedule.slug, + parameters=schedule.parameters, + ) + for schedule in schedules_to_create + ], ) - if schedules_to_create: - await models.deployments.create_deployment_schedules( - session=session, - deployment_id=deployment_id, - schedules=[ - schemas.actions.DeploymentScheduleCreate( - schedule=schedule.schedule, # type: ignore We will raise above if schedule is not provided - active=schedule.active if schedule.active is not None else True, - slug=schedule.slug, - parameters=schedule.parameters, - ) - for schedule in schedules_to_create - ], + if not result: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail="Deployment not found." ) - if not result: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.") @router.get("/name/{flow_name}/{deployment_name}") diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index b1fa030035cb..0918475c070e 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -170,21 +170,45 @@ async def create_deployment( future_only=True, ) - await delete_schedules_for_deployment(session=session, deployment_id=deployment_id) + # For upserts, preserve schedule IDs when possible by reconciling schedules + # positionally instead of deleting and recreating all rows. + existing_schedules = await read_deployment_schedules( + session=session, deployment_id=deployment_id + ) + existing_schedules.sort(key=lambda s: s.id) + + for i, new_schedule in enumerate(schedules): + if i < len(existing_schedules): + await update_deployment_schedule( + session=session, + deployment_id=deployment_id, + deployment_schedule_id=existing_schedules[i].id, + schedule=schemas.actions.DeploymentScheduleUpdate( + schedule=new_schedule.schedule, + active=new_schedule.active, + parameters=new_schedule.parameters, + slug=new_schedule.slug, + ), + ) + else: + await create_deployment_schedules( + session=session, + deployment_id=deployment_id, + schedules=[ + schemas.actions.DeploymentScheduleCreate( + schedule=new_schedule.schedule, + active=new_schedule.active, + parameters=new_schedule.parameters, + slug=new_schedule.slug, + ) + ], + ) - if schedules: - await create_deployment_schedules( + for i in range(len(schedules), len(existing_schedules)): + await delete_deployment_schedule( session=session, deployment_id=deployment_id, - schedules=[ - schemas.actions.DeploymentScheduleCreate( - schedule=schedule.schedule, - active=schedule.active, - parameters=schedule.parameters, - slug=schedule.slug, - ) - for schedule in schedules - ], + deployment_schedule_id=existing_schedules[i].id, ) if requested_concurrency_limit != "unset": @@ -296,25 +320,62 @@ async def update_deployment( ) if should_update_schedules: - # If schedules were provided, remove the existing schedules and - # replace them with the new ones. - await delete_schedules_for_deployment( + # Update schedules in place when possible to preserve their IDs. + # When schedule IDs are preserved, the idempotency key for + # auto-scheduled flow runs (which includes the schedule ID) remains + # stable across redeployments, preventing the scheduler from creating + # duplicate runs due to changed idempotency keys. + existing_schedules = await read_deployment_schedules( session=session, deployment_id=deployment_id ) - await create_deployment_schedules( - session=session, - deployment_id=deployment_id, - schedules=[ - schemas.actions.DeploymentScheduleCreate( - schedule=schedule.schedule, - active=schedule.active if schedule.active is not None else True, - parameters=schedule.parameters, - slug=schedule.slug, + # Sort by id for stable positional matching (UUIDv7 IDs are + # time-ordered, so this reflects creation order). + existing_schedules.sort(key=lambda s: s.id) + + new_schedules = [ + schedule for schedule in schedules if schedule.schedule is not None + ] + + # Update existing schedules in place by position + for i, new_schedule in enumerate(new_schedules): + if i < len(existing_schedules): + await update_deployment_schedule( + session=session, + deployment_id=deployment_id, + deployment_schedule_id=existing_schedules[i].id, + schedule=schemas.actions.DeploymentScheduleUpdate( + schedule=new_schedule.schedule, + active=new_schedule.active + if new_schedule.active is not None + else True, + parameters=new_schedule.parameters, + slug=new_schedule.slug, + ), ) - for schedule in schedules - if schedule.schedule is not None - ], - ) + else: + # More new schedules than existing ones: create the extras + await create_deployment_schedules( + session=session, + deployment_id=deployment_id, + schedules=[ + schemas.actions.DeploymentScheduleCreate( + schedule=new_schedule.schedule, + active=new_schedule.active + if new_schedule.active is not None + else True, + parameters=new_schedule.parameters, + slug=new_schedule.slug, + ) + ], + ) + + # Delete any extra old schedules beyond what's needed + for i in range(len(new_schedules), len(existing_schedules)): + await delete_deployment_schedule( + session=session, + deployment_id=deployment_id, + deployment_schedule_id=existing_schedules[i].id, + ) if requested_concurrency_limit_update != "unset": await _create_or_update_deployment_concurrency_limit( diff --git a/tests/server/orchestration/api/test_deployments.py b/tests/server/orchestration/api/test_deployments.py index b1d3e75af476..278c54a094f7 100644 --- a/tests/server/orchestration/api/test_deployments.py +++ b/tests/server/orchestration/api/test_deployments.py @@ -1,15 +1,19 @@ +import asyncio import datetime +from contextlib import asynccontextmanager from typing import List from uuid import uuid4 import pytest import sqlalchemy as sa +from fastapi import HTTPException from httpx._client import AsyncClient from starlette import status from prefect._internal.testing import retry_asserts from prefect.client.schemas.responses import DeploymentResponse from prefect.server import models, schemas +from prefect.server.api import deployments as deployments_api from prefect.server.database.orm_models import Flow from prefect.server.events.clients import AssertingEventsClient from prefect.server.schemas.actions import DeploymentCreate, DeploymentUpdate @@ -347,6 +351,75 @@ async def test_create_deployment_populates_and_returned_created( assert parse_datetime(response.json()["created"]) >= current_time assert parse_datetime(response.json()["updated"]) >= current_time + async def test_create_deployment_upsert_preserves_schedule_ids_without_slugs( + self, client, flow + ): + schedule1 = schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=1) + ) + schedule2 = schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=2) + ) + create_data = DeploymentCreate( # type: ignore + name="Schedule ID Stability", + flow_id=flow.id, + schedules=[ + schemas.actions.DeploymentScheduleCreate( + schedule=schedule1, + active=True, + ), + schemas.actions.DeploymentScheduleCreate( + schedule=schedule2, + active=False, + ), + ], + ).model_dump(mode="json") + create_response = await client.post("/deployments/", json=create_data) + assert create_response.status_code == status.HTTP_201_CREATED + deployment_id = create_response.json()["id"] + original_schedule_ids = [ + schedule["id"] for schedule in create_response.json()["schedules"] + ] + + schedule3 = schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=3) + ) + schedule4 = schemas.schedules.IntervalSchedule( + interval=datetime.timedelta(days=7) + ) + upsert_data = DeploymentCreate( # type: ignore + name="Schedule ID Stability", + flow_id=flow.id, + schedules=[ + schemas.actions.DeploymentScheduleCreate( + schedule=schedule3, + active=True, + ), + schemas.actions.DeploymentScheduleCreate( + schedule=schedule4, + active=False, + ), + ], + ).model_dump(mode="json") + upsert_response = await client.post("/deployments/", json=upsert_data) + assert upsert_response.status_code == status.HTTP_200_OK + assert upsert_response.json()["id"] == deployment_id + + schedules = [ + schemas.core.DeploymentSchedule(**s) + for s in upsert_response.json()["schedules"] + ] + assert len(schedules) == 2 + assert set(str(schedule.id) for schedule in schedules) == set( + original_schedule_ids + ) + + schedules_by_interval = {s.schedule.interval: s for s in schedules} + assert schedule3.interval in schedules_by_interval + assert schedules_by_interval[schedule3.interval].active is True + assert schedule4.interval in schedules_by_interval + assert schedules_by_interval[schedule4.interval].active is False + async def test_creating_deployment_with_inactive_schedule_creates_no_runs( self, session, client, flow ): @@ -1583,6 +1656,86 @@ async def test_paginate_deployments_returns_empty_list(self, client): class TestUpdateDeployment: + async def test_post_and_patch_contend_on_same_schedule_lock_key( + self, client, flow, monkeypatch + ): + data = DeploymentCreate( + name="lock-shared-deployment", + flow_id=flow.id, + ).model_dump(mode="json") + create_response = await client.post("/deployments/", json=data) + assert create_response.status_code == status.HTTP_201_CREATED + deployment_id = create_response.json()["id"] + deployment_name = create_response.json()["name"] + + lock_keys: list[str] = [] + held_keys: set[str] = set() + first_lock_acquired = asyncio.Event() + release_first_lock = asyncio.Event() + + @asynccontextmanager + async def fake_deployment_schedule_lock(lock_key: str): + lock_keys.append(lock_key) + if lock_key in held_keys: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Another update to this deployment is in progress.", + ) + + held_keys.add(lock_key) + try: + if not first_lock_acquired.is_set(): + first_lock_acquired.set() + await release_first_lock.wait() + yield + finally: + held_keys.remove(lock_key) + + monkeypatch.setattr( + deployments_api, + "deployment_schedule_lock", + fake_deployment_schedule_lock, + ) + + patch_task = asyncio.create_task( + client.patch( + f"/deployments/{deployment_id}", + json={"description": "patched while lock held"}, + ) + ) + await first_lock_acquired.wait() + + upsert_response = await client.post("/deployments/", json=data) + assert upsert_response.status_code == status.HTTP_409_CONFLICT + + release_first_lock.set() + patch_response = await patch_task + assert patch_response.status_code == status.HTTP_204_NO_CONTENT + assert lock_keys == [ + f"{flow.id}:{deployment_name}", + f"{flow.id}:{deployment_name}", + ] + + async def test_deployment_schedule_lock_fails_closed_when_redis_unavailable( + self, monkeypatch + ): + class FakeSettings: + class server: + class events: + messaging_broker = "prefect_redis.messaging" + + monkeypatch.setattr( + deployments_api, "get_current_settings", lambda: FakeSettings + ) + monkeypatch.setattr(deployments_api, "_get_async_redis_client", None) + + with pytest.raises(HTTPException) as exc: + async with deployments_api.deployment_schedule_lock("deployment-lock-key"): + pass + + assert exc.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + assert "configured but unavailable" in str(exc.value.detail) + async def test_update_deployment_with_schedule_allows_addition_of_concurrency( self, client, deployment ): @@ -2063,15 +2216,18 @@ async def test_update_deployment_with_multiple_schedules( ] assert len(schedules) == 2 - assert [schedule.id for schedule in schedules] != original_schedule_ids - - assert isinstance(schedules[0].schedule, schemas.schedules.IntervalSchedule) - assert schedules[0].schedule.interval == schedule4.interval - assert schedules[0].active is False - - assert isinstance(schedules[1].schedule, schemas.schedules.IntervalSchedule) - assert schedules[1].schedule.interval == schedule3.interval - assert schedules[1].active is True + # Schedule IDs are preserved because schedules are updated in place + # (rather than deleted and recreated) to keep idempotency keys stable. + assert set(str(schedule.id) for schedule in schedules) == set( + original_schedule_ids + ) + + # Verify both schedule configs were applied (order-independent) + schedules_by_interval = {s.schedule.interval: s for s in schedules} + assert schedule3.interval in schedules_by_interval + assert schedules_by_interval[schedule3.interval].active is True + assert schedule4.interval in schedules_by_interval + assert schedules_by_interval[schedule4.interval].active is False async def test_update_deployment_with_multiple_schedules_and_existing_slugs( self, diff --git a/ui-v2/src/api/prefect.ts b/ui-v2/src/api/prefect.ts index bb88acd124e3..6ee23d813dfe 100644 --- a/ui-v2/src/api/prefect.ts +++ b/ui-v2/src/api/prefect.ts @@ -8319,7 +8319,7 @@ export interface components { */ IntervalSchedule: { /** Interval */ - interval: number; + interval: number | string; /** * Anchor Date * Format: date-time