Skip to content

Commit 3ea87a0

Browse files
jan-janssenpre-commit-ci[bot]pyiron-runner
authored
[Feature] Export to Python Workflow Definition (#882)
* Export to Python Workflow Definition * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Format black * Update dependency.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add test * extend test * fix --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: pyiron-runner <[email protected]>
1 parent e8f0890 commit 3ea87a0

File tree

6 files changed

+163
-6
lines changed

6 files changed

+163
-6
lines changed

src/executorlib/executor/flux.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class FluxJobExecutor(BaseExecutor):
6565
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
6666
debugging purposes and to get an overview of the specified dependencies.
6767
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
68+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6869
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
6970
7071
Examples:
@@ -105,6 +106,7 @@ def __init__(
105106
refresh_rate: float = 0.01,
106107
plot_dependency_graph: bool = False,
107108
plot_dependency_graph_filename: Optional[str] = None,
109+
export_workflow_filename: Optional[str] = None,
108110
log_obj_size: bool = False,
109111
):
110112
"""
@@ -152,6 +154,7 @@ def __init__(
152154
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
153155
debugging purposes and to get an overview of the specified dependencies.
154156
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
157+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
155158
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
156159
157160
"""
@@ -189,6 +192,7 @@ def __init__(
189192
refresh_rate=refresh_rate,
190193
plot_dependency_graph=plot_dependency_graph,
191194
plot_dependency_graph_filename=plot_dependency_graph_filename,
195+
export_workflow_filename=export_workflow_filename,
192196
)
193197
)
194198
else:
@@ -255,6 +259,7 @@ class FluxClusterExecutor(BaseExecutor):
255259
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
256260
debugging purposes and to get an overview of the specified dependencies.
257261
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
262+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
258263
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
259264
260265
Examples:
@@ -293,6 +298,7 @@ def __init__(
293298
refresh_rate: float = 0.01,
294299
plot_dependency_graph: bool = False,
295300
plot_dependency_graph_filename: Optional[str] = None,
301+
export_workflow_filename: Optional[str] = None,
296302
log_obj_size: bool = False,
297303
):
298304
"""
@@ -338,6 +344,7 @@ def __init__(
338344
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
339345
debugging purposes and to get an overview of the specified dependencies.
340346
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
347+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
341348
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
342349
343350
"""
@@ -420,6 +427,7 @@ def __init__(
420427
refresh_rate=refresh_rate,
421428
plot_dependency_graph=plot_dependency_graph,
422429
plot_dependency_graph_filename=plot_dependency_graph_filename,
430+
export_workflow_filename=export_workflow_filename,
423431
)
424432
)
425433

src/executorlib/executor/single.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class SingleNodeExecutor(BaseExecutor):
5858
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
5959
debugging purposes and to get an overview of the specified dependencies.
6060
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
61+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6162
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
6263
6364
Examples:
@@ -94,6 +95,7 @@ def __init__(
9495
refresh_rate: float = 0.01,
9596
plot_dependency_graph: bool = False,
9697
plot_dependency_graph_filename: Optional[str] = None,
98+
export_workflow_filename: Optional[str] = None,
9799
log_obj_size: bool = False,
98100
):
99101
"""
@@ -138,6 +140,7 @@ def __init__(
138140
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
139141
debugging purposes and to get an overview of the specified dependencies.
140142
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
143+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
141144
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
142145
143146
"""
@@ -171,6 +174,7 @@ def __init__(
171174
refresh_rate=refresh_rate,
172175
plot_dependency_graph=plot_dependency_graph,
173176
plot_dependency_graph_filename=plot_dependency_graph_filename,
177+
export_workflow_filename=export_workflow_filename,
174178
)
175179
)
176180
else:
@@ -226,6 +230,7 @@ class TestClusterExecutor(BaseExecutor):
226230
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
227231
debugging purposes and to get an overview of the specified dependencies.
228232
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
233+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
229234
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
230235
231236
Examples:
@@ -262,6 +267,7 @@ def __init__(
262267
refresh_rate: float = 0.01,
263268
plot_dependency_graph: bool = False,
264269
plot_dependency_graph_filename: Optional[str] = None,
270+
export_workflow_filename: Optional[str] = None,
265271
log_obj_size: bool = False,
266272
):
267273
"""
@@ -299,6 +305,7 @@ def __init__(
299305
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
300306
debugging purposes and to get an overview of the specified dependencies.
301307
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
308+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
302309
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
303310
304311
"""
@@ -358,6 +365,7 @@ def __init__(
358365
refresh_rate=refresh_rate,
359366
plot_dependency_graph=plot_dependency_graph,
360367
plot_dependency_graph_filename=plot_dependency_graph_filename,
368+
export_workflow_filename=export_workflow_filename,
361369
)
362370
)
363371

src/executorlib/executor/slurm.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class SlurmClusterExecutor(BaseExecutor):
6363
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
6464
debugging purposes and to get an overview of the specified dependencies.
6565
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
66+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6667
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
6768
6869
Examples:
@@ -101,6 +102,7 @@ def __init__(
101102
refresh_rate: float = 0.01,
102103
plot_dependency_graph: bool = False,
103104
plot_dependency_graph_filename: Optional[str] = None,
105+
export_workflow_filename: Optional[str] = None,
104106
log_obj_size: bool = False,
105107
):
106108
"""
@@ -146,6 +148,7 @@ def __init__(
146148
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
147149
debugging purposes and to get an overview of the specified dependencies.
148150
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
151+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
149152
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
150153
151154
"""
@@ -225,6 +228,7 @@ def __init__(
225228
refresh_rate=refresh_rate,
226229
plot_dependency_graph=plot_dependency_graph,
227230
plot_dependency_graph_filename=plot_dependency_graph_filename,
231+
export_workflow_filename=export_workflow_filename,
228232
)
229233
)
230234

@@ -275,6 +279,7 @@ class SlurmJobExecutor(BaseExecutor):
275279
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
276280
debugging purposes and to get an overview of the specified dependencies.
277281
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
282+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
278283
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
279284
280285
Examples:
@@ -312,6 +317,7 @@ def __init__(
312317
refresh_rate: float = 0.01,
313318
plot_dependency_graph: bool = False,
314319
plot_dependency_graph_filename: Optional[str] = None,
320+
export_workflow_filename: Optional[str] = None,
315321
log_obj_size: bool = False,
316322
):
317323
"""
@@ -360,6 +366,7 @@ def __init__(
360366
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
361367
debugging purposes and to get an overview of the specified dependencies.
362368
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
369+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
363370
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
364371
365372
"""
@@ -394,6 +401,7 @@ def __init__(
394401
refresh_rate=refresh_rate,
395402
plot_dependency_graph=plot_dependency_graph,
396403
plot_dependency_graph_filename=plot_dependency_graph_filename,
404+
export_workflow_filename=export_workflow_filename,
397405
)
398406
)
399407
else:

src/executorlib/task_scheduler/interactive/dependency.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414
from executorlib.task_scheduler.base import TaskSchedulerBase
1515
from executorlib.task_scheduler.interactive.dependency_plot import (
16+
export_dependency_graph_function,
1617
generate_nodes_and_edges_for_plotting,
1718
generate_task_hash_for_plotting,
1819
plot_dependency_graph_function,
@@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase):
2829
refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
2930
plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
3031
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
32+
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
3133
3234
Attributes:
3335
_future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
@@ -44,6 +46,7 @@ def __init__(
4446
refresh_rate: float = 0.01,
4547
plot_dependency_graph: bool = False,
4648
plot_dependency_graph_filename: Optional[str] = None,
49+
export_workflow_filename: Optional[str] = None,
4750
) -> None:
4851
super().__init__(max_cores=max_cores)
4952
self._process_kwargs = {
@@ -61,7 +64,8 @@ def __init__(
6164
self._future_hash_dict: dict = {}
6265
self._task_hash_dict: dict = {}
6366
self._plot_dependency_graph_filename = plot_dependency_graph_filename
64-
if plot_dependency_graph_filename is None:
67+
self._export_workflow_filename = export_workflow_filename
68+
if plot_dependency_graph_filename is None and export_workflow_filename is None:
6569
self._generate_dependency_graph = plot_dependency_graph
6670
else:
6771
self._generate_dependency_graph = True
@@ -209,11 +213,18 @@ def __exit__(
209213
v: k for k, v in self._future_hash_dict.items()
210214
},
211215
)
212-
return plot_dependency_graph_function(
213-
node_lst=node_lst,
214-
edge_lst=edge_lst,
215-
filename=self._plot_dependency_graph_filename,
216-
)
216+
if self._export_workflow_filename is not None:
217+
return export_dependency_graph_function(
218+
node_lst=node_lst,
219+
edge_lst=edge_lst,
220+
file_name=self._export_workflow_filename,
221+
)
222+
else:
223+
return plot_dependency_graph_function(
224+
node_lst=node_lst,
225+
edge_lst=edge_lst,
226+
filename=self._plot_dependency_graph_filename,
227+
)
217228
else:
218229
return None
219230

src/executorlib/task_scheduler/interactive/dependency_plot.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import inspect
2+
import json
23
import os.path
34
from concurrent.futures import Future
45
from typing import Optional
56

67
import cloudpickle
8+
import numpy as np
79

810
from executorlib.standalone.select import FutureSelector
911

@@ -230,3 +232,76 @@ def plot_dependency_graph_function(
230232
from IPython.display import SVG, display # noqa
231233

232234
display(SVG(nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg")))
235+
236+
237+
def export_dependency_graph_function(
238+
node_lst: list, edge_lst: list, file_name: str = "workflow.json"
239+
):
240+
"""
241+
Export the graph visualization of nodes and edges as a JSON dictionary.
242+
243+
Args:
244+
node_lst (list): List of nodes.
245+
edge_lst (list): List of edges.
246+
file_name (str): Name of the file to store the exported graph in.
247+
"""
248+
pwd_nodes_lst = []
249+
for n in node_lst:
250+
if n["type"] == "function":
251+
pwd_nodes_lst.append(
252+
{"id": n["id"], "type": n["type"], "value": n["value"]}
253+
)
254+
elif n["type"] == "input" and isinstance(n["value"], np.ndarray):
255+
pwd_nodes_lst.append(
256+
{
257+
"id": n["id"],
258+
"type": n["type"],
259+
"value": n["value"].tolist(),
260+
"name": n["name"],
261+
}
262+
)
263+
else:
264+
pwd_nodes_lst.append(
265+
{
266+
"id": n["id"],
267+
"type": n["type"],
268+
"value": n["value"],
269+
"name": n["name"],
270+
}
271+
)
272+
273+
final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"}
274+
pwd_nodes_lst.append(final_node)
275+
pwd_edges_lst = [
276+
(
277+
{
278+
"target": e["end"],
279+
"targetPort": e["label"],
280+
"source": e["start"],
281+
"sourcePort": None,
282+
}
283+
if "start_label" not in e
284+
else {
285+
"target": e["end"],
286+
"targetPort": e["end_label"],
287+
"source": e["start"],
288+
"sourcePort": e["start_label"],
289+
}
290+
)
291+
for e in edge_lst
292+
]
293+
pwd_edges_lst.append(
294+
{
295+
"target": final_node["id"],
296+
"targetPort": None,
297+
"source": max([e["target"] for e in pwd_edges_lst]),
298+
"sourcePort": None,
299+
}
300+
)
301+
pwd_dict = {
302+
"version": "0.1.0",
303+
"nodes": pwd_nodes_lst,
304+
"edges": pwd_edges_lst,
305+
}
306+
with open(file_name, "w") as f:
307+
json.dump(pwd_dict, f, indent=4)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import json
2+
import os
3+
import unittest
4+
import numpy as np
5+
from executorlib import SingleNodeExecutor, get_item_from_future
6+
7+
8+
def get_sum(x, y):
9+
return x + y
10+
11+
def get_prod_and_div(x, y):
12+
return {"prod": x * y, "div": x / y}
13+
14+
def get_square(x):
15+
return x ** 2
16+
17+
18+
class TestPythonWorkflowDefinition(unittest.TestCase):
19+
def tearDown(self):
20+
if os.path.exists("workflow.json"):
21+
os.remove("workflow.json")
22+
23+
def test_arithmetic(self):
24+
with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe:
25+
future_prod_and_div = exe.submit(get_prod_and_div, x=1, y=2)
26+
future_prod = get_item_from_future(future_prod_and_div, key="prod")
27+
future_div = get_item_from_future(future_prod_and_div, key="div")
28+
future_sum = exe.submit(get_sum, x=future_prod, y=future_div)
29+
future_result = exe.submit(get_square, x=future_sum)
30+
self.assertIsNone(future_result.result())
31+
32+
with open("workflow.json", "r") as f:
33+
content = json.load(f)
34+
35+
self.assertEqual(len(content["nodes"]), 6)
36+
self.assertEqual(len(content["edges"]), 6)
37+
38+
def test_numpy_array(self):
39+
with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe:
40+
future_sum = exe.submit(get_sum, x=np.array([1,2]), y=np.array([3,4]))
41+
self.assertIsNone(future_sum.result())
42+
43+
with open("workflow.json", "r") as f:
44+
content = json.load(f)
45+
46+
self.assertEqual(len(content["nodes"]), 4)
47+
self.assertEqual(len(content["edges"]), 3)

0 commit comments

Comments
 (0)