diff --git a/.cspell.json b/.cspell.json index 4d0f70a51ab..fb552061112 100644 --- a/.cspell.json +++ b/.cspell.json @@ -777,6 +777,9 @@ "lsyncd", "awk", "gawk", + "picklable", + "ndarray", + "mult", "ruleid" ] } diff --git a/dashboard/src/components/group/UpdateReleaseGroupDialog.vue b/dashboard/src/components/group/UpdateReleaseGroupDialog.vue index 18398d40af8..cc23a280d08 100644 --- a/dashboard/src/components/group/UpdateReleaseGroupDialog.vue +++ b/dashboard/src/components/group/UpdateReleaseGroupDialog.vue @@ -536,17 +536,9 @@ export default { throw new DashboardError('Please select an app to proceed'); } }, - onSuccess(candidate) { - this.$router.push({ - name: 'Deploy Candidate', - params: { - id: candidate, - name: this.bench, - }, - }); - this.restrictMessage = ''; + onSuccess() { this.show = false; - this.$emit('success', candidate); + this.$emit('success', null); }, onError: this.setErrorMessage.bind(this), }; diff --git a/dashboard/src/objects/group.js b/dashboard/src/objects/group.js index 80f05e240a4..b56aba48ced 100644 --- a/dashboard/src/objects/group.js +++ b/dashboard/src/objects/group.js @@ -10,8 +10,8 @@ import PatchAppDialog from '../components/group/PatchAppDialog.vue'; import { getTeam, switchToTeam } from '../data/team'; import router from '../router'; import { confirmDialog, icon, renderDialog } from '../utils/components'; -import { getToastErrorMessage } from '../utils/toast'; import { date, duration } from '../utils/format'; +import { getToastErrorMessage } from '../utils/toast'; import { getJobsTab } from './common/jobs'; import { getPatchesTab } from './common/patches'; import { tagTab } from './common/tags'; @@ -510,14 +510,17 @@ export default { } else if (group.doc.deploy_information.update_available) { let UpdateReleaseGroupDialog = defineAsyncComponent( () => - import('../components/group/UpdateReleaseGroupDialog.vue'), + import( + '../components/group/UpdateReleaseGroupDialog.vue' + ), ); renderDialog( h(UpdateReleaseGroupDialog, 
{ bench: group.name, lastDeploy: true, onSuccess(candidate) { - group.doc.deploy_information.deploy_in_progress = true; + group.doc.deploy_information.has_running_release_pipeline = true; + group.doc.deploy_information.update_available = false; if (candidate) { group.doc.deploy_information.last_deploy.name = candidate; @@ -830,7 +833,9 @@ export default { onClick() { let ConfigEditorDialog = defineAsyncComponent( () => - import('../components/EnvironmentVariableEditorDialog.vue'), + import( + '../components/EnvironmentVariableEditorDialog.vue' + ), ); renderDialog( h(ConfigEditorDialog, { @@ -921,7 +926,8 @@ export default { bench: group.name, lastDeploy: group.doc?.deploy_information?.last_deploy, onSuccess(candidate) { - group.doc.deploy_information.deploy_in_progress = true; + group.doc.deploy_information.has_running_release_pipeline = true; + group.doc.deploy_information.update_available = false; if (candidate) { group.doc.deploy_information.last_deploy = { name: candidate, @@ -932,6 +938,17 @@ export default { ); }, }, + { + label: 'Validating Deploy', + slots: { + prefix: () => h(LoadingIndicator, { class: 'w-4 h-4' }), + }, + theme: 'green', + condition: () => + !group.doc.deploy_information.deploy_in_progress && + !group.doc.deploy_information.bench_creation_underway && + group.doc.deploy_information.has_running_release_pipeline, + }, { label: 'Deploy in progress', slots: { diff --git a/press/api/bench.py b/press/api/bench.py index f9e1cacf9bc..cc057dd8d5e 100644 --- a/press/api/bench.py +++ b/press/api/bench.py @@ -44,6 +44,7 @@ from press.press.doctype.deploy_candidate.deploy_candidate import DeployCandidate from press.press.doctype.deploy_candidate_build.deploy_candidate_build import DeployCandidateBuild from press.press.doctype.marketplace_app.marketplace_app import MarketplaceApp + from press.press.doctype.release_pipeline.release_pipeline import ReleasePipeline @frappe.whitelist() @@ -390,15 +391,18 @@ def dependencies(name: str): @frappe.whitelist() 
@protected("Release Group") def update_dependencies(name: str, dependencies: str): - dependencies = frappe.parse_json(dependencies) + dependencies_dict: list[frappe._dict] = frappe.parse_json(dependencies) rg: ReleaseGroup = frappe.get_doc("Release Group", name) - if len(rg.dependencies) != len(dependencies): + + if len(rg.dependencies) != len(dependencies_dict): frappe.throw("Need all required dependencies") - if diff := set([d["key"] for d in dependencies]) - set(d.dependency for d in rg.dependencies): + + if diff := set([d["key"] for d in dependencies_dict]) - set(d.dependency for d in rg.dependencies): frappe.throw("Invalid dependencies: " + ", ".join(diff)) + for dep, new in zip( sorted(rg.dependencies, key=lambda x: x.dependency), - sorted(dependencies, key=lambda x: x["key"]), + sorted(dependencies_dict, key=lambda x: x["key"]), strict=False, ): if dep.dependency != new["key"]: @@ -798,15 +802,20 @@ def deploy_and_update( sites: list | None = None, run_will_fail_check: bool = True, ): - validate_app_hashes(apps) + # We check permissions early on and don't change permissions in the middle of the Workflow + current_team = get_current_team() + rg_team = frappe.db.get_value("Release Group", name, "team") - # Returns name of the Deploy Candidate that is running the build - return get_bench_update( - name, - apps, - sites, - False, - ).deploy(run_will_fail_check) + if rg_team != current_team: + frappe.throw("Bench can only be deployed by the bench owner", exc=frappe.PermissionError) + + release_pipeline: ReleasePipeline = frappe.get_doc( + {"doctype": "Release Pipeline", "release_group": name, "team": current_team} + ) + release_pipeline.insert() + release_pipeline.create_release.run_as_workflow( + apps=apps, sites=sites, run_will_fail_check=run_will_fail_check + ) @frappe.whitelist() @@ -936,9 +945,9 @@ def validate_branch(name: str, app: str, branch: str): def get_branches_for_marketplace_app(app: str, marketplace_app: str, app_source: AppSource) -> list[dict]: 
"""Return list of branches allowed for this `marketplace` app""" branch_set = set() - marketplace_app: MarketplaceApp = frappe.get_doc("Marketplace App", marketplace_app) + marketplace_app_doc: MarketplaceApp = frappe.get_doc("Marketplace App", marketplace_app) - for marketplace_app_source in marketplace_app.sources: + for marketplace_app_source in marketplace_app_doc.sources: app_source = frappe.get_doc("App Source", marketplace_app_source.source) branch_set.add(app_source.branch) @@ -1146,10 +1155,10 @@ def show_app_versions(name: str, dc_name: str) -> list[dict[str, Any]]: { "name": app.app, "hash": app.hash[:7], - "branch": sources.get(app.source).get("branch"), - "repository": sources.get(app.source).get("repository"), - "repository_owner": sources.get(app.source).get("repository_owner"), - "repository_url": sources.get(app.source).get("repository_url"), + "branch": sources.get(app.source, {}).get("branch"), + "repository": sources.get(app.source, {}).get("repository"), + "repository_owner": sources.get(app.source, {}).get("repository_owner"), + "repository_url": sources.get(app.source, {}).get("repository_url"), } for app in deploy_candidate.apps if app diff --git a/press/api/tests/test_bench.py b/press/api/tests/test_bench.py index a7fff30976b..ee10aab41d3 100644 --- a/press/api/tests/test_bench.py +++ b/press/api/tests/test_bench.py @@ -147,31 +147,6 @@ def test_deploy_and_update_fn_creates_bench_update(self): self.assertEqual(dc_count_after, dc_count_before + 1) self.assertEqual(bu_count_after, bu_count_before + 1) - @patch( - "press.press.doctype.deploy_candidate.deploy_candidate.frappe.enqueue_doc", - new=foreground_enqueue_doc, - ) - @patch("press.press.doctype.deploy_candidate.deploy_candidate.frappe.db.commit", new=Mock()) - def test_deploy_and_update_fn_fails_without_release_argument(self): - group = new( - { - "title": "Test Bench", - "apps": [{"name": self.app.name, "source": self.app_source.name}], - "version": self.version, - "cluster": 
"Default", - "saas_app": None, - "server": None, - } - ) - - self.assertRaises( - frappe.exceptions.ValidationError, - deploy_and_update, - group, - [{"app": self.app.name}], - [], - ) - @patch("press.press.doctype.deploy_candidate.deploy_candidate.frappe.db.commit", new=Mock()) def test_deploy_fn_fails_without_apps(self): frappe.set_user(self.team.user) diff --git a/press/exceptions.py b/press/exceptions.py index becdb5a68f7..bb5f23350f1 100644 --- a/press/exceptions.py +++ b/press/exceptions.py @@ -103,3 +103,7 @@ class ArchiveBenchError(ValidationError): class MonitorServerDown(ValidationError): pass + + +class ReleasePipelineFailure(Exception): + pass diff --git a/press/hooks.py b/press/hooks.py index 17801a6003d..d592c3118f3 100644 --- a/press/hooks.py +++ b/press/hooks.py @@ -353,6 +353,7 @@ ], "* * * * *": [ "press.press.doctype.virtual_disk_snapshot.virtual_disk_snapshot.sync_physical_backup_snapshots", + "press.workflow_engine.doctype.press_workflow_task.press_workflow_task.retry_tasks", "press.press.doctype.deploy_candidate_build.deploy_candidate_build.run_scheduled_builds", "press.press.doctype.agent_request_failure.agent_request_failure.remove_old_failures", "press.saas.doctype.site_access_token.site_access_token.cleanup_expired_access_tokens", @@ -365,6 +366,7 @@ "press.press.doctype.site.saas_pool.create", "press.press.doctype.virtual_disk_snapshot.virtual_disk_snapshot.sync_rolling_snapshots", "press.press.doctype.database_server.database_server.auto_purge_binlogs_by_size_limit", + "press.workflow_engine.doctype.press_workflow.press_workflow.retry_workflows", ], "*/30 * * * *": [ "press.press.doctype.site_update.scheduled_auto_updates.trigger", diff --git a/press/modules.txt b/press/modules.txt index d3146624329..d5be7322ed3 100644 --- a/press/modules.txt +++ b/press/modules.txt @@ -4,4 +4,5 @@ Marketplace SaaS Partner Infrastructure -Incident Management \ No newline at end of file +Incident Management +Workflow Engine \ No newline at end of file 
diff --git a/press/press/doctype/bench/bench.py b/press/press/doctype/bench/bench.py index 38e2b174334..809ef5d654b 100644 --- a/press/press/doctype/bench/bench.py +++ b/press/press/doctype/bench/bench.py @@ -1317,27 +1317,23 @@ def cancel_and_retry_bench_job_if_required(job: AgentJob) -> bool: if not has_retryable_error: return False - job.cancel_job() - - frappe.db.set_value("Agent Job", job.name, "status", "Failure") - frappe.db.set_value("Bench", job.bench, "status", "Broken") - - # Trigger immediate archival of bench to allow retry bench: Bench = frappe.get_doc("Bench", job.bench) - bench.archive(retry_new_bench=True) - return True - - -def retry_new_bench_job_if_possible(bench: Bench): - """Check if there are retries left, if yes then trigger a new bench job immediately.""" retry_count = frappe.db.count( "Bench", {"build": bench.build, "server": bench.server, "group": bench.group} ) if retry_count >= 3: - return + # We can't retry anymore so accept the fate and proceed with archival with job processing + return False - bench.retry_bench() + job.cancel_job() + + frappe.db.set_value("Agent Job", job.name, "status", "Failure") + frappe.db.set_value("Bench", job.bench, "status", "Broken") + + bench = bench.reload() + bench.archive(retry_new_bench=True) + return True def process_new_bench_job_update(job: AgentJob): # noqa: C901 @@ -1445,7 +1441,7 @@ def process_archive_bench_job_update(job: AgentJob): retry_new_bench = request_data.get("retry_new_bench", False) if updated_status == "Archived" and retry_new_bench: - retry_new_bench_job_if_possible(bench) + bench.retry_bench() # We know now for sure that the bench can be retired def process_add_ssh_user_job_update(job): diff --git a/press/press/doctype/bench_update/bench_update.py b/press/press/doctype/bench_update/bench_update.py index 3319f02447b..e4f5404a07d 100644 --- a/press/press/doctype/bench_update/bench_update.py +++ b/press/press/doctype/bench_update/bench_update.py @@ -84,10 +84,19 @@ def 
validate_inplace_update(self): frappe.ValidationError, ) - def deploy(self, run_will_fail_check=False) -> str: + def deploy( + self, + run_will_fail_check=False, + validate_pre_candidate_checks: bool = True, + create_build: bool = True, + ) -> str: + """Creates and returns candidate name or build name depending on the point of invocation.""" rg: ReleaseGroup = frappe.get_doc("Release Group", self.group) - candidate = rg.create_deploy_candidate(self.apps, run_will_fail_check) - deploy = candidate.schedule_build_and_deploy() + candidate = rg.create_deploy_candidate( + apps_to_update=self.apps, + run_will_fail_check=run_will_fail_check, + validate_pre_candidate_checks=validate_pre_candidate_checks, + ) self.candidate = candidate.name self.save() @@ -97,6 +106,12 @@ def deploy(self, run_will_fail_check=False) -> str: f"Invalid name found for deploy candidate '{candidate.name}' of type {type(candidate.name)}" ) + if not create_build: + # In case we are not scheduling build from here (eg. new build flow) return candidate name here + return candidate.name + + deploy = candidate.schedule_build_and_deploy() + return deploy["name"] def update_inplace(self) -> str: @@ -164,15 +179,17 @@ def get_bench_update( apps: list, sites: list | None = None, is_inplace_update: bool = False, + ignore_permissions_check: bool = False, ) -> BenchUpdate: if sites is None: sites = [] - current_team = get_current_team() - rg_team = frappe.db.get_value("Release Group", name, "team") + if not ignore_permissions_check: + current_team = get_current_team() + rg_team = frappe.db.get_value("Release Group", name, "team") - if rg_team != current_team: - frappe.throw("Bench can only be deployed by the bench owner", exc=frappe.PermissionError) + if rg_team != current_team: + frappe.throw("Bench can only be deployed by the bench owner", exc=frappe.PermissionError) bench_update: "BenchUpdate" = frappe.get_doc( { @@ -192,4 +209,5 @@ def get_bench_update( "is_inplace_update": is_inplace_update, } 
).insert(ignore_permissions=True) + return bench_update diff --git a/press/press/doctype/deploy_candidate_build/deploy_candidate_build.py b/press/press/doctype/deploy_candidate_build/deploy_candidate_build.py index 9cebf2782c2..cbc7247c734 100644 --- a/press/press/doctype/deploy_candidate_build/deploy_candidate_build.py +++ b/press/press/doctype/deploy_candidate_build/deploy_candidate_build.py @@ -1167,6 +1167,7 @@ def pre_build(self, **kwargs): queue=queue, timeout=2400, enqueue_after_commit=True, + job_id=f"deploy_candidate_build:{self.name}", ) frappe.set_user(user) diff --git a/press/press/doctype/release_group/release_group.py b/press/press/doctype/release_group/release_group.py index a07a9a1f1d8..063f5470cb4 100644 --- a/press/press/doctype/release_group/release_group.py +++ b/press/press/doctype/release_group/release_group.py @@ -725,15 +725,14 @@ def check_auto_scales(self) -> None: @frappe.whitelist() def create_deploy_candidate( - self, - apps_to_update=None, - run_will_fail_check=False, + self, apps_to_update=None, run_will_fail_check=False, validate_pre_candidate_checks: bool = True ) -> "DeployCandidate | None": if not self.enabled: return None - self.check_app_server_storage() - self.check_auto_scales() + if validate_pre_candidate_checks: + self.check_app_server_storage() + self.check_auto_scales() apps = self.get_apps_to_update(apps_to_update) if apps_to_update is None: @@ -864,6 +863,16 @@ def get_sorted_based_on_rg_apps(self, apps): return sorted_apps + @property + def has_running_release_pipeline(self) -> bool: + return bool( + frappe.db.exists( + "Release Pipeline", + {"release_group": self.name, "status": ("in", ["Pending", "Running", "Retrying"])}, + "name", + ) + ) + @frappe.whitelist() def deploy_information(self): out = frappe._dict(update_available=False) @@ -874,6 +883,14 @@ def deploy_information(self): out.last_deploy = self.last_dc_info out.deploy_in_progress = self.deploy_in_progress + out.has_running_release_pipeline = 
self.has_running_release_pipeline + if not out.deploy_in_progress and out.has_running_release_pipeline: + # Check if the deploy has finished and bench creation is underway. + out.bench_creation_underway = bool( + frappe.db.exists("Bench", {"group": self.name, "status": ("in", ("Installing", "Pending"))}) + ) + else: + out.bench_creation_underway = False out.removed_apps = self.get_removed_apps() out.update_available = ( @@ -881,6 +898,7 @@ def deploy_information(self): or (len(out.removed_apps) > 0) or self.dependency_update_pending ) + out.update_available = False if out.has_running_release_pipeline else out.update_available out.number_of_apps = len(self.apps) out.sites = [ diff --git a/press/press/doctype/release_pipeline/__init__.py b/press/press/doctype/release_pipeline/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/press/doctype/release_pipeline/release_pipeline.js b/press/press/doctype/release_pipeline/release_pipeline.js new file mode 100644 index 00000000000..ff450c50b3b --- /dev/null +++ b/press/press/doctype/release_pipeline/release_pipeline.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Release Pipeline", { +// refresh(frm) { + +// }, +// }); diff --git a/press/press/doctype/release_pipeline/release_pipeline.json b/press/press/doctype/release_pipeline/release_pipeline.json new file mode 100644 index 00000000000..e7fed847f2c --- /dev/null +++ b/press/press/doctype/release_pipeline/release_pipeline.json @@ -0,0 +1,87 @@ +{ + "actions": [], + "allow_rename": 1, + "creation": "2026-03-26 13:28:36.481790", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "team", + "release_group", + "status" + ], + "fields": [ + { + "fieldname": "team", + "fieldtype": "Link", + "in_list_view": 1, + "label": "Team", + "options": "Team", + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "release_group", + "fieldtype": 
"Link", + "label": "Release Group", + "options": "Release Group", + "read_only": 1, + "search_index": 1 + }, + { + "default": "Pending", + "fieldname": "status", + "fieldtype": "Select", + "label": "Status", + "options": "Pending\nRunning\nPartial Success\nSuccess\nFailure\nRetrying" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "links": [], + "modified": "2026-04-09 15:10:24.783344", + "modified_by": "Administrator", + "module": "Press", + "name": "Release Pipeline", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + }, + { + "create": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "Press Admin", + "share": 1, + "write": 1 + }, + { + "create": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "Press Member", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/press/press/doctype/release_pipeline/release_pipeline.py b/press/press/doctype/release_pipeline/release_pipeline.py new file mode 100644 index 00000000000..407052a5b07 --- /dev/null +++ b/press/press/doctype/release_pipeline/release_pipeline.py @@ -0,0 +1,463 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt +import typing +from functools import cached_property +from typing import Any, TypedDict + +import frappe + +from press.exceptions import InsufficientSpaceOnServer, ReleasePipelineFailure +from press.press.doctype.bench_update.bench_update import get_bench_update +from press.workflow_engine.doctype.press_workflow.decorators import flow, task +from press.workflow_engine.doctype.press_workflow.exceptions import PressWorkflowTaskEnqueued +from 
press.workflow_engine.doctype.press_workflow.press_workflow import PressWorkflow +from press.workflow_engine.doctype.press_workflow.workflow_builder import WorkflowBuilder + +if typing.TYPE_CHECKING: + from press.press.doctype.agent_job.agent_job import AgentJob + from press.press.doctype.bench_update.bench_update import BenchUpdate + from press.press.doctype.deploy_candidate.deploy_candidate import DeployCandidate + from press.press.doctype.deploy_candidate_build.deploy_candidate_build import DeployCandidateBuild + from press.press.doctype.release_group.release_group import ReleaseGroup + + +BENCH_TRANSITION_STATES = ["Pending", "Installing", "Updating"] +# Keeping this here now, will eventually move all notifications logic here. +SKIP_NOTIFICATIONS_FOR = ["orchestrate_build_monitoring", "monitor_bench_creation"] + + +class FailedBenchJobs(TypedDict): + name: str + bench: str + + +class BenchInfo(TypedDict): + name: str + status: str + + +class ReleasePipeline(WorkflowBuilder): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. 
+ + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + release_group: DF.Link | None + status: DF.Literal["Pending", "Running", "Partial Success", "Success", "Failure", "Retrying"] + team: DF.Link + # end: auto-generated types + + def update_pipeline_status( + self, + status: typing.Literal[ + "Pending", + "Running", + "Partial Success", + "Success", + "Failure", + "Retrying", + ], + ): + self.status = status + self.save() + + @cached_property + def release_group_doc(self) -> "ReleaseGroup": + return frappe.get_doc("Release Group", self.release_group) + + @cached_property + def workflow_name(self) -> str: + return frappe.db.get_value( + "Press Workflow", {"linked_doctype": "Release Pipeline", "linked_docname": self.name}, "name" + ) + + def get_task_name(self, func): + """Get task name for the given function""" + return frappe.db.get_value( + "Press Workflow Task", {"method_name": func.__name__, "workflow": self.workflow_name}, "name" + ) + + @task + def validate_app_hashes(self, apps: list[dict[str, str]]): + """Validate App Hashes""" + from press.api.bench import validate_app_hashes as app_hash_validation + + app_hash_validation(apps) + + self.update_pipeline_status("Running") # Mark the pipeline as running! 
+ + @task + def validate_server_storages(self): + """Validate server storage for all servers in the release group.""" + self.release_group_doc.check_app_server_storage() + + @task + def validate_auto_scales_on_servers(self): + """Validate no server in release group is autoscaled.""" + self.release_group_doc.check_auto_scales() + + @task + def create_deploy_candidate( + self, + apps: list[dict[str, str]], + sites: list[dict[str, Any]], + run_will_fail_check: bool = False, + create_deploy: bool = False, + ) -> str: + """Create a Deploy Candidate for the release group.""" + assert isinstance(self.release_group, str) + bench_update: BenchUpdate = get_bench_update( + self.release_group, apps, sites, is_inplace_update=False, ignore_permissions_check=True + ) + return bench_update.deploy( + run_will_fail_check=run_will_fail_check, + validate_pre_candidate_checks=False, + create_build=create_deploy, + ) + + @task + def initiate_pre_build_validations(self, deploy_candidate: str) -> str: + """Start the deploy candidate build process which will run the pre-build validations.""" + candidate: DeployCandidate = frappe.get_doc("Deploy Candidate", deploy_candidate) + deploy_candidate_build = candidate.schedule_build_and_deploy() + return deploy_candidate_build["name"] + + def _get_required_build_count(self, deploy_candidate: str) -> int: + """Get the number of builds required for this deploy, as we can have arm & intel build for the same deploy candidate""" + intel_build, arm_build = frappe.db.get_value( + "Deploy Candidate", deploy_candidate, ["requires_intel_build", "requires_arm_build"] + ) + + return len([build for build in [intel_build, arm_build] if build]) + + def _check_for_scheduled_build_retries(self, deploy_candidate_build: str): + """Check if there are any scheduled retries for this build""" + deploy_candidate_build_doc: DeployCandidateBuild = frappe.get_doc( + "Deploy Candidate Build", deploy_candidate_build + ) + + agent_job: AgentJob = frappe.get_doc( + "Agent Job", 
+ { + "reference_doctype": "Deploy Candidate Build", + "reference_name": deploy_candidate_build, + "job_type": "Run Remote Builder", + }, + ) + + if deploy_candidate_build_doc.should_build_retry(exc=None, job=agent_job): + self.update_pipeline_status("Retrying") + raise PressWorkflowTaskEnqueued( + f"Build {deploy_candidate_build} has scheduled retries. Waiting for retries to complete.", + self.workflow_name, + self.get_task_name(self.monitor_pre_build_validation), + ) + + def _get_latest_retried_build(self, deploy_candidate_build: str) -> str: + """In case there are retries for the build, get the latest retried build to monitor.""" + deploy_candidate = frappe.db.get_value( + "Deploy Candidate Build", deploy_candidate_build, "deploy_candidate" + ) + + # Get the latest **retried** build + retried_build = frappe.db.get_value( + "Deploy Candidate Build", + { + "group": self.release_group, + "deploy_candidate": deploy_candidate, + "name": ("!=", deploy_candidate_build), + }, + "name", + order_by="creation desc", + ) + + return retried_build or deploy_candidate_build + + @task + def monitor_pre_build_validation(self, deploy_candidate_build: str): + """Monitors the Deploy Candidate Build until the remote build job is created.""" + task_name = self.get_task_name(self.monitor_pre_build_validation) + deploy_candidate_build_status = frappe.db.get_value( + "Deploy Candidate Build", deploy_candidate_build, "status" + ) + + if deploy_candidate_build_status in ["Running", "Success"]: + return # We have enqueued the remote agent job + + if deploy_candidate_build_status == "Failure": + raise ReleasePipelineFailure( + f"Pre Build Validation failed for Deploy Candidate Build {deploy_candidate_build}. " + "Please check the build logs for more details." 
+ ) + + raise PressWorkflowTaskEnqueued( + f"Waiting for remote build job to be enqueued for Deploy Candidate Build {deploy_candidate_build}", + self.workflow_name, + task_name, + ) + + @task + def monitor_build_success(self, deploy_candidate_build: str): + """Monitor build till terminal state.""" + deploy_candidate_build = self._get_latest_retried_build(deploy_candidate_build) + deploy_candidate_build_status = frappe.db.get_value( + "Deploy Candidate Build", deploy_candidate_build, "status" + ) + + if deploy_candidate_build_status == "Success": + return # Remote Build succeeded can mark as success and proceed + + if deploy_candidate_build_status == "Failure": + # This will raise and enqueue the function again in case there are scheduled retries for the build + self._check_for_scheduled_build_retries(deploy_candidate_build) + raise ReleasePipelineFailure( + f"Remote build failed for Deploy Candidate Build {deploy_candidate_build}. Please check the build logs for more details." + ) + + raise PressWorkflowTaskEnqueued( + f"Waiting for build to complete for Deploy Candidate Build {deploy_candidate_build}", + self.workflow_name, + self.get_task_name(self.monitor_build_success), + ) + + def _is_active_bench_work_in_progress(self, builds: list[str]) -> bool: + """Checks the entire lifecycle (Queue + Agent Jobs) for active work.""" + queue_active = frappe.db.exists( + "New Bench Queue", + {"group": self.release_group, "status": "Queued"}, + ) + if queue_active: + return True + + benches_from_builds = frappe.db.get_all("Bench", {"build": ["in", builds]}, pluck="name") + if not benches_from_builds: + # No benches created yet and nothing in queue? No work in progress. + # Since after insert of bench creates agent job immediately. 
+ return False + + will_benches_be_retried = frappe.db.exists( + "Agent Job", + { + "job_type": "Archive Bench", + "bench": ("in", benches_from_builds), + "request_data": ( + "like", + '%"retry_new_bench": true%', # Since retry is being passed here it will definitely lead to creation of new bench + ), # Should not be too heavy since bench is indexed? + "status": ["in", ["Undelivered", "Pending", "Running"]], + }, + ) + + if will_benches_be_retried: + self.update_pipeline_status("Retrying") + return True + + # Even if there are no retries scheduled, we want to wait for the current bench jobs to be completed + agent_job_active = frappe.db.exists( + "Agent Job", + { + "job_type": ["in", ["New Bench", "Archive Bench"]], + "status": ["in", ["Undelivered", "Pending", "Running"]], + "bench": ["in", benches_from_builds], + }, + ) + + return bool(agent_job_active) + + def _calculate_bench_doc_requirements(self, candidate: str, builds: list[str]) -> int: + # This can have intel and arm server both will have different builds + number_of_expected_bench_docs = len(self.release_group_doc.servers) + # Total number of bench docs created regardless of the server platforms + + if not builds: + raise ReleasePipelineFailure(f"No builds found for Deploy Candidate {candidate}.") + + return number_of_expected_bench_docs + + def _finalize_pipeline_status(self, builds: list, expected_count: int): + """Finalize the pipeline status based on the number of failed bench deploys vs expected bench deploys.""" + successful_deploys = frappe.db.count("Bench", {"build": ["in", builds], "status": "Active"}) + + if successful_deploys == expected_count: + return self.update_pipeline_status("Success") + + if successful_deploys == 0: + self.update_pipeline_status("Failure") + raise ReleasePipelineFailure(f"All {expected_count} bench deploy(s) failed.") + + # If some succeeded and others are permanently failed + return self.update_pipeline_status("Partial Success") + + def _get_secondary_build(self, 
deploy_candidate: str, primary_build: str) -> str | None: + """Finds a build for the same candidate but on a different platform.""" + primary_platform = frappe.db.get_value("Deploy Candidate Build", primary_build, "platform") + + return frappe.db.get_value( + "Deploy Candidate Build", + { + "deploy_candidate": deploy_candidate, + "name": ("!=", primary_build), + "group": self.release_group, + "platform": ("!=", primary_platform), + }, + "name", + ) + + @task + def run_pre_release_checks(self, apps: list[dict[str, str]]): + """Groups all early-exit validation logic.""" + try: + self.validate_app_hashes(apps) # This sets status to "Running" + self.validate_server_storages() + self.validate_auto_scales_on_servers() + except (frappe.ValidationError, InsufficientSpaceOnServer) as e: + raise ReleasePipelineFailure(str(e)) from e + + @task + def prepare_deployment(self, apps, sites, run_will_fail_check) -> tuple[str, str]: + """Creates the candidate and returns the primary build name.""" + try: + deploy_candidate = self.create_deploy_candidate( + apps=apps, + sites=sites, + run_will_fail_check=run_will_fail_check, + create_deploy=False, + ) + primary_build = self.initiate_pre_build_validations(deploy_candidate) + return deploy_candidate, primary_build + except frappe.ValidationError as e: + raise ReleasePipelineFailure(f"Failed to prepare deployment: {e!s}") from e + + @task + def orchestrate_build_monitoring(self, deploy_candidate: str, primary_build: str): + """Monitors primary and, if necessary, secondary builds.""" + # Monitor Primary + self.monitor_pre_build_validation(primary_build) + self.monitor_build_success(primary_build) + + # Check for Secondary Architecture + if self._get_required_build_count(deploy_candidate) == 2: + secondary_build = self._get_secondary_build(deploy_candidate, primary_build) + + if not secondary_build: + # Wait for sometime for the secondary build to be created in case of any delays in build scheduling + raise PressWorkflowTaskEnqueued( + 
f"Waiting for secondary build creation for {deploy_candidate}", + self.workflow_name, + self.get_task_name(self.monitor_build_success), + ) + + self.monitor_pre_build_validation(secondary_build) + self.monitor_build_success(secondary_build) + + if self.status == "Retrying": + # If we were in retrying status, it means builds have succeeded after retries, we can move back to running status + self.update_pipeline_status("Running") + + @task + def monitor_bench_creation(self, deploy_candidate_build: str): + """Monitor new bench creation accounting for any failures and retries.""" + candidate = frappe.db.get_value("Deploy Candidate Build", deploy_candidate_build, "deploy_candidate") + intel_build, arm_build = frappe.db.get_value( + "Deploy Candidate", candidate, ["intel_build", "arm_build"] + ) + builds = [b for b in [intel_build, arm_build] if b] + expected = self._calculate_bench_doc_requirements(candidate=candidate, builds=builds) + + # This should take care of the retries as well. + if self._is_active_bench_work_in_progress(builds): + raise PressWorkflowTaskEnqueued( + "Benches in progress, Waiting...", + self.workflow_name, + self.get_task_name(self.monitor_bench_creation), + ) + + # Just another safety lock to ensure no early failures occur + statues = frappe.db.get_all("Bench", {"build": ["in", builds]}, pluck="status") + in_transition = [status for status in statues if status in BENCH_TRANSITION_STATES] + + if in_transition: + raise PressWorkflowTaskEnqueued( + "Benches are in transition states...", + self.workflow_name, + self.get_task_name(self.monitor_bench_creation), + ) + + self._finalize_pipeline_status(builds=builds, expected_count=expected) + + def get_failure_summary(self, workflow: PressWorkflow) -> str | None: + """The first failure gets fails everything""" + point_of_failure = [step.task for step in workflow.steps if step.status == "Failure"] + + if not point_of_failure: + return None + + point_of_failure = point_of_failure[0] + workflow_object_name, 
method_name = frappe.db.get_value( + "Press Workflow Task", point_of_failure, ["exception", "method_name"] + ) + + if method_name in SKIP_NOTIFICATIONS_FOR: + # Notifications for build failures are handled separately in the deploy notifications + return None + + return frappe.db.get_value("Press Workflow Object", workflow_object_name, "summary") + + def on_update(self): + """A few steps have their notifications handled separately (ref deploy_notifications.py), so they are skipped here""" + if not self.has_value_changed("status"): + return + + if self.status != "Failure": + return + + workflow = frappe.get_doc("Press Workflow", self.workflow_name) + failure_summary = self.get_failure_summary(workflow) + + if not failure_summary: + return # No failed tasks found, no need to create a notification + + frappe.get_doc( + { + "doctype": "Press Notification", + "title": "Update Failure", + "type": "Bench Deploy", + "is_actionable": False, + "class": "Error", + "team": self.team, + "document_type": "Release Pipeline", + "document_name": self.name, + "message": failure_summary, + } + ).insert() + + frappe.publish_realtime( + "press_notification", doctype="Press Notification", message={"team": self.team} + ) + + @flow + def create_release( + self, + apps: list[dict[str, str]], + sites: list[dict[str, Any]], + run_will_fail_check: bool = False, + ): + """Orchestrates the release process from validation to bench creation with recursive monitoring and retry handling""" + try: + # 1. Validation Phase + self.run_pre_release_checks(apps) + + # 2. Initialization Phase + deploy_candidate, primary_build = self.prepare_deployment(apps, sites, run_will_fail_check) + + # 3. Monitoring Phase (Handles 1 or 2 builds) + self.orchestrate_build_monitoring(deploy_candidate, primary_build) + + # 4. 
Finalization Phase + self.monitor_bench_creation(primary_build) + + except ReleasePipelineFailure: + self.update_pipeline_status("Failure") diff --git a/press/press/doctype/release_pipeline/test_release_pipeline.py b/press/press/doctype/release_pipeline/test_release_pipeline.py new file mode 100644 index 00000000000..7fbc5a8d917 --- /dev/null +++ b/press/press/doctype/release_pipeline/test_release_pipeline.py @@ -0,0 +1,134 @@ +# Copyright (c) 2026, Frappe and Contributors +# See license.txt +import shutil +from unittest.mock import Mock, patch + +import frappe +from frappe.database.mariadb.database import MariaDBDatabase +from frappe.tests.utils import FrappeTestCase + +from press.api.bench import deploy_and_update +from press.press.doctype.agent_job.agent_job import poll_pending_jobs +from press.press.doctype.agent_job.test_agent_job import fake_agent_job +from press.press.doctype.app.test_app import create_test_app +from press.press.doctype.app_release.test_app_release import create_test_app_release +from press.press.doctype.app_source.test_app_source import create_test_app_source +from press.press.doctype.deploy_candidate_build.deploy_candidate_build import DeployCandidateBuild +from press.press.doctype.release_group.test_release_group import create_test_release_group +from press.press.doctype.release_pipeline.release_pipeline import ReleasePipeline +from press.press.doctype.server.test_server import create_test_server +from press.utils import get_current_team + + +def get_mock_context_file(*args, **kwargs): + return frappe.mock("file_path") + + +def mock_build_monitoring(*args, **kwargs): + """Simulates monitoring of the build however returns success immediately without raising task enqueued error""" + return + + +def mock_pre_build_validation_monitoring(*args, **kwargs): + """Simulates monitoring of the pre-build validation however returns success immediately without raising task enqueued error""" + return + + +def mock_bench_monitoring(*args, **kwargs): + 
"""Simulates monitoring of the benches however returns success immediately without raising task enqueued error""" + return + + +@patch.object(MariaDBDatabase, "commit", Mock()) +class TestReleasePipeline(FrappeTestCase): + def tearDown(self): + frappe.set_user("Administrator") + + @classmethod + def setUpClass(cls): + super().setUpClass() + server = create_test_server(use_for_build=True) + cls.test_frappe_app = create_test_app("frappe") + cls.test_erpnext_app = create_test_app("erpnext", "Erpnext App") + cls.test_frappe_release = create_test_app_release( + create_test_app_source( + app=cls.test_frappe_app, + version="Version 15", + repository_url="https://github.com/frappe/frappe", + ), + "79f10f8769a9a839abaef1f220da2d2eea5eea27", # pragma: allowlist secret + ) + cls.test_erpnext_release = create_test_app_release( + create_test_app_source( + app=cls.test_erpnext_app, + version="Version 15", + repository_url="https://github.com/frappe/erpnext", + ), + "a2626ed55f437f69b99a29cfb0b9ead219f59458", # pragma: allowlist secret + ) + cls.test_release_group = create_test_release_group( + apps=[cls.test_frappe_app, cls.test_erpnext_app], + frappe_version="Version 15", + servers=[server.name], + ) + frappe.db.set_single_value("Press Settings", "build_directory", "/tmp/test-build-dir/") + frappe.db.set_single_value("Press Settings", "clone_directory", "/tmp/test-clone-dir/") + + def create_deploy_and_update(self): + deploy_and_update( + self.test_release_group.name, + apps=[ + { + "app": "frappe", + "source": "SRC-Frappe-001", + "release": self.test_frappe_release.name, + "hash": self.test_frappe_release.hash, + }, + { + "app": "erpnext", + "source": "SRC-Erpnext-001", + "release": self.test_erpnext_release.name, + "hash": self.test_erpnext_release.hash, + }, + ], + ) + + @patch.object(DeployCandidateBuild, "_upload_build_context", get_mock_context_file) + @patch.object(DeployCandidateBuild, "_build", Mock()) + @patch.object(ReleasePipeline, "monitor_pre_build_validation", 
mock_pre_build_validation_monitoring) + @patch.object(ReleasePipeline, "monitor_build_success", mock_build_monitoring) + def test_release_pipeline_creation(self): + self.create_deploy_and_update() + + release_pipeline: ReleasePipeline = frappe.get_last_doc("Release Pipeline") + self.assertEqual(release_pipeline.release_group, self.test_release_group.name) + self.assertEqual(release_pipeline.team, get_current_team()) + release_pipeline = frappe.get_doc( + "Press Workflow", + { + "linked_doctype": "Release Pipeline", + "linked_docname": release_pipeline.name, + }, + ) + + frappe.get_last_doc( + "Press Workflow" + ) # To ensure nothing is raised when fetching the workflow created for the release pipeline + + @patch.object(DeployCandidateBuild, "_upload_build_context", get_mock_context_file) + @patch.object(DeployCandidateBuild, "_build", Mock()) + @patch.object(ReleasePipeline, "monitor_pre_build_validation", mock_pre_build_validation_monitoring) + @patch.object(ReleasePipeline, "monitor_build_success", mock_build_monitoring) + def test_release_pipeline_build_creation(self): + with fake_agent_job("Remote Build Job", "Success"): + self.create_deploy_and_update() + poll_pending_jobs() + + frappe.get_last_doc( + "Deploy Candidate Build" + ) # Just ensure this is created without error since we are mocking the build + + @classmethod + def tearDownClass(cls): + shutil.rmtree(frappe.db.get_single_value("Press Settings", "build_directory"), ignore_errors=True) + shutil.rmtree(frappe.db.get_single_value("Press Settings", "clone_directory"), ignore_errors=True) diff --git a/press/utils/test.py b/press/utils/test.py index 65f63ae7d4b..7de9df6d505 100644 --- a/press/utils/test.py +++ b/press/utils/test.py @@ -1,5 +1,6 @@ """Utility methods for writing tests""" +import sys from collections.abc import Callable from urllib.parse import urlparse, urlunparse @@ -46,6 +47,98 @@ def foreground_enqueue_doc( getattr(frappe.get_doc(doctype, docname), method)(**kwargs) +def 
_foreground_run_workflow_doc(doctype: str, docname: str, job_id: str) -> None: + """ + Tracks in-flight job IDs to prevent direct recursion. When the same job_id + is re-enqueued while it is already on the call-stack the request is deferred; + once the outermost invocation finishes, deferred calls are drained in order. + This mirrors the enqueue_after_commit + deduplication semantics used in + production. + """ + if not hasattr(frappe.local, "_fg_wf_in_flight"): + frappe.local._fg_wf_in_flight = set() + if not hasattr(frappe.local, "_fg_wf_pending"): + frappe.local._fg_wf_pending = {} + + in_flight: set = frappe.local._fg_wf_in_flight + pending: dict = frappe.local._fg_wf_pending + + if job_id in in_flight: + # Already executing this job - defer until the outermost call drains it. + print( + f"[FG] DEFER {job_id} (in-flight: {sorted(in_flight)})", + file=sys.stderr, + flush=True, + ) + pending[job_id] = (doctype, docname) + return + + print(f"[FG] START {job_id}", file=sys.stderr, flush=True) + in_flight.add(job_id) + method_title = "unknown_method" + try: + doc = frappe.get_doc(doctype, docname) + method_title = ( + doc.main_method_title + if hasattr(doc, "main_method_title") + else (doc.method_title if hasattr(doc, "method_title") else "unknown_method") + ) + print( + f"[FG] RUN {job_id} {method_title} | status={getattr(doc, 'status', '?')}", + file=sys.stderr, + flush=True, + ) + doc.run() + print( + f"[FG] DONE {job_id} {method_title} | status={getattr(frappe.get_doc(doctype, docname), 'status', '?')}", + file=sys.stderr, + flush=True, + ) + # Drain any re-enqueue requests that arrived while this job was running. 
+ retry = 0 + while job_id in pending: + retry += 1 + pending.pop(job_id) + print(f"[FG] RETRY {job_id} {method_title} (#{retry})", file=sys.stderr, flush=True) + doc = frappe.get_doc(doctype, docname) + print( + f"[FG] RUN {job_id} {method_title} | status={getattr(doc, 'status', '?')} (retry #{retry})", + file=sys.stderr, + flush=True, + ) + doc.run() + print( + f"[FG] DONE {job_id} {method_title} | status={getattr(frappe.get_doc(doctype, docname), 'status', '?')} (retry #{retry})", + file=sys.stderr, + flush=True, + ) + finally: + print( + f"[FG] FINISH {job_id} {method_title} | pending={list(pending.keys())}", + file=sys.stderr, + flush=True, + ) + in_flight.discard(job_id) + + +def foreground_enqueue_task(task_name: str) -> None: + print(f"[FG] enqueue_task({task_name})", file=sys.stderr, flush=True) + _foreground_run_workflow_doc( + "Press Workflow Task", + task_name, + f"press_workflow_task||{task_name}||run", + ) + + +def foreground_enqueue_workflow(workflow_name: str) -> None: + print(f"[FG] enqueue_workflow({workflow_name})", file=sys.stderr, flush=True) + _foreground_run_workflow_doc( + "Press Workflow", + workflow_name, + f"press_workflow||{workflow_name}||run", + ) + + def foreground_enqueue( method: str | Callable, queue: str = "default", diff --git a/press/workflow_engine/README.md b/press/workflow_engine/README.md new file mode 100644 index 00000000000..444129481f3 --- /dev/null +++ b/press/workflow_engine/README.md @@ -0,0 +1,363 @@ +# Press Workflow Engine + +An asynchronous workflow engine for Frappe. Define long-running workflows with automatic task tracking, state persistence, and fault tolerance using simple decorators. + +--- + +## Quick Start + +### 1. 
Inherit from WorkflowBuilder + +```python +from press.workflow_engine.doctype.press_workflow.decorators import flow, task +from press.workflow_engine.doctype.press_workflow.workflow_builder import WorkflowBuilder + + +class MyDoctype(WorkflowBuilder): + pass +``` + +### 2. Define tasks with `@task` + +```python +class MyDoctype(WorkflowBuilder): + + @task + def fetch_data(self) -> dict: + """Fetch data from external API""" + response = requests.get(self.api_url) + return response.json() + + @task + def process_item(self, item: dict) -> dict: + """Process a single item""" + return {"id": item["id"], "processed": True} +``` + +### 3. Define a flow with `@flow` + +```python +class MyDoctype(WorkflowBuilder): + + # ... tasks defined above ... + + @flow + def run_pipeline(self) -> dict: + """Main processing pipeline""" + data = self.fetch_data() + + results = [] + for i, item in enumerate(data["items"]): + result = self.process_item.with_task_id(f"item_{i}")(item) + results.append(result) + + self.status = "Completed" + self.save(ignore_permissions=True) + return {"count": len(results)} +``` + +### 4. Start the workflow + +```python +doc = frappe.get_doc("My Doctype", "DOC-001") + +# Run as background workflow (async) +workflow_name = doc.run_pipeline.run_as_workflow() + +# Or just call it directly (sync) - works like a normal method +result = doc.run_pipeline() +``` + +The engine handles task execution, persistence, retries, and resumption automatically. + +--- + +## Decorators + +### `@task` + +Marks a method as a workflow task. + +**In workflow mode** (via `.run_as_workflow()`): Tasks are enqueued for background execution, results are persisted, and the workflow pauses until the task completes. + +**In normal mode** (direct call): Behaves like a regular method - runs synchronously and returns immediately. 
+ +```python +class MyDoctype(WorkflowBuilder): + + @task + def my_task(self, arg1: int, arg2: str) -> dict: + """First line of docstring becomes the step title in UI""" + return {"result": arg1 + len(arg2)} + +# Both work: +doc.my_task(1, "hello") # sync, normal method call +doc.some_flow.run_as_workflow() # async, my_task tracked as workflow step +``` + +### `@flow` + +Marks a method as a workflow entry point. + +**In workflow mode** (via `.run_as_workflow()`): Creates a `Press Workflow` document, discovers all task calls via AST inspection, and tracks them as steps. + +**In normal mode** (direct call): Runs synchronously like a normal method. + +```python +class MyDoctype(WorkflowBuilder): + + @task + def some_task(self, x: int) -> int: + return x * 2 + + @flow + def my_workflow(self, x: int) -> dict: + """My workflow description""" + result = self.some_task(x) + return {"done": True, "result": result} + +# Both work: +doc.my_workflow(42) # sync +doc.my_workflow.run_as_workflow(42) # async, creates Press Workflow +``` + +--- + +## Sharing State Between Tasks + +> **Important**: Do NOT use in-memory structures to share state between tasks. + +Since tasks run in separate background jobs, things like `self.some_var`, `frappe.local.xyz`, or `frappe.flags.xyz` won't persist between task executions. By the time Task B runs, whatever Task A stored in memory is gone. + +**Wrong:** + +```python +class MyDoctype(WorkflowBuilder): + + @task + def step_one(self): + self.intermediate_data = {"important": "stuff"} # Won't work! + frappe.flags.my_data = [1, 2, 3] # Won't work! 
+ + @task + def step_two(self): + print(self.intermediate_data) # AttributeError or stale data +``` + +**Correct - use `self.kv`:** + +The key-value store persists data to the database and works both in workflow mode and normal execution: + +```python +class MyDoctype(WorkflowBuilder): + + @task + def step_one(self): + self.kv.set("intermediate_data", {"important": "stuff"}) + self.kv.set("items", [1, 2, 3]) + + @task + def step_two(self): + data = self.kv.get("intermediate_data") # {"important": "stuff"} + items = self.kv.get("items") # [1, 2, 3] + + # Clean up when done + self.kv.delete("intermediate_data") +``` + +The `self.kv` store: + +- Works in both workflow mode and normal sync calls +- Persists any picklable Python object +- Is scoped to the workflow instance + +--- + +## Nested Tasks and Loops + +### Using `.with_task_id()` in loops + +When calling the same task multiple times (like in a loop), you need unique task IDs to prevent deduplication issues: + +```python +class MyDoctype(WorkflowBuilder): + + @task + def process_item(self, item: dict) -> dict: + return {"processed": item["id"]} + + @flow + def process_all(self) -> list: + results = [] + for i, item in enumerate(self.items): + # Each iteration needs a unique task ID + result = self.process_item.with_task_id(f"item_{i}")(item) + results.append(result) + return results +``` + +Without `.with_task_id()`, the engine would see identical signatures and return cached results from the first call. + +### Nested task calls + +Tasks can call other tasks: + +```python +class MyDoctype(WorkflowBuilder): + + @task + def multiply(self, a: int, b: int) -> int: + return a * b + + @task + def power(self, base: int, exponent: int) -> int: + """Calculate power using nested multiply calls""" + result = 1 + for i in range(exponent): + result = self.multiply.with_task_id(f"mult_{i}")(result, base) + return result +``` + +The engine handles the nesting - parent tasks pause while nested tasks execute, then resume. 
+ +--- + +## Warning: Never Catch `PressWorkflowTaskEnqueued` + +The workflow engine uses `PressWorkflowTaskEnqueued` internally to pause execution when a task is enqueued. If you catch this exception, the workflow will hang forever or behave unpredictably. + +**Wrong:** + +```python +class MyDoctype(WorkflowBuilder): + + @task + def my_task(self): + try: + self.some_other_task() + except Exception as e: # This catches PressWorkflowTaskEnqueued! + print(f"Error: {e}") +``` + +**Correct:** + +```python +from press.workflow_engine.doctype.press_workflow.exceptions import ( + PressWorkflowTaskEnqueued, +) + +class MyDoctype(WorkflowBuilder): + + @task + def my_task(self): + try: + self.some_other_task() + except PressWorkflowTaskEnqueued: + raise # Always re-raise this + except Exception as e: + print(f"Error: {e}") +``` + +--- + +## Checking Workflow Status + +```python +workflow = frappe.get_doc("Press Workflow", workflow_name) + +# Basic info +print(workflow.status) # "Queued", "Running", "Success", "Failure", "Fatal" +print(workflow.stdout) # Captured print output +print(workflow.duration) # Total execution time + +# Get result (raises if still running or failed) +# If the flow failed, get_result() re-raises the original exception +from press.workflow_engine.doctype.press_workflow.exceptions import ( + PressWorkflowRunningError, + PressWorkflowFailedError, + PressWorkflowFatalError, +) + +try: + result = workflow.get_result() +except PressWorkflowRunningError: + print("Still running...") +except PressWorkflowFailedError as e: + print(f"Failed: {e}") +except PressWorkflowFatalError as e: + # Fatal = internal issue or bug in the workflow engine itself + # (not an error in your flow code) + print(f"Fatal error: {e}") +except Exception as e: + # The actual exception raised during flow execution + print(f"Original error: {e}") +``` + +### Viewing step progress + +```python +for step in workflow.steps: + print(f"{step.step_title}: 
{step.status}") +``` + +--- + +## How It Works + +### Workflow creation + +When you call `.run_as_workflow()`: + +1. The decorator parses the flow method's AST to find `self.task_name()` calls +2. Creates a `Press Workflow` doc linked to your document +3. Pre-registers discovered tasks as workflow steps +4. Enqueues the workflow via `frappe.enqueue_doc()` + +### Task execution + +When the workflow runs and hits a `@task` call: + +1. Generates a signature from method name + arguments (+ optional task_id) +2. Checks if a task with that signature already exists: + - **Exists and succeeded**: Returns cached result immediately + - **Exists and running/queued**: Raises `PressWorkflowTaskEnqueued` to pause + - **Doesn't exist**: Creates new `Press Workflow Task` and enqueues it +3. The workflow pauses until the task completes + +### Resumption + +After a task finishes: + +1. It re-enqueues its parent (either another task or the main workflow) +2. Parent runs again, hits the same task call +3. This time the task exists and succeeded, so it returns the cached result +4. 
Execution continues to the next task + +### Deduplication + +Tasks are deduplicated by signature: + +- `method_name` + canonicalized `(args, kwargs)` + optional `task_id` +- Same signature = same task (returns cached result) +- Different params or task_id = new task + +--- + +## Configuration + +Set queue names in `site_config.json`: + +```json +{ + "press_workflow_queue": "short", + "press_workflow_task_queue": "default" +} +``` + +## Tips + +- First line of a task/flow docstring becomes the step title in the UI +- Always use `.with_task_id()` when calling the same task multiple times +- Use `self.kv` for sharing data between tasks, never in-memory variables +- Tasks that are never reached (due to conditionals) are marked as "Skipped" +- Exceptions in tasks are captured and re-raised when the workflow resumes diff --git a/press/workflow_engine/__init__.py b/press/workflow_engine/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/__init__.py b/press/workflow_engine/doctype/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow/__init__.py b/press/workflow_engine/doctype/press_workflow/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow/decorators.py b/press/workflow_engine/doctype/press_workflow/decorators.py new file mode 100644 index 00000000000..fa46a01eeec --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/decorators.py @@ -0,0 +1,188 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt +from __future__ import annotations + +import inspect +import typing +from functools import wraps +from typing import Any, Concatenate, Generic, ParamSpec, Protocol, TypeVar, overload + +import frappe +from frappe.model.document import Document + +from press.workflow_engine.doctype.press_workflow.workflow_builder import 
WorkflowBuilder +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + PressWorkflowObject, +) +from press.workflow_engine.utils import ( + called_methods_in_order, + is_func_accept_task_id, + method_title, +) + +if typing.TYPE_CHECKING: + from collections.abc import Callable + from inspect import Signature + + +def _in_workflow_execution(instance: WorkflowBuilder | None) -> bool: + return bool( + isinstance(instance, WorkflowBuilder) + and getattr(instance, "name", None) + and getattr(instance, "workflow_name", None) + and getattr(instance.flags, "in_press_workflow_execution", False) + ) + + +_P = ParamSpec("_P") +_R = TypeVar("_R") +_R_co = TypeVar("_R_co", covariant=True) + + +class _BoundTask(Generic[_P, _R_co]): + def __init__(self, wrapped: Callable[..., _R_co], instance: WorkflowBuilder) -> None: + self._wrapped = wrapped + self._instance = instance + self._task_id: str | None = None + wraps(wrapped)(self) + + def _execute(self, args: tuple, kwargs: dict) -> _R_co: + if self._task_id is not None: + kwargs = {**kwargs, "task_id": self._task_id} + + if _in_workflow_execution(self._instance): + return self._instance.run_task(self._wrapped, args, kwargs) # type: ignore[return-value] + + if not is_func_accept_task_id(self._wrapped): + kwargs = {k: v for k, v in kwargs.items() if k != "task_id"} + return self._wrapped(self._instance, *args, **kwargs) + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R_co: + return self._execute(args, kwargs) # type: ignore[arg-type] + + def with_task_id(self, task_id: str) -> "_BoundTask[_P, _R_co]": + bound: _BoundTask[_P, _R_co] = _BoundTask(self._wrapped, self._instance) + bound._task_id = task_id + return bound # type: ignore[return-value] + + +class _TaskDescriptor(Generic[_P, _R_co]): + def __init__(self, wrapped: Callable[Concatenate[Any, _P], _R_co]) -> None: + self._wrapped = wrapped + wraps(wrapped)(self) # type: ignore[arg-type] + + def __set_name__(self, owner: type, 
name: str) -> None: + self._name = name + + @overload + def __get__(self, instance: None, owner: type) -> "_TaskDescriptor[_P, _R_co]": ... + + @overload + def __get__(self, instance: Any, owner: type) -> _BoundTask[_P, _R_co]: ... + + def __get__(self, instance: Any, owner: type) -> Any: + if instance is None: + return self + return _BoundTask(self._wrapped, instance) + + +def task(wrapped: Callable[Concatenate[Any, _P], _R]) -> _TaskDescriptor[_P, _R]: + """Mark a method as a workflow task. + + When called inside an active workflow execution, the method is handed off + to the workflow engine (enqueued, tracked, retried). Outside a workflow + it behaves like a normal method call. Supports `.with_task_id()` to + attach an explicit task identifier. + """ + return _TaskDescriptor(wrapped) + + +_Self = TypeVar("_Self") + + +class FlowCallable(Protocol[_P, _R_co]): + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R_co: ... + def run_as_workflow(self, *args: _P.args, **kwargs: _P.kwargs) -> str: ... + + +class BoundFlow: + def __init__(self, instance: Any, signature_without_self: Signature, wrapped: Callable) -> None: + self._instance = instance + self._wrapped = wrapped + self._signature_without_self = signature_without_self + + def __call__(self, *args: Any, **kwargs: Any) -> Any: + return self._wrapped(self._instance, *args, **kwargs) + + def run_as_workflow(self, *args: Any, **kwargs: Any) -> str: + self._signature_without_self.bind(*args, **kwargs) + + instance = self._instance + if not isinstance(instance, WorkflowBuilder): + raise TypeError( + "run_as_workflow() can only be called on instances of WorkflowBuilder subclasses." 
+ ) + + methods = called_methods_in_order(type(instance), self._wrapped) + + seen: set[str] = set() + methods = [m for m in methods if not (m[0] in seen or seen.add(m[0]))] # type: ignore[func-returns-value] + + return ( + frappe.get_doc( + { + "doctype": "Press Workflow", + "args": PressWorkflowObject.store(args) if args else None, + "kwargs": PressWorkflowObject.store(kwargs) if kwargs else None, + "linked_doctype": instance.doctype, # type: ignore + "linked_docname": instance.name, # type: ignore + "main_method_name": self._wrapped.__name__, + "main_method_title": method_title(self._wrapped), + "steps": [ + { + "step_title": title, + "step_method": name, + "status": "Pending", + } + for name, title in methods + ], + } + ) + .insert(ignore_permissions=True) + .name + ) + + +@overload +def flow(wrapped: Callable[Concatenate[_Self, _P], _R_co]) -> FlowCallable[_P, _R_co]: ... + + +@overload +def flow(wrapped: Callable[..., Any]) -> Any: ... + + +def flow(wrapped: Callable[..., Any]) -> Any: + """Mark a method as a workflow flow. + + Direct calls execute the method normally. Calling `.run_as_workflow()` + instead creates a Press Workflow document, auto-discovers the `self.*` + task calls via AST inspection, and registers them as tracked steps. + """ + sig = inspect.signature(wrapped) + params = list(sig.parameters.values()) + sig_without_self = sig.replace(parameters=params[1:] if params and params[0].name == "self" else params) + + class FlowDescriptor: + def __set_name__(self, owner: type, name: str) -> None: + if not issubclass(owner, Document): + raise TypeError( + f"@flow can only decorate methods on Document subclasses. " + f"'{owner.__name__}' does not inherit from Document." 
+ ) + + def __get__(self, instance: Any, owner: type) -> Any: + if instance is None: + return self + return BoundFlow(instance=instance, signature_without_self=sig_without_self, wrapped=wrapped) + + return FlowDescriptor() diff --git a/press/workflow_engine/doctype/press_workflow/exceptions.py b/press/workflow_engine/doctype/press_workflow/exceptions.py new file mode 100644 index 00000000000..8174a391be0 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/exceptions.py @@ -0,0 +1,31 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + + +class PressWorkflowTaskEnqueued(Exception): + """Raised when a task is enqueued and the flow needs to be paused.""" + + def __init__(self, message: str, workflow_name: str, task_name: str): + super().__init__(message) + self.workflow_name = workflow_name + self.task_name = task_name + + +class PressWorkflowRunningError(Exception): + """Raised when trying to get result of a running or queued workflow.""" + + pass + + +class PressWorkflowFailedError(Exception): + """Raised when the workflow failed but no exception was recorded.""" + + pass + + +class PressWorkflowFatalError(Exception): + """Raised when the workflow encountered a fatal error.""" + + def __init__(self, message: str, traceback: str | None = None): + super().__init__(message) + self.traceback = traceback diff --git a/press/workflow_engine/doctype/press_workflow/press_workflow.js b/press/workflow_engine/doctype/press_workflow/press_workflow.js new file mode 100644 index 00000000000..6f82d82f35d --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/press_workflow.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Press Workflow", { +// refresh(frm) { + +// }, +// }); diff --git a/press/workflow_engine/doctype/press_workflow/press_workflow.json b/press/workflow_engine/doctype/press_workflow/press_workflow.json 
new file mode 100644 index 00000000000..e3503645a35 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/press_workflow.json @@ -0,0 +1,245 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-02-28 18:27:35.577710", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "status", + "column_break_lkci", + "linked_doctype", + "column_break_xuyw", + "linked_docname", + "section_break_bicj", + "main_method_name", + "main_method_title", + "column_break_ccie", + "args", + "kwargs", + "kv_storage_section", + "key_value_store", + "section_break_zpgq", + "start", + "column_break_oqfs", + "end", + "column_break_nxoy", + "duration", + "section_break_djui", + "steps", + "section_break_pfpj", + "output", + "column_break_lhnh", + "exception", + "section_break_xglm", + "stdout", + "traceback" + ], + "fields": [ + { + "fieldname": "linked_doctype", + "fieldtype": "Link", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Linked Doctype", + "options": "DocType", + "reqd": 1, + "set_only_once": 1 + }, + { + "fieldname": "column_break_ccie", + "fieldtype": "Column Break" + }, + { + "fieldname": "linked_docname", + "fieldtype": "Dynamic Link", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Linked Docname", + "options": "linked_doctype", + "reqd": 1, + "set_only_once": 1 + }, + { + "default": "Queued", + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Status", + "options": "Queued\nRunning\nSuccess\nFailure\nFatal", + "read_only": 1, + "reqd": 1 + }, + { + "fieldname": "column_break_lkci", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_xuyw", + "fieldtype": "Column Break" + }, + { + "fieldname": "section_break_bicj", + "fieldtype": "Section Break", + "label": "Execution Details" + }, + { + "fieldname": "output", + "fieldtype": "Link", + "label": "Output", + "options": "Press Workflow Object", + "read_only": 1 + }, + { + "fieldname": "exception", + 
"fieldtype": "Link", + "label": "Exception", + "options": "Press Workflow Object", + "read_only": 1 + }, + { + "fieldname": "kv_storage_section", + "fieldtype": "Section Break", + "label": "KV Storage" + }, + { + "fieldname": "key_value_store", + "fieldtype": "Table", + "label": "Key Value Store", + "options": "Press Workflow KV", + "read_only": 1 + }, + { + "fieldname": "start", + "fieldtype": "Datetime", + "label": "Start", + "read_only": 1 + }, + { + "fieldname": "end", + "fieldtype": "Datetime", + "label": "End", + "read_only": 1 + }, + { + "fieldname": "duration", + "fieldtype": "Duration", + "label": "Duration", + "read_only": 1 + }, + { + "fieldname": "main_method_name", + "fieldtype": "Data", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Main Method Name", + "length": 512, + "reqd": 1, + "set_only_once": 1 + }, + { + "fieldname": "main_method_title", + "fieldtype": "Data", + "label": "Main Method Title", + "length": 512, + "reqd": 1, + "set_only_once": 1 + }, + { + "fieldname": "section_break_pfpj", + "fieldtype": "Section Break", + "label": "Result" + }, + { + "fieldname": "column_break_lhnh", + "fieldtype": "Column Break" + }, + { + "fieldname": "section_break_xglm", + "fieldtype": "Section Break" + }, + { + "fieldname": "stdout", + "fieldtype": "Long Text", + "label": "Stdout", + "read_only": 1 + }, + { + "fieldname": "traceback", + "fieldtype": "Long Text", + "label": "Traceback", + "read_only": 1 + }, + { + "fieldname": "args", + "fieldtype": "Link", + "label": "Args", + "options": "Press Workflow Object", + "set_only_once": 1 + }, + { + "fieldname": "kwargs", + "fieldtype": "Link", + "label": "Kwargs", + "options": "Press Workflow Object", + "set_only_once": 1 + }, + { + "fieldname": "section_break_zpgq", + "fieldtype": "Section Break", + "label": "Progress" + }, + { + "fieldname": "column_break_oqfs", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_nxoy", + "fieldtype": "Column Break" + }, + { + "fieldname": 
"section_break_djui", + "fieldtype": "Section Break" + }, + { + "fieldname": "steps", + "fieldtype": "Table", + "label": "Steps", + "options": "Press Workflow Step", + "read_only": 1 + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "links": [ + { + "link_doctype": "Press Workflow Task", + "link_fieldname": "workflow" + } + ], + "modified": "2026-03-11 00:51:08.486677", + "modified_by": "Administrator", + "module": "Workflow Engine", + "name": "Press Workflow", + "naming_rule": "Random", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow/press_workflow.py b/press/workflow_engine/doctype/press_workflow/press_workflow.py new file mode 100644 index 00000000000..25ec0d8a493 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/press_workflow.py @@ -0,0 +1,227 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +from __future__ import annotations + +import io +from contextlib import redirect_stdout +from typing import TYPE_CHECKING + +import frappe +from frappe.model.document import Document +from frappe.utils import now_datetime + +from press.workflow_engine.doctype.press_workflow.exceptions import ( + PressWorkflowFailedError, + PressWorkflowFatalError, + PressWorkflowRunningError, + PressWorkflowTaskEnqueued, +) +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + PressWorkflowObject, +) +from press.workflow_engine.utils import calculate_duration + +if TYPE_CHECKING: + from press.workflow_engine.doctype.press_workflow.workflow_builder import WorkflowBuilder + from 
press.workflow_engine.doctype.press_workflow_step.press_workflow_step import ( + PressWorkflowStep, + ) + + +class PressWorkflow(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + from press.workflow_engine.doctype.press_workflow_kv.press_workflow_kv import ( + PressWorkflowKV, + ) + from press.workflow_engine.doctype.press_workflow_step.press_workflow_step import ( + PressWorkflowStep, + ) + + args: DF.Link | None + duration: DF.Duration | None + end: DF.Datetime | None + exception: DF.Link | None + key_value_store: DF.Table[PressWorkflowKV] + kwargs: DF.Link | None + linked_docname: DF.DynamicLink + linked_doctype: DF.Link + main_method_name: DF.Data + main_method_title: DF.Data + output: DF.Link | None + start: DF.Datetime | None + status: DF.Literal["Queued", "Running", "Success", "Failure", "Fatal"] + stdout: DF.LongText | None + steps: DF.Table[PressWorkflowStep] + traceback: DF.LongText | None + # end: auto-generated types + + def after_insert(self): + enqueue_workflow(self.name) # type: ignore + + def run(self): # noqa: C901 - best to keep it in one place + if not self.linked_doctype or not self.linked_docname: + frappe.throw("Cannot run flow without linked_doctype and linked_docname", frappe.ValidationError) + return + + try: + reference_doc: WorkflowBuilder = frappe.get_doc(self.linked_doctype, self.linked_docname) # type: ignore + reference_doc.workflow_name = self.name + reference_doc.flags.in_press_workflow_execution = True + + args = PressWorkflowObject.get_object(self.args) if self.args else () + kwargs = PressWorkflowObject.get_object(self.kwargs) if self.kwargs else {} + except Exception: + self.status = "Fatal" + self.traceback = frappe.get_traceback() + self.save() + return + + output = None + exception = None + status = "Running" + buffer = io.StringIO() + start = now_datetime() + + # Mark as 
Running immediately so the scheduler won't re-enqueue it + # while this execution is in progress. + self.status = "Running" + if not self.start: + self.start = start + self.save() + + if not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + try: + with redirect_stdout(buffer): + result = getattr(reference_doc, self.main_method_name)(*args, **kwargs) + + if result is not None: + output = PressWorkflowObject.store(result) # type: ignore + status = "Success" + except PressWorkflowTaskEnqueued: + # This is expected when a task is enqueued. + # The workflow will be resumed when the task is executed. + pass + except Exception as e: + exception = PressWorkflowObject.store(e, throw_on_error=False) + status = "Failure" + finally: + self.reload() + + if not self.start: + self.start = start + + if status in ["Success", "Failure"] and not self.end: + self.end = now_datetime() + + if self.start and self.end and not self.duration: + self.duration = calculate_duration(self.start, self.end) + + self.status = status + self.output = output + self.stdout = (self.stdout or "") + buffer.getvalue() + + if frappe.flags.in_test and self.stdout: + print(self.stdout) + + self.exception = exception + + self.update_skipped_steps_status(save=False) + self.save() + + def update_skipped_steps_status(self, save: bool = True): # noqa: C901 - best to keep it in one place + is_updated = False + + if self.status in ["Success", "Failure"]: + # Mark steps as skipped if they don't have a task associated + for step in self.steps: + if step.task: + continue + step.status = "Skipped" + is_updated = True + else: + # Mark steps as skipped if the next step has a task associated + previous_steps_with_no_task: list[PressWorkflowStep] = [] + for step in self.steps: + if step.task: + if previous_steps_with_no_task: + for s in previous_steps_with_no_task: + s.status = "Skipped" + is_updated = True + previous_steps_with_no_task = [] + else: + previous_steps_with_no_task.append(step) + + if is_updated and 
save: + self.save() + + def get_result(self): + if self.status in ["Queued", "Running"]: + raise PressWorkflowRunningError( + f"Workflow {self.name} is currently {'running' if self.status == 'Running' else 'queued'}" + ) + + if self.status == "Success": + if self.output: + return PressWorkflowObject.get_object(self.output) + return None + + if self.status == "Failure": + if self.exception: + exc = PressWorkflowObject.get_object(self.exception) + if isinstance(exc, Exception): + raise exc + raise PressWorkflowFailedError(f"Workflow failed with exception: {exc}") + raise PressWorkflowFailedError("Workflow failed but no exception was recorded.") + + if self.status == "Fatal": + raise PressWorkflowFatalError("Workflow encountered a fatal error.", traceback=self.traceback) + + return None + + +def enqueue_workflow(workflow_name: str) -> None: + if frappe.flags.in_test: + from press.utils.test import foreground_enqueue_workflow + + foreground_enqueue_workflow(workflow_name) + return + + frappe.enqueue_doc( + "Press Workflow", + workflow_name, + method="run", + queue=frappe.conf.get("press_workflow_queue", "short"), + timeout=3600, + deduplicate=True, + enqueue_after_commit=True, + job_id=f"press_workflow||{workflow_name}||run", + ) + + +def retry_workflows(): + workflows = frappe.get_all( + "Press Workflow", + filters={"status": ("in", ["Queued", "Running"])}, + pluck="name", + order_by="modified asc", + ) + for workflow_name in workflows: + try: + enqueue_workflow(workflow_name) + except Exception as e: + frappe.log_error( + "Error Processing workflow", + message=str(e), + reference_doctype="Press Workflow", + reference_name=workflow_name, + ) diff --git a/press/workflow_engine/doctype/press_workflow/test_press_workflow.py b/press/workflow_engine/doctype/press_workflow/test_press_workflow.py new file mode 100644 index 00000000000..510c4dc5939 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/test_press_workflow.py @@ -0,0 +1,109 @@ +# Copyright (c) 2026, 
Frappe and Contributors +# See license.txt + +from typing import TYPE_CHECKING +from unittest.mock import patch + +import frappe +from frappe.tests.utils import FrappeTestCase + +from press.utils.test import foreground_enqueue, foreground_enqueue_doc + +if TYPE_CHECKING: + from press.workflow_engine.doctype.press_workflow.press_workflow import ( + PressWorkflow, + ) + from press.workflow_engine.doctype.press_workflow_test.press_workflow_test import ( + PressWorkflowTest, + ) + + +@patch("frappe.enqueue_doc", new=foreground_enqueue_doc) +@patch("frappe.enqueue", new=foreground_enqueue) +@patch("frappe.db.commit", new=lambda: None) # No-op commit to avoid issues with transactions in tests +class IntegrationTestPressWorkflow(FrappeTestCase): + def setUp(self): + frappe.db.delete("Press Workflow") + frappe.db.delete("Press Workflow Task") + frappe.db.delete("Press Workflow Object") + self.doc: PressWorkflowTest = frappe.get_doc( + { + "doctype": "Press Workflow Test", + "input_a": 3, + "input_b": 2, + } + ) # type: ignore + self.doc.insert() + + def tearDown(self): + frappe.db.delete("Press Workflow") + frappe.db.delete("Press Workflow Task") + frappe.db.delete("Press Workflow Object") + self.doc.delete() + + def get_wf(self, workflow_name: str) -> "PressWorkflow": + wf = frappe.get_doc("Press Workflow", workflow_name) + wf.reload() + return wf # type: ignore + + def test_workflow_success(self): + wf = self.get_wf(self.doc.main_success.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), "success output") + + def test_workflow_fail(self): + wf = self.get_wf(self.doc.main_fail.run_as_workflow()) + self.assertEqual(wf.status, "Failure") + with self.assertRaises(Exception) as e: + wf.get_result() + self.assertTrue("mock failure" in str(e.exception)) + + def test_workflow_with_task_success(self): + wf = self.get_wf(self.doc.main_with_task.run_as_workflow()) + self.assertEqual(wf.status, "Success") + 
self.assertEqual(wf.get_result(), "task done") + + def test_workflow_with_nested_task(self): + wf = self.get_wf(self.doc.main_with_nested_task.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), "task done") + + def test_workflow_with_failing_task(self): + wf = self.get_wf(self.doc.main_with_failing_task.run_as_workflow()) + self.assertEqual(wf.status, "Failure") + with self.assertRaises(Exception) as e: + wf.get_result() + self.assertTrue("task failed" in str(e.exception)) + + def test_workflow_with_args_task(self): + wf = self.get_wf(self.doc.main_with_args_task.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), 5) + + def test_workflow_with_task_id_loop(self): + wf = self.get_wf(self.doc.main_with_task_id_loop.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), 9) + + def test_workflow_with_task_id_passthrough(self): + wf = self.get_wf(self.doc.main_with_task_id_passthrough.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), 9) + tasks = frappe.get_all("Press Workflow Task", filters={"workflow": wf.name}, pluck="stdout") + self.assertTrue(any("power_mult_0" in str(t) for t in tasks)) + self.assertTrue(any("power_mult_1" in str(t) for t in tasks)) + + def test_workflow_with_noisy_task(self): + wf = self.get_wf(self.doc.main_with_noisy_task.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), "done") + + def test_workflow_as_flow(self): + wf = self.get_wf(self.doc.main_as_flow.run_as_workflow()) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), "flow done") + + def test_flow_with_args(self): + wf = self.get_wf(self.doc.flow_with_args.run_as_workflow(x=4, y=5)) + self.assertEqual(wf.status, "Success") + self.assertEqual(wf.get_result(), 9) diff --git a/press/workflow_engine/doctype/press_workflow/workflow_builder.py 
b/press/workflow_engine/doctype/press_workflow/workflow_builder.py new file mode 100644 index 00000000000..70c5920cadb --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow/workflow_builder.py @@ -0,0 +1,182 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +from collections.abc import Callable +from functools import wraps +from typing import TYPE_CHECKING, Any, Literal, TypeVar + +import frappe +from frappe.model.document import Document + +from press.workflow_engine.doctype.press_workflow.exceptions import PressWorkflowTaskEnqueued +from press.workflow_engine.doctype.press_workflow_kv.press_workflow_kv import ( + InMemoryKVStore, + KVStoreInterface, + WorkflowKVStore, +) +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + ObjectDeserializeError, + ObjectPreviousSerializationFailedError, + PressWorkflowObject, +) +from press.workflow_engine.utils import ( + generate_function_signature, + is_func_accept_task_id, + method_title, +) + +if TYPE_CHECKING: + from press.workflow_engine.doctype.press_workflow_task.press_workflow_task import ( + PressWorkflowTask, + ) + + +F1 = TypeVar("F1", bound=Callable[..., Any]) + + +def ensure_to_resolve_context(fn: F1) -> F1: + @wraps(fn) + def wrapper(self: "WorkflowBuilder", *args, **kwargs): + self.resolve_context() + return fn(self, *args, **kwargs) + + return wrapper # type: ignore + + +class WorkflowBuilder(Document): + workflow_name: str | None = None + workflow_doc = None + kv_store_type: Literal["in_memory", "workflow_store"] = "in_memory" + kv_store_reference: KVStoreInterface | None = None + current_task_signature: str | None = None + + @ensure_to_resolve_context + def run_task(self, wrapped: Callable[..., object], args: tuple, kwargs: dict) -> Any: # noqa: C901 best to keep task execution logic in one place + assert self.workflow_name is not None, "Workflow name must be set to enqueue task" + + signature = 
generate_function_signature(wrapped, args, kwargs) + if self.current_task_signature and self.current_task_signature == signature: + new_kwargs = kwargs.copy() + if not is_func_accept_task_id(wrapped): + new_kwargs.pop("task_id", None) + + return wrapped(self, *args, **new_kwargs) + + task_name: str | None = frappe.db.exists( + "Press Workflow Task", + { + "workflow": self.workflow_name, + "signature": signature, + }, + ) # type: ignore + + if not task_name: + task_doc: PressWorkflowTask = frappe.new_doc("Press Workflow Task") # type: ignore + task_doc.workflow = self.workflow_name # type: ignore + task_doc.method_name = wrapped.__name__ # type: ignore + + task_doc.method_title = method_title(wrapped) # type: ignore + + task_doc.signature = signature # type: ignore + task_doc.args = PressWorkflowObject.store(args) if args else None # type: ignore + task_doc.kwargs = PressWorkflowObject.store(kwargs) if kwargs else None # type: ignore + task_doc.status = "Queued" # type: ignore + + # If we are currently inside a running task, record it as the parent + # so the new task can re-enqueue it when it completes. 
+ task_doc.parent_task = getattr(self.flags, "current_press_workflow_task", None) # type: ignore + task_doc.insert(ignore_permissions=True) + + # If workflow want to monitor this step + # Store the reference of the task in workflow doctype + # If it's a nested task, ignore it + if not task_doc.parent_task and ( + tracked_step := str( + frappe.db.exists( + "Press Workflow Step", + { + "parenttype": "Press Workflow", + "parent": self.workflow_name, + "step_method": wrapped.__name__, + "task": ("is", "not set"), + }, + ) + ) + ): + frappe.db.set_value("Press Workflow Step", tracked_step, "task", task_doc.name) + + task_name = task_doc.name + assert task_name, "Task must be saved successfully before it can be run" + + status = frappe.db.get_value("Press Workflow Task", task_name, "status") + if status in ["Queued", "Running"]: + raise PressWorkflowTaskEnqueued( + f"Task {task_name} is in {status} state", + self.workflow_name, + task_name, + ) + + task_doc: PressWorkflowTask = frappe.get_doc("Press Workflow Task", task_name) # type: ignore + if task_doc.status == "Success": + return PressWorkflowObject.get_object(task_doc.output) if task_doc.output else None + + if task_doc.status == "Failure": + if task_doc.exception: + try: + exc = PressWorkflowObject.get_object(task_doc.exception) + except ObjectPreviousSerializationFailedError as e: + raise RuntimeError( + f"Task '{task_doc.method_title}' failed. Original exception could not be " + f"deserialized. 
Summary: {e.summary}" + ) from e + except ObjectDeserializeError as e: + raise RuntimeError( + f"Task '{task_doc.method_title}' failed and its exception could not be deserialized: {e}" + ) from e + if isinstance(exc, BaseException): + raise exc + raise RuntimeError( + f"Task '{task_doc.method_title}' failed with a non-exception object: {exc!r}" + ) + raise RuntimeError(f"Task '{task_doc.method_title}' failed with no recorded exception.") + + raise Exception(f"Task {task_doc.method_title} is in an unknown state: {task_doc.status}") + + @property + @ensure_to_resolve_context + def kv(self) -> KVStoreInterface: + if self.kv_store_reference: + return self.kv_store_reference + + if self.kv_store_type == "in_memory": + # Fallback for calls outside of workflow context + self.kv_store_reference = InMemoryKVStore() + + elif self.kv_store_type == "workflow_store": + assert self.workflow_name is not None, ( + "Workflow name must be set in frappe.flags.current_workflow to use workflow_store KV store." + ) + + self.kv_store_reference = WorkflowKVStore(self.workflow_name) + + else: + raise ValueError(f"Invalid KV store type: {self.kv_store_type}") + + return self.kv_store_reference + + def resolve_context(self) -> None: + if self.workflow_name or self.workflow_doc: + return + + current_workflow = getattr(frappe.flags, "current_press_workflow", None) + if current_workflow: + self.workflow_name = str(current_workflow) + self.workflow_doc = frappe.get_doc("Press Workflow", self.workflow_name) # type: ignore + if self.kv_store_type != "workflow_store": + # Store type is changing — discard any cached in-memory store. 
+ self.kv_store_type = "workflow_store" + self.kv_store_reference = None + else: + if self.kv_store_type != "in_memory": + self.kv_store_type = "in_memory" + self.kv_store_reference = None diff --git a/press/workflow_engine/doctype/press_workflow_kv/__init__.py b/press/workflow_engine/doctype/press_workflow_kv/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.json b/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.json new file mode 100644 index 00000000000..2d828c74343 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.json @@ -0,0 +1,45 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-03-03 21:13:02.443752", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "key", + "value" + ], + "fields": [ + { + "fieldname": "key", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Key", + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "value", + "fieldtype": "Link", + "in_list_view": 1, + "label": "Value", + "options": "Press Workflow Object", + "reqd": 1 + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "istable": 1, + "links": [], + "modified": "2026-03-03 21:15:51.697093", + "modified_by": "Administrator", + "module": "Workflow Engine", + "name": "Press Workflow KV", + "naming_rule": "Random", + "owner": "Administrator", + "permissions": [], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.py b/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.py new file mode 100644 index 00000000000..a4d89b28d1b --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_kv/press_workflow_kv.py @@ -0,0 +1,115 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please 
see license.txt + +import abc +from typing import Any + +import frappe +from frappe.model.document import Document + +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + PressWorkflowObject, +) + + +class KVStoreInterface(abc.ABC): + @abc.abstractmethod + def set(self, key: str, value: Any, throw_on_error: bool = True): + pass + + @abc.abstractmethod + def get(self, key: str) -> Any | None: + pass + + @abc.abstractmethod + def delete(self, key: str): + pass + + +class WorkflowKVStore(KVStoreInterface): + def __init__(self, workflow_name: str) -> None: + self.workflow_name = workflow_name + self.parent_field = "key_value_store" + self.parent_type = "Press Workflow" + + def set(self, key: str, value: Any, throw_on_error: bool = True): + kv_name = self._get_kv_record_name(key) + if kv_name: + kv_doc: PressWorkflowKV = frappe.get_doc("Press Workflow KV", kv_name) # type: ignore + else: + kv_doc = frappe.new_doc("Press Workflow KV") # type: ignore + kv_doc.parent = self.workflow_name + kv_doc.parentfield = self.parent_field + kv_doc.parenttype = self.parent_type + kv_doc.key = key + + if kv_doc.value: + frappe.db.set_value("Press Workflow Object", str(kv_doc.value), "deleted", True) + + kv_doc.value = PressWorkflowObject.store(value, throw_on_error=throw_on_error) + kv_doc.save(ignore_permissions=True) + + def get(self, key: str) -> Any | None: + kv_name = self._get_kv_record_name(key) + if not kv_name: + return None + + object_name = frappe.db.get_value("Press Workflow KV", kv_name, "value") + if not object_name: + return None + + return PressWorkflowObject.get_object(str(object_name)) + + def delete(self, key: str): + kv_name = self._get_kv_record_name(key) + if kv_name: + object_name = frappe.db.get_value("Press Workflow KV", kv_name, "value") + if object_name: + frappe.db.set_value("Press Workflow Object", str(object_name), "deleted", True) + frappe.delete_doc("Press Workflow KV", kv_name, force=True) + + def 
_get_kv_record_name(self, key: str) -> str | None: + name = frappe.db.exists( + "Press Workflow KV", + { + "parent": self.workflow_name, + "parentfield": self.parent_field, + "parenttype": self.parent_type, + "key": key, + }, + ) + if name: + return str(name) + return None + + +class InMemoryKVStore(KVStoreInterface): + def __init__(self): + self.store = {} + + def set(self, key: str, value: Any, throw_on_error: bool = True): + self.store[key] = value + + def get(self, key: str) -> Any | None: + return self.store.get(key) + + def delete(self, key: str): + if key in self.store: + del self.store[key] + + +class PressWorkflowKV(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + key: DF.Data + parent: DF.Data + parentfield: DF.Data + parenttype: DF.Data + value: DF.Link + # end: auto-generated types diff --git a/press/workflow_engine/doctype/press_workflow_kv/test_press_workflow_kv.py b/press/workflow_engine/doctype/press_workflow_kv/test_press_workflow_kv.py new file mode 100644 index 00000000000..3af2845a30b --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_kv/test_press_workflow_kv.py @@ -0,0 +1,69 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +import frappe +from frappe.tests.utils import FrappeTestCase + +from press.workflow_engine.doctype.press_workflow_kv.press_workflow_kv import ( + InMemoryKVStore, + WorkflowKVStore, +) + + +class TestPressWorkflowKV(FrappeTestCase): + def setUp(self): + self.workflow_name = frappe.generate_hash(length=10) + self.store = WorkflowKVStore(workflow_name=self.workflow_name) + + def tearDown(self): + frappe.db.delete("Press Workflow KV", {"parent": self.workflow_name}) + frappe.db.delete("Press Workflow Object") + + def test_in_memory_kv_store(self): + store = InMemoryKVStore() + store.set("test_key", 
"test_value") + self.assertEqual(store.get("test_key"), "test_value") + + store.delete("test_key") + self.assertIsNone(store.get("test_key")) + + def test_workflow_kv_store_set_and_get(self): + self.store.set("test_key", {"data": 123}) + + value = self.store.get("test_key") + self.assertEqual(value, {"data": 123}) + + def test_workflow_kv_store_update(self): + self.store.set("test_key", "initial_value") + initial_kv_name = self.store._get_kv_record_name("test_key") + initial_obj_name = frappe.db.get_value("Press Workflow KV", initial_kv_name, "value") + + self.store.set("test_key", "updated_value") + updated_kv_name = self.store._get_kv_record_name("test_key") + updated_obj_name = frappe.db.get_value("Press Workflow KV", updated_kv_name, "value") + + self.assertEqual(initial_kv_name, updated_kv_name) + self.assertNotEqual(initial_obj_name, updated_obj_name) + + is_deleted = frappe.db.get_value("Press Workflow Object", initial_obj_name, "deleted") + self.assertTrue(is_deleted) + + value = self.store.get("test_key") + self.assertEqual(value, "updated_value") + + def test_workflow_kv_store_delete(self): + self.store.set("test_key", "to_be_deleted") + kv_name = self.store._get_kv_record_name("test_key") + obj_name = frappe.db.get_value("Press Workflow KV", kv_name, "value") + + self.store.delete("test_key") + + self.assertFalse(frappe.db.exists("Press Workflow KV", kv_name)) + + is_deleted = frappe.db.get_value("Press Workflow Object", obj_name, "deleted") + self.assertTrue(is_deleted) + + self.assertIsNone(self.store.get("test_key")) + + def test_workflow_kv_store_get_nonexistent(self): + self.assertIsNone(self.store.get("nonexistent_key")) diff --git a/press/workflow_engine/doctype/press_workflow_object/__init__.py b/press/workflow_engine/doctype/press_workflow_object/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.js 
b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.js new file mode 100644 index 00000000000..b1a996a86ef --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Press Workflow Object", { +// refresh(frm) { + +// }, +// }); diff --git a/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.json b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.json new file mode 100644 index 00000000000..b24708a6002 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.json @@ -0,0 +1,82 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-02-28 18:28:13.551517", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "summary", + "type_qualname", + "serialized", + "serialization_failed", + "deleted" + ], + "fields": [ + { + "fieldname": "summary", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Summary", + "length": 512, + "read_only": 1, + "reqd": 1 + }, + { + "fieldname": "type_qualname", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Python Type", + "length": 256, + "read_only": 1, + "reqd": 1 + }, + { + "description": "Base64 Pickled Object", + "fieldname": "serialized", + "fieldtype": "Long Text", + "label": "Serialized", + "read_only": 1 + }, + { + "default": "0", + "fieldname": "serialization_failed", + "fieldtype": "Check", + "label": "Serialization Failed", + "read_only": 1 + }, + { + "default": "0", + "fieldname": "deleted", + "fieldtype": "Check", + "label": "Deleted", + "read_only": 1 + } + ], + "grid_page_length": 50, + "links": [], + "modified": "2026-03-03 21:32:26.447603", + "modified_by": "Administrator", + "module": "Workflow Engine", + "name": "Press Workflow Object", + "naming_rule": "Random", + "owner": "Administrator", + 
"permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.py b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.py new file mode 100644 index 00000000000..2efda2879f6 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_object/press_workflow_object.py @@ -0,0 +1,171 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt +from __future__ import annotations + +import base64 +import pickle +from typing import Any + +import frappe +from frappe.model.document import Document +from frappe.utils import strip_html_tags + + +class ObjectSerializeError(frappe.ValidationError): + """Raised when an object cannot be serialized into a Press Workflow Object.""" + + pass + + +class ObjectDeserializeError(frappe.ValidationError): + """Raised when a Press Workflow Object cannot be deserialized.""" + + pass + + +class ObjectPreviousSerializationFailedError(ObjectDeserializeError): + """Raised when deserialization fails because the original serialization failed. + The stored object summary is accessible via the `summary` attribute. + """ + + def __init__(self, message: str, summary: str = ""): + super().__init__(message) + self.summary = summary + + +class PressWorkflowObject(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. 
+ + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + deleted: DF.Check + serialization_failed: DF.Check + serialized: DF.LongText | None + summary: DF.Data + type_qualname: DF.Data + # end: auto-generated types + + @staticmethod + def store(obj: Any, throw_on_error: bool = True) -> str: + """ + Serialize and store any Python object. + + Args: + obj: The Python object to serialize. + throw_on_error: If True, raises `PressWorkflowObjectSerializeError` on failure. + If False, stores a summary of the object and flags it as failed. + + Returns: + str: The name of the created `Press Workflow Object` document. + + Raises: + ObjectSerializeError: If pickling fails and `throw_on_error` is True. + """ + type_qualname = f"{type(obj).__module__}.{type(obj).__qualname__}" + if len(type_qualname) > 256: + type_qualname = type_qualname[:250] + "..." + + doc: PressWorkflowObject = frappe.new_doc("Press Workflow Object") # type: ignore + doc.type_qualname = type_qualname + doc.serialization_failed = False + + try: + summary = str(obj) + except Exception: + summary = repr(type(obj)) + + # FW strips < and > from Data fields (treating them as HTML tags). + # If the entire summary looks like an HTML tag (e.g. ), + # it will be stripped to an empty string. + # to prevent this, strip the brackets first, or provide a fallback. + summary_stripped = strip_html_tags(summary).strip() + if not summary_stripped: + summary = f"Instance of {type_qualname}" + else: + # If it didn't strip to empty, we can just replace the brackets to be safe + summary = summary.replace("<", "[").replace(">", "]") + + if len(summary) > 512: + summary = summary[:500] + "..." 
+ doc.summary = summary + + try: + doc.serialized = base64.b64encode(pickle.dumps(obj)).decode("ascii") + except Exception as exc: + if throw_on_error: + raise ObjectSerializeError( + f"Failed to serialize object of type {type_qualname!r}: {exc}" + ) from exc + + doc.serialized = None + doc.serialization_failed = True + + doc.insert(ignore_permissions=True) + return str(doc.name) + + @staticmethod + def get_object(doc_name: str) -> Any: + """ + Retrieves and deserializes a stored Python object by its document name. + + Args: + doc_name: The name of the `Press Workflow Object` document. + + Returns: + Any: The deserialized Python object. + + Raises: + ObjectPreviousSerializationFailedError: If the object originally failed to serialize. + The object's stored summary is accessible via the exception's `summary` attribute. + ObjectDeserializeError: If the document exists but deserialization fails. + """ + doc: PressWorkflowObject = frappe.get_doc("Press Workflow Object", doc_name) # type: ignore + + if doc.serialization_failed: + raise ObjectPreviousSerializationFailedError( + f"Cannot deserialize {doc_name!r}: Serialization previously failed for object of type {doc.type_qualname!r}.", + summary=doc.summary, + ) + + if not doc.serialized: + raise ObjectDeserializeError(f"Object of type {doc.type_qualname!r} has no serialized data.") + + try: + return pickle.loads(base64.b64decode(doc.serialized.encode("ascii"))) + except Exception as exc: + raise ObjectDeserializeError( + f"Failed to deserialize object of type {doc.type_qualname!r}: {exc}" + ) from exc + + @staticmethod + def get_summary(doc_name: str) -> str: + """ + Fetch only the summary of a stored object without deserializing it. + Useful for logging, debugging, or re-raising exceptions efficiently. + + Args: + doc_name: The name of the `Press Workflow Object` document. + + Returns: + str: A string summary of the object. + + Raises: + frappe.DoesNotExistError: If the document doesn't exist. 
+ """ + summary = frappe.db.get_value("Press Workflow Object", doc_name, "summary") + + if summary is None: + frappe.throw( + f"Press Workflow Object {doc_name!r} does not exist.", + exc=frappe.DoesNotExistError, + ) + + return str(summary) + + +def delete_trashed_objects(): + frappe.db.delete("Press Workflow Object", {"deleted": 1}) diff --git a/press/workflow_engine/doctype/press_workflow_object/test_press_workflow_object.py b/press/workflow_engine/doctype/press_workflow_object/test_press_workflow_object.py new file mode 100644 index 00000000000..f1ef91662e6 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_object/test_press_workflow_object.py @@ -0,0 +1,80 @@ +# Copyright (c) 2026, Frappe and Contributors +# See license.txt + +import frappe +from frappe.tests import IntegrationTestCase + +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + ObjectPreviousSerializationFailedError, + ObjectSerializeError, + PressWorkflowObject, +) + +# On IntegrationTestCase, the doctype test records and all +# link-field test record dependencies are recursively loaded +# Use these module variables to add/remove to/from that list +EXTRA_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] +IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. 
["User"] + + +class MyCustomClass: + def __init__(self, name: str, value: int): + self.name = name + self.value = value + + def __eq__(self, other): + if not isinstance(other, MyCustomClass): + return False + return self.name == other.name and self.value == other.value + + +class IntegrationTestPressWorkflowObject(IntegrationTestCase): + def tearDown(self): + frappe.db.rollback() + + def test_store_and_get_success(self): + obj = {"key": "value", "list": [1, 2, 3], "nested": {"a": "b"}} + doc_name = PressWorkflowObject.store(obj) + + self.assertTrue(doc_name) + retrieved = PressWorkflowObject.get_object(doc_name) + self.assertEqual(retrieved, obj) + + def test_store_and_get_custom_class(self): + obj = MyCustomClass("test_name", 42) + doc_name = PressWorkflowObject.store(obj) + + self.assertTrue(doc_name) + retrieved = PressWorkflowObject.get_object(doc_name) + self.assertIsInstance(retrieved, MyCustomClass) + self.assertEqual(retrieved, obj) + self.assertEqual(retrieved.name, "test_name") + self.assertEqual(retrieved.value, 42) + + def test_store_serialization_error_throw(self): + # Lambdas cannot be pickled + with self.assertRaises(ObjectSerializeError): + PressWorkflowObject.store(lambda x: x, throw_on_error=True) + + def test_store_serialization_error_no_throw(self): + doc_name = PressWorkflowObject.store(lambda x: x, throw_on_error=False) + + # Should store successfully but flag as failed + self.assertTrue(doc_name) + + summary = PressWorkflowObject.get_summary(doc_name) + self.assertIsInstance(summary, str) + + # Getting the object should raise the previous failure exception + with self.assertRaises(ObjectPreviousSerializationFailedError) as context: + PressWorkflowObject.get_object(doc_name) + + self.assertIn("Serialization previously failed", str(context.exception)) + self.assertEqual(context.exception.summary, summary) + + def test_get_summary(self): + obj = [1, 2, 3, "test"] + doc_name = PressWorkflowObject.store(obj) + + summary = 
PressWorkflowObject.get_summary(doc_name) + self.assertEqual(summary, str(obj)) diff --git a/press/workflow_engine/doctype/press_workflow_step/__init__.py b/press/workflow_engine/doctype/press_workflow_step/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.json b/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.json new file mode 100644 index 00000000000..45aec05ba0b --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.json @@ -0,0 +1,73 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-03-11 00:41:05.542514", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "step_title", + "step_method", + "column_break_ryjv", + "status", + "task" + ], + "fields": [ + { + "fieldname": "step_title", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Step Title", + "length": 512, + "read_only": 1, + "reqd": 1 + }, + { + "default": "Pending", + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "label": "Status", + "options": "Pending\nRunning\nSuccess\nFailure\nSkipped", + "reqd": 1 + }, + { + "fieldname": "step_method", + "fieldtype": "Data", + "label": "Step Method", + "length": 512, + "read_only": 1, + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "task", + "fieldtype": "Link", + "in_list_view": 1, + "label": "Task", + "options": "Press Workflow Task", + "read_only": 1, + "search_index": 1 + }, + { + "fieldname": "column_break_ryjv", + "fieldtype": "Column Break" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "istable": 1, + "links": [], + "modified": "2026-03-11 02:34:47.659754", + "modified_by": "Administrator", + "module": "Workflow Engine", + "name": "Press Workflow Step", + "naming_rule": "Random", + "owner": "Administrator", + "permissions": [], + "read_only": 1, + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + 
"sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.py b/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.py new file mode 100644 index 00000000000..a36cf1c0ee8 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_step/press_workflow_step.py @@ -0,0 +1,25 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +from frappe.model.document import Document + + +class PressWorkflowStep(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + parent: DF.Data + parentfield: DF.Data + parenttype: DF.Data + status: DF.Literal["Pending", "Running", "Success", "Failure", "Skipped"] + step_method: DF.Data + step_title: DF.Data + task: DF.Link | None + # end: auto-generated types + + pass diff --git a/press/workflow_engine/doctype/press_workflow_task/__init__.py b/press/workflow_engine/doctype/press_workflow_task/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.js b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.js new file mode 100644 index 00000000000..3e9f7bcf26c --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Press Workflow Task", { +// refresh(frm) { + +// }, +// }); diff --git a/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.json b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.json new file mode 100644 index 00000000000..f997c7d37cd --- /dev/null +++ 
b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.json @@ -0,0 +1,194 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-02-28 18:31:49.055987", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "workflow", + "status", + "parent_task", + "column_break_frrn", + "method_name", + "method_title", + "args", + "kwargs", + "column_break_fiyw", + "output", + "exception", + "signature", + "section_break_jvoo", + "start", + "column_break_fyrh", + "end", + "column_break_azky", + "duration", + "section_break_stdout", + "stdout" + ], + "fields": [ + { + "fieldname": "workflow", + "fieldtype": "Link", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Workflow", + "options": "Press Workflow", + "read_only": 1, + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "parent_task", + "fieldtype": "Link", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Parent Task", + "options": "Press Workflow Task", + "read_only": 1 + }, + { + "fieldname": "method_name", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Method Name", + "length": 512, + "read_only": 1, + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "method_title", + "fieldtype": "Data", + "label": "Method Title", + "length": 512, + "read_only": 1, + "reqd": 1 + }, + { + "fieldname": "signature", + "fieldtype": "Data", + "label": "Signature", + "length": 256, + "read_only": 1, + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "section_break_jvoo", + "fieldtype": "Section Break" + }, + { + "fieldname": "start", + "fieldtype": "Datetime", + "label": "Start", + "read_only": 1 + }, + { + "fieldname": "column_break_fyrh", + "fieldtype": "Column Break" + }, + { + "fieldname": "end", + "fieldtype": "Datetime", + "label": "End", + "read_only": 1 + }, + { + "fieldname": "column_break_azky", + "fieldtype": "Column Break" + }, + { + "fieldname": "duration", + "fieldtype": "Duration", + "label": "Duration", + "read_only": 1 + }, + { + "default": 
"Queued", + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Status", + "options": "Queued\nRunning\nSuccess\nFailure", + "read_only": 1, + "reqd": 1, + "search_index": 1 + }, + { + "fieldname": "exception", + "fieldtype": "Link", + "label": "Exception", + "options": "Press Workflow Object", + "read_only": 1 + }, + { + "fieldname": "output", + "fieldtype": "Link", + "label": "Output", + "options": "Press Workflow Object", + "read_only": 1 + }, + { + "fieldname": "column_break_frrn", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_fiyw", + "fieldtype": "Column Break" + }, + { + "fieldname": "args", + "fieldtype": "Link", + "label": "Args", + "options": "Press Workflow Object", + "set_only_once": 1 + }, + { + "fieldname": "kwargs", + "fieldtype": "Link", + "label": "Kwargs", + "options": "Press Workflow Object", + "set_only_once": 1 + }, + { + "fieldname": "section_break_stdout", + "fieldtype": "Section Break", + "label": "Output" + }, + { + "fieldname": "stdout", + "fieldtype": "Long Text", + "label": "Stdout", + "read_only": 1 + } + ], + "grid_page_length": 50, + "links": [], + "modified": "2026-03-11 02:26:44.473928", + "modified_by": "Administrator", + "module": "Workflow Engine", + "name": "Press Workflow Task", + "naming_rule": "Random", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.py b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.py new file mode 100644 index 00000000000..f5dbbdf7c88 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_task/press_workflow_task.py 
@@ -0,0 +1,223 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +import io +from contextlib import redirect_stdout +from typing import TYPE_CHECKING + +import frappe +from frappe.model.document import Document +from frappe.utils import now_datetime + +from press.workflow_engine.doctype.press_workflow.exceptions import PressWorkflowTaskEnqueued +from press.workflow_engine.doctype.press_workflow.press_workflow import enqueue_workflow +from press.workflow_engine.doctype.press_workflow_object.press_workflow_object import ( + PressWorkflowObject, +) +from press.workflow_engine.utils import calculate_duration + +if TYPE_CHECKING: + from press.workflow_engine.doctype.press_workflow.workflow_builder import WorkflowBuilder + + +class PressWorkflowTask(Document): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. + + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + args: DF.Link | None + duration: DF.Duration | None + end: DF.Datetime | None + exception: DF.Link | None + kwargs: DF.Link | None + method_name: DF.Data + method_title: DF.Data + output: DF.Link | None + parent_task: DF.Link | None + signature: DF.Data + start: DF.Datetime | None + status: DF.Literal["Queued", "Running", "Success", "Failure"] + stdout: DF.LongText | None + workflow: DF.Link + # end: auto-generated types + + def after_insert(self): + enqueue_task(self.name) # type: ignore + + def on_update(self): + self.update_tracked_step_status() + + def update_tracked_step_status(self): + if self.is_new(): + return + + if not self.has_value_changed("status"): + return + + frappe.db.set_value( + "Press Workflow Step", + {"task": self.name}, + "status", + { + "Queued": "Pending", + "Running": "Running", + "Success": "Success", + "Failure": "Failure", + }.get(self.status, "Pending"), + ) + + def run(self): # noqa: C901 - Best to keep workflow execution logic in one place 
+ assert self.name, "Task must be saved before it can be run" + frappe.get_value( + self.doctype, self.name, "name", for_update=(not frappe.flags.in_test) + ) # Lock the document for update to prevent concurrent runs + + workflow_info = frappe.get_value( + "Press Workflow", + self.workflow, + ["name", "status", "linked_docname", "linked_doctype"], + as_dict=True, + ) + + reference_doc: WorkflowBuilder = frappe.get_doc( + workflow_info.linked_doctype, workflow_info.linked_docname + ) # type: ignore + reference_doc.workflow_name = self.workflow + reference_doc.flags.in_press_workflow_execution = True + reference_doc.flags.current_press_workflow_task = self.name + + try: + args = PressWorkflowObject.get_object(self.args) if self.args else () + kwargs = PressWorkflowObject.get_object(self.kwargs) if self.kwargs else {} + except Exception as e: + self.exception = PressWorkflowObject.store(e, throw_on_error=False) + self.status = "Failure" + self.save() + self._resume_workflow() + return + + if not hasattr(reference_doc, self.method_name): + self.exception = PressWorkflowObject.store( + Exception( + f"Method {self.method_name} not found in {reference_doc.doctype} {reference_doc.name}" + ) + ) + self.status = "Failure" + self.save() + self._resume_workflow() + return + + # Mark as Running immediately so the scheduler won't re-enqueue it + # while this execution is in progress. 
+ if not self.start: + self.start = now_datetime() + + self.status = "Running" + self.save() + if not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + output = None + exception = None + status = "Running" + buffer = io.StringIO() + + existing_task_signature = reference_doc.current_task_signature + try: + reference_doc.current_task_signature = self.signature + with redirect_stdout(buffer): + result = getattr(reference_doc, self.method_name)(*args, **kwargs) + + if result is not None: + output = PressWorkflowObject.store(result) + + status = "Success" + except PressWorkflowTaskEnqueued: + # A nested task was enqueued while executing this task. + # This is expected behaviour: exit without marking a terminal state + # so this task will be retried later (same as the parent workflow). + pass + except Exception as e: + exception = PressWorkflowObject.store(e, throw_on_error=False) + status = "Failure" + + finally: + reference_doc.current_task_signature = existing_task_signature + self.reload() + + if status in ["Success", "Failure"] and not self.end: + self.end = now_datetime() + + if self.start and self.end and not self.duration: + self.duration = calculate_duration(self.start, self.end) + + self.status = status + self.output = output + self.exception = exception + self.stdout = (self.stdout or "") + buffer.getvalue() + + if frappe.flags.in_test and self.stdout: + print(self.stdout) + + self.save() + + if self.status in ["Success", "Failure"]: + # On termination, resume the parent task or workflow. + self._resume_workflow() + else: + # A nested task was enqueued; re-enqueue ourselves for retry. + enqueue_task(self.name) + + def _resume_workflow(self): + if self.parent_task: + # This task was triggered by another task — re-enqueue the parent + # so it can continue from where it was paused. + enqueue_task(self.parent_task) + else: + # Top-level task -- resume the parent workflow. 
+ enqueue_workflow(self.workflow) + + +def on_doctype_update(): + frappe.db.add_index("Press Workflow Task", ["creation"]) + + +def enqueue_task(task_name: str) -> None: + if frappe.flags.in_test: + from press.utils.test import foreground_enqueue_task + + foreground_enqueue_task(task_name) + return + frappe.enqueue_doc( + "Press Workflow Task", + task_name, + method="run", + queue=frappe.conf.get("press_workflow_task_queue", "default"), + timeout=3600, + deduplicate=True, + enqueue_after_commit=True, + job_id=f"press_workflow_task||{task_name}||run", + ) + + +def retry_tasks(): + tasks = frappe.get_all( + "Press Workflow Task", + filters={"status": ("in", ["Queued", "Running"])}, + pluck="name", + order_by="modified asc", + ) + for task_name in tasks: + try: + enqueue_task(task_name) + except Exception as e: + frappe.log_error( + "Error Processing task", + message=str(e), + reference_doctype="Press Workflow Task", + reference_name=task_name, + ) diff --git a/press/workflow_engine/doctype/press_workflow_task/test_press_workflow_task.py b/press/workflow_engine/doctype/press_workflow_task/test_press_workflow_task.py new file mode 100644 index 00000000000..a73f1d47daf --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_task/test_press_workflow_task.py @@ -0,0 +1,20 @@ +# Copyright (c) 2026, Frappe and Contributors +# See license.txt + +# import frappe +from frappe.tests import IntegrationTestCase + +# On IntegrationTestCase, the doctype test records and all +# link-field test record dependencies are recursively loaded +# Use these module variables to add/remove to/from that list +EXTRA_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] +IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] + + +class IntegrationTestPressWorkflowTask(IntegrationTestCase): + """ + Integration tests for PressWorkflowTask. + Use this class for testing interactions between multiple components. 
+ """ + + pass diff --git a/press/workflow_engine/doctype/press_workflow_test/__init__.py b/press/workflow_engine/doctype/press_workflow_test/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.js b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.js new file mode 100644 index 00000000000..e25184bdc19 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Press Workflow Test", { +// refresh(frm) { + +// }, +// }); diff --git a/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.json b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.json new file mode 100644 index 00000000000..9246f1a8b79 --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.json @@ -0,0 +1,80 @@ +{ + "actions": [], + "autoname": "hash", + "creation": "2026-03-11 22:18:10.652168", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "input_a", + "input_c", + "input_e", + "column_break_sdtq", + "input_b", + "input_d", + "input_f" + ], + "fields": [ + { + "fieldname": "input_a", + "fieldtype": "Int", + "label": "Input A" + }, + { + "fieldname": "input_c", + "fieldtype": "Int", + "label": "Input C" + }, + { + "fieldname": "input_e", + "fieldtype": "Int", + "label": "Input E" + }, + { + "fieldname": "column_break_sdtq", + "fieldtype": "Column Break" + }, + { + "fieldname": "input_b", + "fieldtype": "Int", + "label": "Input B" + }, + { + "fieldname": "input_d", + "fieldtype": "Int", + "label": "Input D" + }, + { + "fieldname": "input_f", + "fieldtype": "Int", + "label": "Input F" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "links": [], + "modified": "2026-03-11 22:19:13.824971", + "modified_by": 
"Administrator", + "module": "Workflow Engine", + "name": "Press Workflow Test", + "naming_rule": "Random", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "creation", + "sort_order": "DESC", + "states": [] +} diff --git a/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.py b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.py new file mode 100644 index 00000000000..45a6a3b4d2c --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_test/press_workflow_test.py @@ -0,0 +1,137 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +import frappe + +from press.workflow_engine.doctype.press_workflow.decorators import flow, task +from press.workflow_engine.doctype.press_workflow.workflow_builder import WorkflowBuilder + + +class PressWorkflowTest(WorkflowBuilder): + # begin: auto-generated types + # This code is auto-generated. Do not modify anything in this block. 
+ + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from frappe.types import DF + + input_a: DF.Int + input_b: DF.Int + input_c: DF.Int + input_d: DF.Int + input_e: DF.Int + input_f: DF.Int + # end: auto-generated types + + def validate(self): + if not frappe.in_test: + frappe.throw("PressWorkflowTest doctype can be used only in Unit Tests") + + @flow + def main_success(self): + return "success output" + + @flow + def main_fail(self): + raise ValueError("mock failure") + + @task + def sample_task(self): + return "task done" + + @task + def sample_failing_task(self): + raise ValueError("task failed") + + @task + def sample_nested_task(self): + return self.sample_task() + + @flow + def main_with_task(self): + return self.sample_task() + + @flow + def main_with_nested_task(self): + return self.sample_nested_task() + + @flow + def main_with_failing_task(self): + return self.sample_failing_task() + + @task + def add(self, a: int, b: int) -> int: + return a + b + + @task + def multiply(self, a: int, b: int) -> int: + return a * b + + @flow + def main_with_args_task(self): + return self.add(self.input_a, self.input_b) + + @task + def power(self, base: int, exponent: int) -> int: + """Raise base to exponent using repeated multiply calls.""" + result = 1 + for i in range(exponent): + result = self.multiply.with_task_id(f"mult_{i}")(result, base) + return result + + @flow + def main_with_task_id_loop(self): + return self.power(self.input_a, self.input_b) + + @task + def multiply_passthrough(self, a: int, b: int, task_id: str | None = None) -> int: + """Multiply that receives task_id so it can be inspected.""" + print(f"[multiply_passthrough] task_id={task_id}") # nosemgrep + return a * b + + @task + def power_passthrough(self, base: int, exponent: int, task_id: str | None = None) -> int: + """Power that forwards its own task_id to nested multiply calls.""" + result = 1 + for i in range(exponent): + result = self.multiply_passthrough.with_task_id( + 
f"{task_id}_mult_{i}" if task_id else f"mult_{i}" + )(result, base) + return result + + @flow + def main_with_task_id_passthrough(self): + return self.power_passthrough.with_task_id("power")(self.input_a, self.input_b) + + @task + def noisy_task(self) -> str: + print("hello from noisy_task") # nosemgrep + return "done" + + @flow + def main_with_noisy_task(self): + return self.noisy_task() + + @flow + def main_as_flow(self): + self.sample_task() + self.sample_nested_task() + return "flow done" + + @flow + def flow_with_args(self, x: int, y: int) -> int: + return self.add(x, y) + + @flow + def missing_method_flow(self): + return "nothing" + + @flow + def skipped_steps_flow(self): + # We define it locally but won't call any of the tasks. + # Alternatively we can conditionally call them. + if False: + self.sample_task() + self.sample_failing_task() + return "skipped" diff --git a/press/workflow_engine/doctype/press_workflow_test/test_press_workflow_test.py b/press/workflow_engine/doctype/press_workflow_test/test_press_workflow_test.py new file mode 100644 index 00000000000..9f9e22f354e --- /dev/null +++ b/press/workflow_engine/doctype/press_workflow_test/test_press_workflow_test.py @@ -0,0 +1,20 @@ +# Copyright (c) 2026, Frappe and Contributors +# See license.txt + +# import frappe +from frappe.tests import IntegrationTestCase + +# On IntegrationTestCase, the doctype test records and all +# link-field test record dependencies are recursively loaded +# Use these module variables to add/remove to/from that list +EXTRA_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] +IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] + + +class IntegrationTestPressWorkflowTest(IntegrationTestCase): + """ + Integration tests for PressWorkflowTest. + Use this class for testing interactions between multiple components. 
+ """ + + pass diff --git a/press/workflow_engine/test_utils.py b/press/workflow_engine/test_utils.py new file mode 100644 index 00000000000..7a9e83e47b2 --- /dev/null +++ b/press/workflow_engine/test_utils.py @@ -0,0 +1,146 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +import dataclasses +from datetime import datetime, timedelta + +from frappe.tests.utils import FrappeTestCase + +from press.workflow_engine.utils import ( + _canonicalize, + calculate_duration, + called_methods_in_order, + generate_function_signature, + is_func_accept_task_id, + method_title, +) + + +@dataclasses.dataclass +class DummyDataclass: + a: int + b: str + + +class DummyClassForCallVisitor: + def method_one(self): + pass + + def method_two(self): + """Custom Title For Two""" + pass + + def method_three(self): + self.method_one() + self.method_two() + + +class TestWorkflowEngineUtils(FrappeTestCase): + def test_method_title(self): + # fmt: off + def func_with_doc(): + """This is a Docstring + Second Line + """ + pass + # fmt: on + + def func_without_doc(): + pass + + self.assertEqual(method_title(func_with_doc), "This is a Docstring") + self.assertEqual(method_title(func_without_doc), "Func Without Doc") + + def test_called_methods_in_order(self): + calls = called_methods_in_order(DummyClassForCallVisitor, "method_three") + expected = [ + ("method_one", "Method One"), + ("method_two", "Custom Title For Two"), + ] + self.assertEqual(calls, expected) + + def test_calculate_duration(self): + start = datetime(2026, 1, 1, 12, 0, 0) + end = start + timedelta(seconds=150) + + # Test with datetime objects + self.assertEqual(calculate_duration(start, end), 150) + + # Test with string objects + self.assertEqual(calculate_duration(str(start), str(end)), 150) + + def test_canonicalize_basic_types(self): + self.assertIsNone(_canonicalize(None)) + self.assertEqual(_canonicalize(1), 1) + self.assertEqual(_canonicalize("test"), "test") + 
self.assertEqual(_canonicalize(True), True) + + def test_canonicalize_floats(self): + self.assertEqual(_canonicalize(1.5), 1.5) + self.assertEqual(_canonicalize(float("inf")), "__Inf__") + self.assertEqual(_canonicalize(float("-inf")), "__-Inf__") + self.assertEqual(_canonicalize(float("nan")), "__NaN__") + + def test_canonicalize_collections(self): + # List + self.assertEqual( + _canonicalize([1, "a", None]), + {"__type__": "list", "values": [1, "a", None]}, + ) + + # Tuple + self.assertEqual( + _canonicalize((1, 2)), + {"__type__": "tuple", "values": [1, 2]}, + ) + + # Dict + self.assertEqual( + _canonicalize({"b": 2, "a": 1}), + {"__type__": "dict", "values": {"a": 1, "b": 2}}, + ) + + # Set + self.assertEqual( + _canonicalize({2, 1}), + {"__type__": "set", "values": [1, 2]}, # Sorted + ) + + def test_canonicalize_dataclass(self): + obj = DummyDataclass(a=1, b="2") + result = _canonicalize(obj) + + self.assertEqual(result["__type__"], f"{__name__}.DummyDataclass") + self.assertEqual(result["fields"], {"__type__": "dict", "values": {"a": 1, "b": "2"}}) + + def test_canonicalize_circular_reference(self): + circular_list = [] + circular_list.append(circular_list) + with self.assertRaisesRegex(ValueError, "Circular reference detected"): + _canonicalize(circular_list) + + def test_is_func_accept_task_id(self): + def func_with_task_id(a, b, task_id): + pass + + def func_without_task_id(a, b): + pass + + self.assertTrue(is_func_accept_task_id(func_with_task_id)) + self.assertFalse(is_func_accept_task_id(func_without_task_id)) + + def test_generate_function_signature(self): + def my_func(a, b=2, task_id=None): + pass + + sig1 = generate_function_signature(my_func, args=(1,), kwargs={}) + sig2 = generate_function_signature(my_func, args=(1,), kwargs={"b": 2}) + sig3 = generate_function_signature(my_func, args=(1, 2), kwargs={}) + + self.assertEqual(sig1, sig2) + self.assertEqual(sig1, sig3) + + # task_id should be ignored in the signature arguments, but included in 
payload. + sig4 = generate_function_signature(my_func, args=(1,), kwargs={"task_id": "123"}) + # In this implementation, the payload structure incorporates task_id so the digest will be different. + self.assertNotEqual(sig1, sig4) diff --git a/press/workflow_engine/utils.py b/press/workflow_engine/utils.py new file mode 100644 index 00000000000..c97d82048b7 --- /dev/null +++ b/press/workflow_engine/utils.py @@ -0,0 +1,152 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +import ast +import dataclasses +import hashlib +import inspect +import json +import math +import textwrap +from collections.abc import Callable +from datetime import datetime +from typing import Any + +from frappe.model.document import Document +from frappe.utils import get_datetime + + +def method_title(func: Callable[..., Any]) -> str: + if func.__doc__: + return func.__doc__.strip().split("\n")[0] + return func.__name__.replace("_", " ").title() + + +class SelfCallVisitor(ast.NodeVisitor): + def __init__(self, cls: type) -> None: + self.cls = cls + self.calls: list[tuple[str, str]] = [] + self._own_methods: frozenset[str] = frozenset(cls.__dict__) + + def visit_Call(self, node: ast.Call) -> None: + func = node.func + if ( + isinstance(func, ast.Attribute) + and isinstance(func.value, ast.Name) + and func.value.id == "self" + and func.attr in self._own_methods + ): + self.calls.append((func.attr, method_title(getattr(self.cls, func.attr)))) + self.generic_visit(node) + + +def called_methods_in_order(cls: type, func_or_name: str | Callable[..., Any]) -> list[tuple[str, str]]: + func = func_or_name if callable(func_or_name) else inspect.unwrap(getattr(cls, func_or_name)) + source = textwrap.dedent(inspect.getsource(func)) + visitor = SelfCallVisitor(cls) + visitor.visit(ast.parse(source)) + return visitor.calls + + +def calculate_duration(start: str | datetime, end: str | datetime) -> int: + start_dt = get_datetime(start) + end_dt = 
get_datetime(end) + assert start_dt is not None and end_dt is not None + return int((end_dt - start_dt).total_seconds()) + + +def _canonicalize(obj: Any, visited: set | None = None) -> Any: # noqa: C901 - some crazy internal use better to not touch? + if visited is None: + visited = set() + + if obj is None or isinstance(obj, bool | int | str): + return obj + + if isinstance(obj, float): + if math.isnan(obj): + return "__NaN__" + if obj == float("inf"): + return "__Inf__" + if obj == float("-inf"): + return "__-Inf__" + return obj + + obj_id = id(obj) + if obj_id in visited: + raise ValueError(f"Circular reference detected in object of type {type(obj).__qualname__!r}") + + visited.add(obj_id) + + try: + if isinstance(obj, list | tuple): + return { + "__type__": type(obj).__name__, + "values": [_canonicalize(x, visited) for x in obj], + } + + if isinstance(obj, dict): + return { + "__type__": "dict", + "values": { + str(k): _canonicalize(v, visited) for k, v in sorted(obj.items(), key=lambda x: str(x[0])) + }, + } + + if isinstance(obj, set | frozenset): + canonicalized = [_canonicalize(x, visited) for x in obj] + sorted_values = sorted(canonicalized, key=lambda x: json.dumps(x, sort_keys=True)) + return {"__type__": type(obj).__name__, "values": sorted_values} + + if isinstance(obj, Document): + return { + "__type__": obj.doctype, + "doctype": obj.doctype, + "name": obj.name, + } + + if dataclasses.is_dataclass(obj) and not isinstance(obj, type): + return { + "__type__": f"{type(obj).__module__}.{type(obj).__qualname__}", + "fields": _canonicalize(dataclasses.asdict(obj), visited), + } + + raise TypeError( + f"Cannot canonicalize type {type(obj).__qualname__!r}. " + f"Convert to a supported type or extend _canonicalize before calling generate_signature." 
+ ) + + finally: + visited.discard(obj_id) + + +def is_func_accept_task_id(func: Callable) -> bool: + """Return True if `func` has an explicit `task_id` parameter.""" + sig = inspect.signature(func) + return "task_id" in sig.parameters + + +def generate_function_signature(func: Callable, args: tuple, kwargs: dict) -> str: + kwargs = kwargs.copy() + task_id = kwargs.pop("task_id", None) + + sig = inspect.signature(func) + # Drop `self` - args never includes the instance (it's the unbound function) + params = list(sig.parameters.values()) + if params and params[0].name == "self": + sig = sig.replace(parameters=params[1:]) + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + payload = { + "func": { + "module": func.__module__, + "qualname": func.__qualname__, + }, + "arguments": _canonicalize(dict(bound.arguments)), + } + + if task_id is not None: + payload["task_id"] = task_id + + blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + return hashlib.sha256(blob).hexdigest()