From a4b82659acf28cd1077593f710453673e39cf602 Mon Sep 17 00:00:00 2001 From: Greg Hill Date: Wed, 9 Jan 2019 10:05:32 -0600 Subject: [PATCH] Add ABORT and IGNORE retry strategies ABORT will abandon the entire flow without marking anything as failed IGNORE will ignore the error and continue on with the remainder of the flow beyond the current subflow. Fixes #26 --- zag/engines/action_engine/completer.py | 77 +++++++++++++++++++------- zag/engines/action_engine/engine.py | 2 +- zag/engines/action_engine/selector.py | 2 + zag/retry.py | 44 ++++++++++++++- zag/tests/unit/test_retries.py | 57 +++++++++++++++++++ 5 files changed, 160 insertions(+), 22 deletions(-) diff --git a/zag/engines/action_engine/completer.py b/zag/engines/action_engine/completer.py index 80eb2710..e2e62ff5 100644 --- a/zag/engines/action_engine/completer.py +++ b/zag/engines/action_engine/completer.py @@ -15,6 +15,7 @@ # under the License. import abc +import itertools import weakref from oslo_utils import reflection @@ -23,6 +24,7 @@ from zag.engines.action_engine import compiler as co from zag.engines.action_engine import executor as ex +from zag.engines.action_engine import traversal as tr from zag import logging from zag import retry as retry_atom from zag import states as st @@ -36,8 +38,9 @@ class Strategy(object): strategy = None - def __init__(self, runtime): + def __init__(self, runtime, atom=None): self._runtime = runtime + self._atom = atom @abc.abstractmethod def apply(self): @@ -57,14 +60,10 @@ class RevertAndRetry(Strategy): strategy = retry_atom.RETRY - def __init__(self, runtime, retry): - super(RevertAndRetry, self).__init__(runtime) - self._retry = retry - def apply(self): - tweaked = self._runtime.reset_atoms([self._retry], state=None, + tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.RETRY) - tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None, + tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None, intention=st.REVERT)) return tweaked @@ -74,9 +73,6 @@ class RevertAll(Strategy): strategy = retry_atom.REVERT_ALL - def __init__(self, runtime): - super(RevertAll, self).__init__(runtime) - def apply(self): return self._runtime.reset_atoms( self._runtime.iterate_nodes(co.ATOMS), @@ -88,10 +84,6 @@ class Revert(Strategy): strategy = retry_atom.REVERT - def __init__(self, runtime, atom): - super(Revert, self).__init__(runtime) - self._atom = atom - def apply(self): tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.REVERT) @@ -100,6 +92,46 @@ def apply(self): return tweaked +class Abort(Strategy): + """Sets atom and any unfinished nodes to the ``IGNORE`` intention.""" + + strategy = retry_atom.ABORT + + def apply(self): + execution_graph = self._runtime.compilation.execution_graph + successors_iter = tr.depth_first_iterate( + execution_graph, + self._atom, + tr.Direction.FORWARD + ) + return self._runtime.reset_atoms( + itertools.chain([self._atom], successors_iter), + state=st.IGNORE, + intention=st.IGNORE, + ) + + +class Ignore(Strategy): + """Sets atom and *associated* nodes to the ``IGNORE`` intention.""" + + strategy = retry_atom.IGNORE + + def apply(self): + execution_graph = self._runtime.compilation.execution_graph + successors_iter = tr.depth_first_iterate( + execution_graph, + self._atom, + tr.Direction.FORWARD, + through_flows=False, + through_retries=False, + ) + return self._runtime.reset_atoms( + itertools.chain([self._atom], successors_iter), + state=st.IGNORE, + intention=st.IGNORE, + ) + + class Completer(object): """Completes atoms using actions to complete them.""" @@ -176,9 +208,7 @@ def _determine_resolution(self, atom, failure): # Ask retry controller what to do in case of failure. handler = self._runtime.fetch_action(retry) strategy = handler.on_failure(retry, atom, failure) - if strategy == retry_atom.RETRY: - return RevertAndRetry(self._runtime, retry) - elif strategy == retry_atom.REVERT: + if strategy == retry_atom.REVERT: # Ask parent retry and figure out what to do... parent_resolver = self._determine_resolution(retry, failure) # In the future, this will be the only behavior. REVERT @@ -192,9 +222,16 @@ def _determine_resolution(self, atom, failure): if parent_resolver is not self._undefined_resolver: if parent_resolver.strategy != retry_atom.REVERT: return parent_resolver - return Revert(self._runtime, retry) - elif strategy == retry_atom.REVERT_ALL: - return RevertAll(self._runtime) + + # find the strategy subclass that matches the strategy and use it + strategy_cls = None + for subclass in Strategy.__subclasses__(): + if subclass.strategy == strategy: + strategy_cls = subclass + break + + if strategy_cls: + return strategy_cls(self._runtime, retry) else: raise ValueError("Unknown atom failure resolution" " action/strategy '%s'" % strategy) diff --git a/zag/engines/action_engine/engine.py b/zag/engines/action_engine/engine.py index 15f97110..5140c8fd 100644 --- a/zag/engines/action_engine/engine.py +++ b/zag/engines/action_engine/engine.py @@ -321,7 +321,7 @@ def run_iter(self, timeout=None): except Exception: with excutils.save_and_reraise_exception(): LOG.exception("Engine execution has failed, something" - " bad must of happened (last" + " bad must have happened (last" " %s machine transitions were %s)", last_transitions.maxlen, list(last_transitions)) diff --git a/zag/engines/action_engine/selector.py b/zag/engines/action_engine/selector.py index a36ad7a0..31ef30bb 100644 --- a/zag/engines/action_engine/selector.py +++ b/zag/engines/action_engine/selector.py @@ -59,6 +59,8 @@ def iter_next_atoms(self, atom=None): return self._browse_atoms_for_execute(atom=atom) else: return iter([]) + elif state == st.IGNORE and intention == st.IGNORE: + return self._browse_atoms_for_execute(atom=atom) elif state == st.REVERTED: return self._browse_atoms_for_revert(atom=atom) elif state == st.FAILURE: diff --git a/zag/retry.py b/zag/retry.py index b862f371..634d0a35 100644 --- a/zag/retry.py +++ b/zag/retry.py @@ -54,13 +54,35 @@ class Decision(misc.StrEnum): retry strategy associated with it. """ - #: Retries the surrounding/associated subflow again. RETRY = "RETRY" + """Retries the surrounding/associated subflow. + + This strategy will revert tasks within the associated subflow, then + re-run all the tasks again. + """ + + ABORT = "ABORT" + """Ends the entire flow immediately without reverting anything. + + This strategy will just end the flow without marking anything as failed. + If you want a task to be able to short-circuit a flow without triggering + any error handling, this is what you want. + """ + + IGNORE = "IGNORE" + """Ends the subflow immediately and continues on to the remaining flow. + + This strategy will abandon the associated subflow but continue processing + the outer flow as if the subflow had succeeded. + """ + # Retain these aliases for a number of releases... REVERT = Decision.REVERT REVERT_ALL = Decision.REVERT_ALL RETRY = Decision.RETRY +ABORT = Decision.ABORT +IGNORE = Decision.IGNORE # Constants passed into revert/execute kwargs. # @@ -379,3 +401,23 @@ def on_failure(self, values, history, *args, **kwargs): def execute(self, values, history, *args, **kwargs): return self._get_next_value(values, history) + + +class AlwaysAbort(Retry): + """Retry that always aborts entire flow.""" + + def on_failure(self, *args, **kwargs): + return ABORT + + def execute(self, *args, **kwargs): + pass + + +class AlwaysIgnore(Retry): + """Retry that always ignores subflow.""" + + def on_failure(self, *args, **kwargs): + return IGNORE + + def execute(self, *args, **kwargs): + pass diff --git a/zag/tests/unit/test_retries.py b/zag/tests/unit/test_retries.py index e4307aba..4def22ea 100644 --- a/zag/tests/unit/test_retries.py +++ b/zag/tests/unit/test_retries.py @@ -1188,6 +1188,63 @@ def test_nested_provides_graph_retried_correctly(self): self.assertItemsEqual(expected, capturer.values[4:]) self.assertEqual(st.SUCCESS, engine.storage.get_flow_state()) + def test_abort(self): + retry1 = retry.AlwaysAbort('r1') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTask('task-2'), + utils.ProgressingTask('task-3'), + ), + utils.ProgressingTask('task-4'), + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(None)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot!)', + 'r1.r IGNORE', + 'task-2.t IGNORE', + 'task-3.t IGNORE', + 'task-4.t IGNORE', + 'flow-1.f SUCCESS'] + + self.assertEqual(expected, capturer.values) + + def test_ignore(self): + retry1 = retry.AlwaysIgnore('r1') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTask('task-2'), + utils.ProgressingTask('task-3'), + ), + utils.ProgressingTask('task-4'), + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(None)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot!)', + 'r1.r IGNORE', + 'task-2.t IGNORE', + 'task-3.t IGNORE', + 'task-4.t RUNNING', + 'task-4.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + + self.assertEqual(expected, capturer.values) + class RetryParallelExecutionTest(utils.EngineTestBase): # FIXME(harlowja): fix this class so that it doesn't use events or uses