diff --git a/zag/engines/action_engine/completer.py b/zag/engines/action_engine/completer.py index 80eb2710..e2e62ff5 100644 --- a/zag/engines/action_engine/completer.py +++ b/zag/engines/action_engine/completer.py @@ -15,6 +15,7 @@ # under the License. import abc +import itertools import weakref from oslo_utils import reflection @@ -23,6 +24,7 @@ from zag.engines.action_engine import compiler as co from zag.engines.action_engine import executor as ex +from zag.engines.action_engine import traversal as tr from zag import logging from zag import retry as retry_atom from zag import states as st @@ -36,8 +38,9 @@ class Strategy(object): strategy = None - def __init__(self, runtime): + def __init__(self, runtime, atom=None): self._runtime = runtime + self._atom = atom @abc.abstractmethod def apply(self): @@ -57,14 +60,10 @@ class RevertAndRetry(Strategy): strategy = retry_atom.RETRY - def __init__(self, runtime, retry): - super(RevertAndRetry, self).__init__(runtime) - self._retry = retry - def apply(self): - tweaked = self._runtime.reset_atoms([self._retry], state=None, + tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.RETRY) - tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None, + tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None, intention=st.REVERT)) return tweaked @@ -74,9 +73,6 @@ class RevertAll(Strategy): strategy = retry_atom.REVERT_ALL - def __init__(self, runtime): - super(RevertAll, self).__init__(runtime) - def apply(self): return self._runtime.reset_atoms( self._runtime.iterate_nodes(co.ATOMS), @@ -88,10 +84,6 @@ class Revert(Strategy): strategy = retry_atom.REVERT - def __init__(self, runtime, atom): - super(Revert, self).__init__(runtime) - self._atom = atom - def apply(self): tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.REVERT) @@ -100,6 +92,46 @@ def apply(self): return tweaked +class Abort(Strategy): + """Sets atom and any unfinished nodes to the ``IGNORE`` intention.""" + + strategy = retry_atom.ABORT + + def apply(self): + execution_graph = self._runtime.compilation.execution_graph + successors_iter = tr.depth_first_iterate( + execution_graph, + self._atom, + tr.Direction.FORWARD + ) + return self._runtime.reset_atoms( + itertools.chain([self._atom], successors_iter), + state=st.IGNORE, + intention=st.IGNORE, + ) + + +class Ignore(Strategy): + """Sets atom and *associated* nodes to the ``IGNORE`` intention.""" + + strategy = retry_atom.IGNORE + + def apply(self): + execution_graph = self._runtime.compilation.execution_graph + successors_iter = tr.depth_first_iterate( + execution_graph, + self._atom, + tr.Direction.FORWARD, + through_flows=False, + through_retries=False, + ) + return self._runtime.reset_atoms( + itertools.chain([self._atom], successors_iter), + state=st.IGNORE, + intention=st.IGNORE, + ) + + class Completer(object): """Completes atoms using actions to complete them.""" @@ -176,9 +208,7 @@ def _determine_resolution(self, atom, failure): # Ask retry controller what to do in case of failure. handler = self._runtime.fetch_action(retry) strategy = handler.on_failure(retry, atom, failure) - if strategy == retry_atom.RETRY: - return RevertAndRetry(self._runtime, retry) - elif strategy == retry_atom.REVERT: + if strategy == retry_atom.REVERT: # Ask parent retry and figure out what to do... parent_resolver = self._determine_resolution(retry, failure) # In the future, this will be the only behavior. REVERT @@ -192,9 +222,16 @@ def _determine_resolution(self, atom, failure): if parent_resolver is not self._undefined_resolver: if parent_resolver.strategy != retry_atom.REVERT: return parent_resolver - return Revert(self._runtime, retry) - elif strategy == retry_atom.REVERT_ALL: - return RevertAll(self._runtime) + + # find the strategy subclass that matches the strategy and use it + strategy_cls = None + for subclass in Strategy.__subclasses__(): + if subclass.strategy == strategy: + strategy_cls = subclass + break + + if strategy_cls: + return strategy_cls(self._runtime, retry) else: raise ValueError("Unknown atom failure resolution" " action/strategy '%s'" % strategy) diff --git a/zag/engines/action_engine/engine.py b/zag/engines/action_engine/engine.py index 15f97110..5140c8fd 100644 --- a/zag/engines/action_engine/engine.py +++ b/zag/engines/action_engine/engine.py @@ -321,7 +321,7 @@ def run_iter(self, timeout=None): except Exception: with excutils.save_and_reraise_exception(): LOG.exception("Engine execution has failed, something" - " bad must of happened (last" + " bad must have happened (last" " %s machine transitions were %s)", last_transitions.maxlen, list(last_transitions)) diff --git a/zag/engines/action_engine/selector.py b/zag/engines/action_engine/selector.py index a36ad7a0..31ef30bb 100644 --- a/zag/engines/action_engine/selector.py +++ b/zag/engines/action_engine/selector.py @@ -59,6 +59,8 @@ def iter_next_atoms(self, atom=None): return self._browse_atoms_for_execute(atom=atom) else: return iter([]) + elif state == st.IGNORE and intention == st.IGNORE: + return self._browse_atoms_for_execute(atom=atom) elif state == st.REVERTED: return self._browse_atoms_for_revert(atom=atom) elif state == st.FAILURE: diff --git a/zag/retry.py b/zag/retry.py index b862f371..634d0a35 100644 --- a/zag/retry.py +++ b/zag/retry.py @@ -54,13 +54,35 @@ class Decision(misc.StrEnum): retry strategy associated with it. """ - #: Retries the surrounding/associated subflow again. RETRY = "RETRY" + """Retries the surrounding/associated subflow. + + This strategy will revert tasks within the associated subflow, then + re-run all the tasks again. + """ + + ABORT = "ABORT" + """Ends the entire flow immediately without reverting anything. + + This strategy will just end the flow without marking anything as failed. + If you want a task to be able to short-circuit a flow without triggering + any error handling, this is what you want. + """ + + IGNORE = "IGNORE" + """Ends the subflow immediately and continues on to the remaining flow. + + This strategy will abandon the associated subflow but continue processing + the outer flow as if the subflow had succeeded. + """ + # Retain these aliases for a number of releases... REVERT = Decision.REVERT REVERT_ALL = Decision.REVERT_ALL RETRY = Decision.RETRY +ABORT = Decision.ABORT +IGNORE = Decision.IGNORE # Constants passed into revert/execute kwargs. # @@ -379,3 +401,23 @@ def on_failure(self, values, history, *args, **kwargs): def execute(self, values, history, *args, **kwargs): return self._get_next_value(values, history) + + +class AlwaysAbort(Retry): + """Retry that always aborts entire flow.""" + + def on_failure(self, *args, **kwargs): + return ABORT + + def execute(self, *args, **kwargs): + pass + + +class AlwaysIgnore(Retry): + """Retry that always ignores subflow.""" + + def on_failure(self, *args, **kwargs): + return IGNORE + + def execute(self, *args, **kwargs): + pass diff --git a/zag/tests/unit/test_retries.py b/zag/tests/unit/test_retries.py index e4307aba..4def22ea 100644 --- a/zag/tests/unit/test_retries.py +++ b/zag/tests/unit/test_retries.py @@ -1188,6 +1188,63 @@ def test_nested_provides_graph_retried_correctly(self): self.assertItemsEqual(expected, capturer.values[4:]) self.assertEqual(st.SUCCESS, engine.storage.get_flow_state()) + def test_abort(self): + retry1 = retry.AlwaysAbort('r1') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTask('task-2'), + utils.ProgressingTask('task-3'), + ), + utils.ProgressingTask('task-4'), + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(None)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot!)', + 'r1.r IGNORE', + 'task-2.t IGNORE', + 'task-3.t IGNORE', + 'task-4.t IGNORE', + 'flow-1.f SUCCESS'] + + self.assertEqual(expected, capturer.values) + + def test_ignore(self): + retry1 = retry.AlwaysIgnore('r1') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTask('task-2'), + utils.ProgressingTask('task-3'), + ), + utils.ProgressingTask('task-4'), + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(None)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot!)', + 'r1.r IGNORE', + 'task-2.t IGNORE', + 'task-3.t IGNORE', + 'task-4.t RUNNING', + 'task-4.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + + self.assertEqual(expected, capturer.values) + class RetryParallelExecutionTest(utils.EngineTestBase): # FIXME(harlowja): fix this class so that it doesn't use events or uses