Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/changelog/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v16.15.1
v22.20.0
2 changes: 1 addition & 1 deletion .github/actions/describe/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16.10.0
v22.20.0
2 changes: 1 addition & 1 deletion .github/actions/stable-changelog/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16.10.0
v22.20.0
2 changes: 1 addition & 1 deletion .github/actions/version-transform/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16.10.0
v22.20.0
2 changes: 1 addition & 1 deletion .github/workflows/action_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- name: Install npm, ncc, and jq on Ubuntu
run: |
apt-get update && apt-get install -y curl
curl -fsSL https://deb.nodesource.com/setup_16.x | bash -
curl -fsSL https://deb.nodesource.com/setup_22.x | bash -
apt-get install -y nodejs
npm install -g @vercel/ncc
apt-get install -y jq
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:
- name: Set up Node
uses: actions/setup-node@v2
with:
node-version: 20
node-version: 22

- name: Build Webapp
run: |
Expand Down
56 changes: 44 additions & 12 deletions .github/workflows/test_matrix.json
Original file line number Diff line number Diff line change
@@ -1,48 +1,64 @@
[
{
"name": "Debian 12 / Python 3.10 / Dask",
"name": "Debian 13 / Python 3.10 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py310:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py310:latest",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.10 / Local",
"name": "Debian 13 / Python 3.10 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py310:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py310:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.11 / Dask",
"name": "Debian 13 / Python 3.11 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py311:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py311:latest",
"backend": "dask",
"experimental": false,
"trigger": ["push", "schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.11 / Local",
"name": "Debian 13 / Python 3.11 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py311:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py311:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.12 / Dask",
"name": "Debian 13 / Python 3.12 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py312:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py312:latest",
"backend": "dask",
"experimental": false,
"trigger": ["push", "schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 12 / Python 3.12 / Local",
"name": "Debian 13 / Python 3.12 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian12-py312:latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py312:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 13 / Python 3.13 / Dask",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py313:latest",
"backend": "dask",
"experimental": false,
"trigger": ["push", "schedule", "workflow_dispatch", "pull_request"]
},
{
"name": "Debian 13 / Python 3.13 / Local",
"os": "ubuntu-latest",
"container": "ghcr.io/agnostiqhq/covalent-dev/debian13-py313:latest",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch", "pull_request"]
Expand Down Expand Up @@ -190,5 +206,21 @@
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.13 / Dask",
"os": "macos-15",
"python-version": "3.13",
"backend": "dask",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
},
{
"name": "MacOS 15 / Python 3.13 / Local",
"os": "macos-15",
"python-version": "3.13",
"backend": "local",
"experimental": false,
"trigger": ["schedule", "workflow_dispatch"]
}
]
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
- name: Generate environment variables
run: |
if ${{ matrix.os == 'ubuntu-latest'
&& contains(matrix.container, 'debian12-py310')
&& contains(matrix.container, 'debian13-py312')
&& matrix.backend == 'dask' }} ; then
RECOMMENDED_PLATFORM=true
fi
Expand Down
3 changes: 2 additions & 1 deletion covalent/_shared_files/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

prefix_separator = ":"

parameter_prefix = f"{prefix_separator}parameter{prefix_separator}"
NODE_TYPE_PARAMETER = "parameter"
parameter_prefix = f"{prefix_separator}{NODE_TYPE_PARAMETER}{prefix_separator}"
electron_list_prefix = f"{prefix_separator}electron_list{prefix_separator}"
electron_dict_prefix = f"{prefix_separator}electron_dict{prefix_separator}"
subscript_prefix = f"{prefix_separator}subscripted{prefix_separator}"
Expand Down
19 changes: 1 addition & 18 deletions covalent_dispatcher/_dal/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""Base class for server-side analogues of workflow data types"""

from abc import abstractmethod
from typing import Any, Dict, Generator, Generic, List, Type, TypeVar, Union
from typing import Any, Dict, Generator, Generic, List, TypeVar, Union

from sqlalchemy import select
from sqlalchemy.orm import Session
Expand All @@ -42,23 +42,6 @@ class DispatchedObject(Generic[MetaType, AssetLinkType]):

"""

@classmethod
@property
def meta_type(cls) -> Type[MetaType]:
"""Returns the metadata controller class."""
raise NotImplementedError

@classmethod
@property
def asset_link_type(cls) -> Type[AssetLinkType]:
"""Returns the asset link controller class"""
raise NotImplementedError

@classmethod
@property
def metadata_keys(cls) -> set:
raise NotImplementedError

@property
@abstractmethod
def query_keys(self) -> set:
Expand Down
15 changes: 15 additions & 0 deletions covalent_dispatcher/_dal/tg_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import networkx as nx

from covalent._shared_files import logger
from covalent._shared_files.defaults import NODE_TYPE_PARAMETER
from covalent._shared_files.util_classes import RESULT_STATUS

from .asset import copy_asset_meta
Expand All @@ -31,6 +32,9 @@
app_log = logger.app_log


GENERATED_ASSETS = {"output", "stdout", "stderr", "error"}


class TransportGraphOps:
def __init__(self, tg: _TransportGraph):
self.tg = tg
Expand Down Expand Up @@ -83,6 +87,7 @@ def copy_nodes_from(
for n in nodes:
old_node = tg.get_node(n)
old_status = tg.get_node_value(n, "status")
node_type = tg.get_node_value(n, "type")

if copy_metadata and old_status == RESULT_STATUS.COMPLETED:
# Only previously completed nodes can actually be
Expand All @@ -101,6 +106,16 @@ def copy_nodes_from(
# truth instead of these hardcoded values
for k in ASSET_KEYS:
# Copy asset metadata
# For non-parameter nodes, skip output, stdout, stderr, error if not reuse_previous_results
# since re-run of reusable nodes will overwrite artifacts
# produced by the previous workflow run
if (
(not copy_metadata)
and k in GENERATED_ASSETS
and node_type != NODE_TYPE_PARAMETER
):
app_log.debug(f"Not copying asset {k} for node {n}")
continue
app_log.debug(f"Copying asset {k} for node {n}")
with old_node.session() as session:
old = old_node.get_asset(k, session)
Expand Down
2 changes: 1 addition & 1 deletion covalent_ui/webapp/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v20.18.2
v22.20.2
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_export_result(mocker, test_db):
tempfile.TemporaryDirectory(prefix="covalent-") as srv_dir,
):
manifest = get_mock_manifest(dispatch_id, sdk_dir)
received_manifest = manifest.copy(deep=True)
received_manifest = manifest.model_copy(deep=True)
filtered_res = import_result(received_manifest, srv_dir, None)

srvres = Result.from_dispatch_id(dispatch_id, bare=False)
Expand Down
6 changes: 3 additions & 3 deletions tests/covalent_dispatcher_tests/_dal/import_export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ def test_import_export_manifest(test_db, mocker):
tempfile.TemporaryDirectory() as srv_tmp_dir,
):
manifest = serialize_result(res, sdk_tmp_dir)
received_manifest = manifest.copy(deep=True)
received_manifest = manifest.model_copy(deep=True)

import_result(received_manifest, srv_tmp_dir, None)

export_manifest = export_result_manifest(dispatch_id)

submitted = manifest.dict()
exported = export_manifest.dict()
submitted = manifest.model_dump()
exported = export_manifest.model_dump()

# Check that workflow metadata are preserved
for key in submitted["metadata"]:
Expand Down
22 changes: 13 additions & 9 deletions tests/covalent_dispatcher_tests/_service/app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ def test_register(mocker, app, client, mock_manifest):
"covalent_dispatcher._service.app.dispatcher.register_dispatch", return_value=mock_manifest
)
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.post("/api/v2/dispatches", data=mock_manifest.json())
resp = client.post("/api/v2/dispatches", data=mock_manifest.model_dump_json())

assert resp.json() == json.loads(mock_manifest.json())
assert resp.json() == json.loads(mock_manifest.model_dump_json())
mock_register_dispatch.assert_awaited_with(mock_manifest, None)


Expand All @@ -172,7 +172,7 @@ def test_register_exception(mocker, app, client, mock_manifest):
"covalent_dispatcher._service.app.dispatcher.register_dispatch", side_effect=RuntimeError()
)
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.post("/api/v2/dispatches", data=mock_manifest.json())
resp = client.post("/api/v2/dispatches", data=mock_manifest.model_dump_json())
assert resp.status_code == 400


Expand All @@ -183,9 +183,11 @@ def test_register_redispatch(mocker, app, client, mock_manifest):
return_value=mock_manifest,
)
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.post(f"/api/v2/dispatches/{dispatch_id}/redispatches", data=mock_manifest.json())
resp = client.post(
f"/api/v2/dispatches/{dispatch_id}/redispatches", data=mock_manifest.model_dump_json()
)
mock_register_redispatch.assert_awaited_with(mock_manifest, dispatch_id, False)
assert resp.json() == json.loads(mock_manifest.json())
assert resp.json() == json.loads(mock_manifest.model_dump_json())


def test_register_redispatch_reuse(mocker, app, client, mock_manifest):
Expand All @@ -197,11 +199,11 @@ def test_register_redispatch_reuse(mocker, app, client, mock_manifest):
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.post(
f"/api/v2/dispatches/{dispatch_id}/redispatches",
data=mock_manifest.json(),
data=mock_manifest.model_dump_json(),
params={"reuse_previous_results": True},
)
mock_register_redispatch.assert_awaited_with(mock_manifest, dispatch_id, True)
assert resp.json() == json.loads(mock_manifest.json())
assert resp.json() == json.loads(mock_manifest.model_dump_json())


def test_register_redispatch_exception(mocker, app, client, mock_manifest):
Expand All @@ -211,7 +213,9 @@ def test_register_redispatch_exception(mocker, app, client, mock_manifest):
side_effect=RuntimeError(),
)
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.post(f"/api/v2/dispatches/{dispatch_id}/redispatches", data=mock_manifest.json())
resp = client.post(
f"/api/v2/dispatches/{dispatch_id}/redispatches", data=mock_manifest.model_dump_json()
)
assert resp.status_code == 400


Expand All @@ -236,7 +240,7 @@ def test_export_manifest(mocker, app, client, mock_manifest):
)
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status")
resp = client.get(f"/api/v2/dispatches/{dispatch_id}")
assert resp.json() == json.loads(mock_manifest.json())
assert resp.json() == json.loads(mock_manifest.model_dump_json())


def test_export_manifest_bad_dispatch_id(mocker, app, client, mock_manifest):
Expand Down
36 changes: 36 additions & 0 deletions tests/functional_tests/workflow_stack_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,42 @@ def workflow(x, y):
assert res_obj_5.result == 49


def test_redispatch_doesnt_overwrite_previous_outputs():

# Use an impure function
@ct.lattice
@ct.electron
def read_from_file(filename: str) -> str:
with open(filename, "r") as f:
return f.read()

t = tempfile.NamedTemporaryFile(mode="w+", delete=False, delete_on_close=False)

t.write("run_1")
t.flush()

dispatch_id = ct.dispatch(read_from_file)(t.name)
orig_res = ct.get_result(dispatch_id, wait=True)
assert orig_res.status == "COMPLETED"
assert orig_res.result == "run_1"

# Modify the file and redispatch
t.seek(0)
t.write("run_2")
t.flush()

redispatch_id = ct.redispatch(dispatch_id, reuse_previous_results=False)()
redispatch_res = ct.get_result(redispatch_id, wait=True)
assert redispatch_res.status == "COMPLETED"
assert redispatch_res.result == "run_2"

# Check that the original dispatch's outputs remain unchanged
# Redownload the results
orig_res = ct.get_result(dispatch_id)
assert orig_res.get_node_result(0)["output"].get_deserialized() == "run_1"
assert orig_res.result == "run_1"


def test_redispatch_reusing_previous_results():
"""Test reusing previous results for redispatching"""

Expand Down
Loading