Description
To preface this: I'm coming from using the library through Dapr Workflows.
I'm a bit confused about the intended behaviour of WhenAllTask when one of its child tasks raises an exception. I'm having trouble catching an exception when using when_all(): as far as I can tell, it can't be caught at all.
I've narrowed it down to this code:
durabletask-python/durabletask/task.py, lines 276 to 302 in 8598f6b:

    class WhenAllTask(CompositeTask[List[T]]):
        """A task that completes when all of its child tasks complete."""

        def __init__(self, tasks: List[Task[T]]):
            super().__init__(tasks)
            self._completed_tasks = 0
            self._failed_tasks = 0

        @property
        def pending_tasks(self) -> int:
            """Returns the number of tasks that have not yet completed."""
            return len(self._tasks) - self._completed_tasks

        def on_child_completed(self, task: Task[T]):
            if self.is_complete:
                raise ValueError('The task has already completed.')
            self._completed_tasks += 1
            if task.is_failed and self._exception is None:
                self._exception = task.get_exception()
                self._is_complete = True
            if self._completed_tasks == len(self._tasks):
                # The order of the result MUST match the order of the tasks provided to the constructor.
                self._result = [task.get_result() for task in self._tasks]
                self._is_complete = True

        def get_completed_tasks(self) -> int:
            return self._completed_tasks

There are two places where an exception is raised directly:
1. durabletask-python/durabletask/task.py, line 291 in 8598f6b:
       raise ValueError('The task has already completed.')
2. durabletask-python/durabletask/task.py, line 298 in 8598f6b:
       self._result = [task.get_result() for task in self._tasks]
The second one raises because, as far as I can see, get_result() on a failed task re-raises the exception associated with that task.
When either of these is raised directly, the workflow immediately transitions to a FAILED status with no chance to catch the exception.
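The two failure paths can be reproduced outside the library with a small sketch. `StubTask`, `StubWhenAll` and `escaping_exception` below are hypothetical stand-ins written for this illustration, not the durabletask classes; only the body of `on_child_completed` follows the snippet quoted above, and the re-raising `get_result()` is the behaviour assumed in this issue. The point is that both exceptions escape from `on_child_completed` itself, that is, from whatever code delivers the completion, rather than surfacing where the workflow waits on the task.

```python
from typing import Any, List, Optional


class StubTask:
    """Hypothetical stand-in for a durable task (not the library class)."""

    def __init__(self) -> None:
        self._is_complete = False
        self._result: Any = None
        self._exception: Optional[Exception] = None

    @property
    def is_complete(self) -> bool:
        return self._is_complete

    @property
    def is_failed(self) -> bool:
        return self._exception is not None

    def get_exception(self) -> Optional[Exception]:
        return self._exception

    def get_result(self) -> Any:
        # Assumed behaviour, as described in the issue: a failed task re-raises.
        if self._exception is not None:
            raise self._exception
        return self._result

    def complete(self, result: Any) -> None:
        self._result = result
        self._is_complete = True

    def fail(self, exc: Exception) -> None:
        self._exception = exc
        self._is_complete = True


class StubWhenAll(StubTask):
    """Same on_child_completed control flow as the quoted snippet."""

    def __init__(self, tasks: List[StubTask]) -> None:
        super().__init__()
        self._tasks = tasks
        self._completed_tasks = 0

    def on_child_completed(self, task: StubTask) -> None:
        if self.is_complete:
            raise ValueError('The task has already completed.')
        self._completed_tasks += 1
        if task.is_failed and self._exception is None:
            self._exception = task.get_exception()
            self._is_complete = True
        if self._completed_tasks == len(self._tasks):
            self._result = [t.get_result() for t in self._tasks]
            self._is_complete = True


def escaping_exception(fail_index: int, count: int = 2) -> Optional[str]:
    """Deliver completions in order; report what escapes on_child_completed."""
    children = [StubTask() for _ in range(count)]
    parent = StubWhenAll(children)
    try:
        for i, child in enumerate(children):
            if i == fail_index:
                child.fail(RuntimeError("activity failed"))
            else:
                child.complete(i)
            parent.on_child_completed(child)
    except Exception as exc:
        return type(exc).__name__
    return None


print(escaping_exception(fail_index=0))  # ValueError (path 1)
print(escaping_exception(fail_index=1))  # RuntimeError (path 2)
```

When the first child fails, the parent is marked complete early, so the next completion hits the "already completed" check. When the last child fails, the result list is built immediately and `get_result()` re-raises the child's exception.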
While I can see that having the workflow just raise the exception is probably by design, I've come across a use case where I want to handle a fan-out failure, because I need to run a clean-up step before failing the workflow.
However, if _exception is set instead of raising these errors directly, with something like:

    def on_child_completed(self, task: Task[T]):
        if self.is_complete:
            if self._exception is None:
                self._exception = ValueError('The task has already completed.')
                self._result = []  # I don't know what this should be
        self._completed_tasks += 1
        if task.is_failed and self._exception is None:
            self._failed_tasks += 1
            self._exception = task.get_exception()
            self._is_complete = True
        if self._completed_tasks == len(self._tasks):
            # The order of the result MUST match the order of the tasks provided to the constructor.
            self._result = [task.get_result() if not task.is_failed else task.get_exception() for task in self._tasks]
            self._is_complete = True

I am able to get the behaviour I expect: the workflow task raises the exception that has been assigned to it, and I can catch it and do what I need to do following the failure.
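To illustrate the catch-and-clean-up pattern being asked for, here is a minimal sketch. It assumes the workflow is a generator and that a failed task is surfaced by throwing its exception into the generator at the `yield`. `fan_out_workflow` and `run` are hypothetical names, the yielded string is a placeholder for `yield when_all(tasks)`, and `run` is a toy driver, not the durabletask worker.

```python
from typing import Any, Generator, List


def fan_out_workflow(log: List[str]) -> Generator[str, Any, Any]:
    """Hypothetical workflow body; the yielded string stands in for
    `yield when_all(tasks)` in a real workflow."""
    try:
        results = yield "when_all"
    except RuntimeError as exc:
        # Clean-up step that must run before the workflow fails.
        log.append(f"cleanup after: {exc}")
        raise
    return results


def run(task_failure: Exception, log: List[str]) -> str:
    """Toy driver (an assumption for illustration, not the real worker)."""
    gen = fan_out_workflow(log)
    next(gen)  # advance the workflow to its yield
    try:
        # Deliver the failure at the yield point so the workflow can catch it.
        gen.throw(task_failure)
    except StopIteration:
        return "COMPLETED"
    except Exception:
        return "FAILED"
    return "RUNNING"


log: List[str] = []
status = run(RuntimeError("activity 2 failed"), log)
print(status)  # FAILED
print(log)     # ['cleanup after: activity 2 failed']
```

The workflow still ends up FAILED, but only after the clean-up has run. This only works if the exception reaches the generator at its `yield`; an exception raised elsewhere in the runtime never passes through the workflow's `try`/`except`.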
I'm not sure about that first is_complete check, though: is it necessary? I'm also not sure whether all tasks should be allowed to complete, with the user given the choice of either having the exception raised or having the exceptions included in the result (a bit like asyncio.gather with return_exceptions, I suppose).
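For reference, the `asyncio.gather` behaviour alluded to here looks like the following. This uses only the standard library; the coroutine names `ok`, `boom`, `gather_raising` and `gather_collecting` are made up for the example, and nothing here is durabletask code.

```python
import asyncio


async def ok(value: int) -> int:
    return value


async def boom() -> int:
    raise RuntimeError("activity failed")


async def gather_raising() -> str:
    # Default mode: the first exception propagates to the awaiter,
    # where it can be caught like any other exception.
    try:
        await asyncio.gather(ok(1), boom(), ok(3))
    except RuntimeError as exc:
        return f"caught: {exc}"
    return "no error"


async def gather_collecting() -> list:
    # return_exceptions=True: exceptions are placed in the result list,
    # in the same order as the awaitables that were passed in.
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)


raised = asyncio.run(gather_raising())
collected = asyncio.run(gather_collecting())
print(raised)     # caught: activity failed
print(collected)  # [1, RuntimeError('activity failed'), 3]
```

A similar opt-in switch on when_all() would be one way to offer both behaviours, though whether that fits the library's design is an open question.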
I would be happy to open a PR for this, but I'm not sure what behaviour would be wanted here.