Skip to content

Commit 7b357c5

Browse files
[Feature] Update plot routines for compatibility to the Python Workflow Definition (#881)
* Update plot routines for compatibility to the Python Workflow Definition * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * reduce lines * update tests * fix * more fixes * fix input labels * update notebook --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3bd6463 commit 7b357c5

File tree

5 files changed

+71
-27
lines changed

5 files changed

+71
-27
lines changed

notebooks/1-single-node.ipynb

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/executorlib/task_scheduler/interactive/dependency_plot.py

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import inspect
12
import os.path
23
from concurrent.futures import Future
34
from typing import Optional
@@ -24,6 +25,12 @@ def generate_nodes_and_edges_for_plotting(
2425
edge_lst: list = []
2526
hash_id_dict: dict = {}
2627

28+
def extend_args(funct_dict):
29+
sig = inspect.signature(funct_dict["fn"])
30+
args = sig.bind(*funct_dict["args"], **funct_dict["kwargs"])
31+
funct_dict["signature"] = args.arguments
32+
return funct_dict
33+
2734
def add_element(arg, link_to, label=""):
2835
"""
2936
Add element to the node and edge lists.
@@ -39,6 +46,8 @@ def add_element(arg, link_to, label=""):
3946
"start": hash_id_dict[future_hash_inverse_dict[arg._future]],
4047
"end": link_to,
4148
"label": label + str(arg._selector),
49+
"end_label": label,
50+
"start_label": str(arg._selector),
4251
}
4352
)
4453
elif isinstance(arg, Future):
@@ -53,39 +62,71 @@ def add_element(arg, link_to, label=""):
5362
lst_no_future = [a if not isinstance(a, Future) else "$" for a in arg]
5463
node_id = len(node_lst)
5564
node_lst.append(
56-
{"name": str(lst_no_future), "id": node_id, "shape": "circle"}
65+
{
66+
"name": str(lst_no_future),
67+
"value": "python_workflow_definition.shared.get_list",
68+
"id": node_id,
69+
"type": "function",
70+
"shape": "box",
71+
}
5772
)
5873
edge_lst.append({"start": node_id, "end": link_to, "label": label})
5974
for i, a in enumerate(arg):
6075
if isinstance(a, Future):
61-
add_element(arg=a, link_to=node_id, label="ind: " + str(i))
76+
add_element(arg=a, link_to=node_id, label=str(i))
6277
elif isinstance(arg, dict) and any(isinstance(a, Future) for a in arg.values()):
6378
dict_no_future = {
6479
kt: vt if not isinstance(vt, Future) else "$" for kt, vt in arg.items()
6580
}
6681
node_id = len(node_lst)
6782
node_lst.append(
68-
{"name": str(dict_no_future), "id": node_id, "shape": "circle"}
83+
{
84+
"name": str(dict_no_future),
85+
"value": "python_workflow_definition.shared.get_dict",
86+
"id": node_id,
87+
"type": "function",
88+
"shape": "box",
89+
}
6990
)
7091
edge_lst.append({"start": node_id, "end": link_to, "label": label})
7192
for kt, vt in arg.items():
72-
if isinstance(vt, Future):
73-
add_element(arg=vt, link_to=node_id, label="key: " + kt)
93+
add_element(arg=vt, link_to=node_id, label=kt)
7494
else:
75-
node_id = len(node_lst)
76-
node_lst.append({"name": str(arg), "id": node_id, "shape": "circle"})
95+
value_dict = {
96+
str(n["value"]): n["id"] for n in node_lst if n["type"] == "input"
97+
}
98+
if str(arg) not in value_dict:
99+
node_id = len(node_lst)
100+
node_lst.append(
101+
{
102+
"name": label,
103+
"value": arg,
104+
"id": node_id,
105+
"type": "input",
106+
"shape": "circle",
107+
}
108+
)
109+
else:
110+
node_id = value_dict[str(arg)]
77111
edge_lst.append({"start": node_id, "end": link_to, "label": label})
78112

79-
for k, v in task_hash_dict.items():
113+
task_hash_modified_dict = {
114+
k: extend_args(funct_dict=v) for k, v in task_hash_dict.items()
115+
}
116+
117+
for k, v in task_hash_modified_dict.items():
80118
hash_id_dict[k] = len(node_lst)
81119
node_lst.append(
82-
{"name": v["fn"].__name__, "id": hash_id_dict[k], "shape": "box"}
120+
{
121+
"name": v["fn"].__name__,
122+
"type": "function",
123+
"value": v["fn"].__module__ + "." + v["fn"].__name__,
124+
"id": hash_id_dict[k],
125+
"shape": "box",
126+
}
83127
)
84-
for k, task_dict in task_hash_dict.items():
85-
for arg in task_dict["args"]:
86-
add_element(arg=arg, link_to=hash_id_dict[k], label="")
87-
88-
for kw, v in task_dict["kwargs"].items():
128+
for k, task_dict in task_hash_modified_dict.items():
129+
for kw, v in task_dict["signature"].items():
89130
add_element(arg=v, link_to=hash_id_dict[k], label=str(kw))
90131

91132
return node_lst, edge_lst
@@ -175,7 +216,10 @@ def plot_dependency_graph_function(
175216

176217
graph = nx.DiGraph()
177218
for node in node_lst:
178-
graph.add_node(node["id"], label=node["name"], shape=node["shape"])
219+
if node["type"] == "input":
220+
graph.add_node(node["id"], label=str(node["value"]), shape=node["shape"])
221+
else:
222+
graph.add_node(node["id"], label=str(node["name"]), shape=node["shape"])
179223
for edge in edge_lst:
180224
graph.add_edge(edge["start"], edge["end"], label=edge["label"])
181225
if filename is not None:

tests/test_fluxjobexecutor_plot.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def test_executor_dependency_plot(self):
6161
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
6262
},
6363
)
64-
self.assertEqual(len(nodes), 5)
64+
self.assertEqual(len(nodes), 4)
6565
self.assertEqual(len(edges), 4)
6666

6767
def test_many_to_one_plot(self):
@@ -106,7 +106,7 @@ def test_many_to_one_plot(self):
106106
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
107107
},
108108
)
109-
self.assertEqual(len(nodes), 19)
109+
self.assertEqual(len(nodes), 14)
110110
self.assertEqual(len(edges), 22)
111111

112112

@@ -132,7 +132,7 @@ def test_executor_dependency_plot(self):
132132
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
133133
},
134134
)
135-
self.assertEqual(len(nodes), 5)
135+
self.assertEqual(len(nodes), 4)
136136
self.assertEqual(len(edges), 4)
137137

138138
def test_many_to_one_plot(self):
@@ -175,5 +175,5 @@ def test_many_to_one_plot(self):
175175
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
176176
},
177177
)
178-
self.assertEqual(len(nodes), 19)
178+
self.assertEqual(len(nodes), 14)
179179
self.assertEqual(len(edges), 22)

tests/test_singlenodeexecutor_plot_dependency.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def test_executor_dependency_plot(self):
8080
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
8181
},
8282
)
83-
self.assertEqual(len(nodes), 5)
83+
self.assertEqual(len(nodes), 4)
8484
self.assertEqual(len(edges), 4)
8585

8686
def test_executor_dependency_plot_filename(self):
@@ -141,7 +141,7 @@ def test_many_to_one_plot(self):
141141
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
142142
},
143143
)
144-
self.assertEqual(len(nodes), 19)
144+
self.assertEqual(len(nodes), 14)
145145
self.assertEqual(len(edges), 22)
146146

147147
def test_future_input_dict(self):
@@ -186,7 +186,7 @@ def test_executor_dependency_plot(self):
186186
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
187187
},
188188
)
189-
self.assertEqual(len(nodes), 5)
189+
self.assertEqual(len(nodes), 4)
190190
self.assertEqual(len(edges), 4)
191191

192192
def test_many_to_one_plot(self):
@@ -231,7 +231,7 @@ def test_many_to_one_plot(self):
231231
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
232232
},
233233
)
234-
self.assertEqual(len(nodes), 19)
234+
self.assertEqual(len(nodes), 14)
235235
self.assertEqual(len(edges), 22)
236236

237237

@@ -257,7 +257,7 @@ def test_executor_dependency_plot(self):
257257
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
258258
},
259259
)
260-
self.assertEqual(len(nodes), 5)
260+
self.assertEqual(len(nodes), 4)
261261
self.assertEqual(len(edges), 4)
262262

263263
def test_many_to_one_plot(self):
@@ -300,7 +300,7 @@ def test_many_to_one_plot(self):
300300
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
301301
},
302302
)
303-
self.assertEqual(len(nodes), 19)
303+
self.assertEqual(len(nodes), 14)
304304
self.assertEqual(len(edges), 22)
305305

306306

tests/test_testclusterexecutor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_executor_dependency_plot(self):
8989
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
9090
},
9191
)
92-
self.assertEqual(len(nodes), 5)
92+
self.assertEqual(len(nodes), 4)
9393
self.assertEqual(len(edges), 4)
9494

9595
def tearDown(self):

0 commit comments

Comments
 (0)