From bbffced52cfb046b28ef9a3c1f501ddae1aa280c Mon Sep 17 00:00:00 2001 From: Guillaume De Saint Martin Date: Fri, 26 Jun 2026 09:29:08 +0200 Subject: [PATCH 1/2] [Node] restore non retryable errors --- .../flow/octobot_flow/jobs/automation_job.py | 16 +++- .../flow/tests/jobs/test_automation_job.py | 93 +++++++++++++++++++ .../flow/tests/jobs/test_automations_job.py | 15 --- .../workflows/automation_workflow.py | 9 ++ .../workflows/test_automation_workflow.py | 14 ++- 5 files changed, 123 insertions(+), 24 deletions(-) create mode 100644 packages/flow/tests/jobs/test_automation_job.py delete mode 100644 packages/flow/tests/jobs/test_automations_job.py diff --git a/packages/flow/octobot_flow/jobs/automation_job.py b/packages/flow/octobot_flow/jobs/automation_job.py index 7f9521061c..1fa3f381df 100644 --- a/packages/flow/octobot_flow/jobs/automation_job.py +++ b/packages/flow/octobot_flow/jobs/automation_job.py @@ -5,6 +5,7 @@ import octobot_commons.json_util as json_util import octobot_commons.logging as common_logging import octobot.community +import octobot.community.wallet_backend.errors as wallet_backend_errors import octobot_commons.profiles.profile_data as profile_data_import import octobot_copy.constants as copy_constants @@ -413,12 +414,17 @@ async def _emit_trading_signals( trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository( maybe_community_repository ) - await trading_signals_repository.insert_trading_signal( - octobot_flow.entities.TradingSignal( - strategy_id=automation.metadata.strategy_id, account=account + try: + await trading_signals_repository.insert_trading_signal( + octobot_flow.entities.TradingSignal( + strategy_id=automation.metadata.strategy_id, account=account + ) ) - ) - + except wallet_backend_errors.WalletNotFoundError as err: + self._logger.error( + f"Skipping trading signal emission: {err}" + ) + def _get_actions_to_execute(self) -> tuple[list[octobot_flow.entities.AbstractActionDetails], bool]: if pending_priority_actions := self._get_pending_priority_actions(): return pending_priority_actions, True diff --git a/packages/flow/tests/jobs/test_automation_job.py b/packages/flow/tests/jobs/test_automation_job.py new file mode 100644 index 0000000000..4e50da9cc6 --- /dev/null +++ b/packages/flow/tests/jobs/test_automation_job.py @@ -0,0 +1,93 @@ +import mock +import pytest +import time + +import octobot.community.wallet_backend.errors as wallet_backend_errors +import octobot_copy.constants as copy_constants +import octobot_protocol.models as protocol_models + +import octobot_flow.entities +import octobot_flow.errors +import octobot_flow.jobs.automation_job as automation_job_module +import octobot_flow.logic.actions +import octobot_flow.logic.configuration +import octobot_flow.repositories.community + +from tests.functionnal_tests import auth_details, global_state + + +STRATEGY_ID = "test-strategy-id" + + +def _minimal_automation_job() -> automation_job_module.AutomationJob: + automation_state = { + "automation": { + "metadata": {"automation_id": "automation_1"}, + "actions_dag": {"actions": []}, + } + } + auth_details = octobot_flow.entities.UserAuthentication(wallet_address="0xtest") + return automation_job_module.AutomationJob(automation_state, [], [], auth_details) + + +def _minimal_copied_account() -> protocol_models.CopiedAccount: + return protocol_models.CopiedAccount( + version=copy_constants.COPIED_ACCOUNT_VERSION, + updated_at=time.time(), + copied_assets=[], + ) + + +class TestValidateInput: + @pytest.mark.asyncio + async def test_not_automations_configured( + self, + global_state: dict, + auth_details: octobot_flow.entities.UserAuthentication, + ): + global_state["automation"] = {} + with pytest.raises(octobot_flow.errors.NoAutomationError): + async with automation_job_module.AutomationJob(global_state, [], [], auth_details): + pass + + +class TestEmitTradingSignals: + @pytest.mark.asyncio + async def test_skips_emission_and_logs_when_wallet_not_found(self): + automation_job = _minimal_automation_job() + automation = automation_job.automation_state.automation + automation.metadata.strategy_id = STRATEGY_ID + automation.exchange_account_elements = octobot_flow.entities.ExchangeAccountElements() + fetched_dependencies = octobot_flow.entities.FetchedDependencies() + community_repository = mock.Mock() + wallet_error = wallet_backend_errors.WalletNotFoundError("Wallet not found") + insert_trading_signal_mock = mock.AsyncMock(side_effect=wallet_error) + copied_account = _minimal_copied_account() + + with mock.patch.object( + octobot_flow.repositories.community.TradingSignalsRepository, + "from_community_repository", + return_value=mock.Mock(insert_trading_signal=insert_trading_signal_mock), + ), mock.patch.object( + octobot_flow.logic.configuration, + "infer_reference_market", + return_value="USDT", + ), mock.patch.object( + octobot_flow.logic.actions, + "reference_exchange_elements_to_account", + return_value=copied_account, + ), mock.patch.object( + automation_job._logger, + "error", + ) as error_log_mock: + await automation_job._emit_trading_signals( + community_repository, + automation, + fetched_dependencies, + ) + + insert_trading_signal_mock.assert_awaited_once() + emitted_signal = insert_trading_signal_mock.await_args.args[0] + assert emitted_signal.strategy_id == STRATEGY_ID + assert emitted_signal.account is copied_account + error_log_mock.assert_called_once_with(f"Skipping trading signal emission: {wallet_error}") diff --git a/packages/flow/tests/jobs/test_automations_job.py b/packages/flow/tests/jobs/test_automations_job.py deleted file mode 100644 index 448fa5a8e1..0000000000 --- a/packages/flow/tests/jobs/test_automations_job.py +++ /dev/null @@ -1,15 +0,0 @@ -import pytest - -import octobot_flow.jobs -import octobot_flow.entities -import octobot_flow.errors - -from tests.functionnal_tests import global_state, auth_details - - -@pytest.mark.asyncio -async def test_not_automations_configured(global_state: dict, auth_details: octobot_flow.entities.UserAuthentication): - global_state["automation"] = {} - with pytest.raises(octobot_flow.errors.NoAutomationError): - async with octobot_flow.jobs.AutomationJob(global_state, [], [], auth_details): - pass diff --git a/packages/node/octobot_node/scheduler/workflows/automation_workflow.py b/packages/node/octobot_node/scheduler/workflows/automation_workflow.py index e3d6862e50..210c4ad025 100644 --- a/packages/node/octobot_node/scheduler/workflows/automation_workflow.py +++ b/packages/node/octobot_node/scheduler/workflows/automation_workflow.py @@ -130,6 +130,14 @@ async def execute_automation(inputs: dict) -> typing.Optional[str]: ) return json.dumps(output.to_dict(include_default_values=False)) if output else None + @staticmethod + def _should_retry(error: BaseException) -> bool: + return not isinstance(error, ( + # workflow stopping errors + errors.WorkflowError, + octobot_flow.errors.ConfigurationError, + )) + @staticmethod @SCHEDULER.INSTANCE.step( name="execute_iteration", @@ -137,6 +145,7 @@ async def execute_automation(inputs: dict) -> typing.Optional[str]: interval_seconds=constants.AUTOMATION_WORKFLOW_RETRY_INTERVAL_SECONDS, max_attempts=constants.AUTOMATION_WORKFLOW_MAX_ITERATION_RETRIES, backoff_rate=constants.AUTOMATION_WORKFLOW_BACKOFF_RATE, + should_retry=_should_retry, ) async def execute_iteration(inputs: dict, actions_update: typing.Optional[dict]) -> dict: """ diff --git a/packages/node/tests/scheduler/workflows/test_automation_workflow.py b/packages/node/tests/scheduler/workflows/test_automation_workflow.py index 3df64bf896..c9f542354b 100644 --- a/packages/node/tests/scheduler/workflows/test_automation_workflow.py +++ b/packages/node/tests/scheduler/workflows/test_automation_workflow.py @@ -394,10 +394,16 @@ async def test_execute_automation( max_attempts, ), ( - octobot_flow.errors.InvalidAutomationActionError("invalid action config"), - octobot_flow.enums.AutomationWorkflowErrorStatus.INVALID_ACTION_CONFIGURATION.value, - dbos_retries_exhausted_message, - max_attempts, + octobot_flow.errors.InvalidAutomationActionError("invalid action config"), # non retryable ConfigurationError + octobot_flow.enums.AutomationWorkflowErrorStatus.EXCEPTION_DURING_ITERATION.value, + "invalid action config", + 1, # only 1 attempt: this raises a non retryable error + ), + ( + errors.WorkflowInputError("invalid action config"), # non retryable WorkflowError + octobot_flow.enums.AutomationWorkflowErrorStatus.EXCEPTION_DURING_ITERATION.value, + "invalid action config", + 1, # only 1 attempt: this raises a non retryable error ), ] for ( From 7a34c09bcd94931d45b1a68da5a4c7e47b0449b9 Mon Sep 17 00:00:00 2001 From: Guillaume De Saint Martin Date: Fri, 26 Jun 2026 09:53:09 +0200 Subject: [PATCH 2/2] [Node] improve error logging --- .../entities/actions/actions_dependencies.py | 18 +++- .../actions/test_actions_dependencies.py | 66 +++++++++++++++ .../scheduler/octobot_flow_client.py | 4 +- .../workflows/automation_workflow.py | 6 ++ ...ent_lib.py => test_octobot_flow_client.py} | 0 .../workflows/test_automation_workflow.py | 84 ++++++++++++++++++- 6 files changed, 171 insertions(+), 7 deletions(-) rename packages/node/tests/scheduler/{test_octobot_flow_client_lib.py => test_octobot_flow_client.py} (100%) diff --git a/packages/flow/octobot_flow/entities/actions/actions_dependencies.py b/packages/flow/octobot_flow/entities/actions/actions_dependencies.py index 062cbfe6fe..2e2484e9a1 100644 --- a/packages/flow/octobot_flow/entities/actions/actions_dependencies.py +++ b/packages/flow/octobot_flow/entities/actions/actions_dependencies.py @@ -1,6 +1,8 @@ import typing +import dataclasses import octobot_commons.dsl_interpreter +import octobot_commons.logging import octobot_flow.entities.actions.action_details as action_details import octobot_flow.enums @@ -93,7 +95,21 @@ def read_dependency_result( ) value = dependency_action.result if dependency.result_path: - value = _navigate_dict_path(value, dependency.result_path) + try: + value = _navigate_dict_path(value, dependency.result_path) + except octobot_flow.errors.ActionDependencyError as err: + dependency_dict = dataclasses.asdict(dependency) + result_keys = ( + list(value) if isinstance(value, dict) + else octobot_commons.logging.get_private_minimized_message_if_necessary(value) + ) + error_message = ( + f"Impossible to resolve dependency {dependency_dict} " + f"result path {dependency.result_path} from action {dependency_action.id} " + f"with action result shape: {result_keys}" + ) + f": {err}" + # add as much info as possible to the error message + raise octobot_flow.errors.ActionDependencyError(error_message) from err return value def _dependency_values_for_dynamic_dependencies( diff --git a/packages/flow/tests/entities/actions/test_actions_dependencies.py b/packages/flow/tests/entities/actions/test_actions_dependencies.py index bcd7d21ed8..7b73505619 100644 --- a/packages/flow/tests/entities/actions/test_actions_dependencies.py +++ b/packages/flow/tests/entities/actions/test_actions_dependencies.py @@ -13,9 +13,11 @@ # # You should have received a copy of the GNU Lesser General Public # License along with this library. +import mock import pytest import octobot_commons.constants as commons_constants +import octobot_commons.logging import octobot_flow.entities.actions.action_details as action_details import octobot_flow.entities.actions.actions_dependencies as actions_dependencies @@ -188,6 +190,70 @@ def test_raises_when_dependency_action_errored(self): with pytest.raises(flow_errors.ActionDependencyError): resolver.read_dependency_result(dependency) + def test_raises_extended_error_when_result_path_key_missing(self): + dependency_action = action_details.DSLScriptActionDetails( + id="action_source", + dsl_script="exchange_action()", + dependencies=[], + ) + dependency_action.complete(result={"nested": {"order_id": "abc-123"}}) + dependency = action_details.ActionDependency( + action_id="action_source", + parameter="exchange_order_id", + result_path=["nested", "missing_key"], + ) + resolver = actions_dependencies.ActionsDependenciesResolver(_actions_by_id(dependency_action)) + + with mock.patch.object( + octobot_commons.logging, + "get_private_minimized_message_if_necessary", + ) as minimize_message_mock: + with pytest.raises(flow_errors.ActionDependencyError) as error_info: + resolver.read_dependency_result(dependency) + + minimize_message_mock.assert_not_called() + error_message = str(error_info.value) + assert "Impossible to resolve dependency" in error_message + assert "action_id': 'action_source'" in error_message + assert "result_path': ['nested', 'missing_key']" in error_message + assert "from action action_source" in error_message + assert "with action result shape: ['nested']" in error_message + assert "missing key: missing_key" in error_message + assert isinstance(error_info.value.__cause__, flow_errors.ActionDependencyError) + + def test_raises_extended_error_when_result_path_list_index_out_of_range(self): + dependency_action = action_details.DSLScriptActionDetails( + id="action_source", + dsl_script="exchange_action()", + dependencies=[], + ) + dependency_action.complete(result=[{"id": "a"}]) + list_result = dependency_action.result + dependency = action_details.ActionDependency( + action_id="action_source", + parameter="exchange_order_id", + result_path=["5"], + ) + resolver = actions_dependencies.ActionsDependenciesResolver(_actions_by_id(dependency_action)) + + with mock.patch.object( + octobot_commons.logging, + "get_private_minimized_message_if_necessary", + return_value="minimized-list-shape", + ) as minimize_message_mock: + with pytest.raises(flow_errors.ActionDependencyError) as error_info: + resolver.read_dependency_result(dependency) + + minimize_message_mock.assert_called_once_with(list_result) + error_message = str(error_info.value) + assert "Impossible to resolve dependency" in error_message + assert "action_id': 'action_source'" in error_message + assert "result_path': ['5']" in error_message + assert "from action action_source" in error_message + assert "with action result shape: minimized-list-shape" in error_message + assert "list index 5 out of range" in error_message + assert isinstance(error_info.value.__cause__, flow_errors.ActionDependencyError) + class TestActionsDependenciesResolverFilledAllDependencies: def test_returns_true_when_all_dependencies_completed(self): diff --git a/packages/node/octobot_node/scheduler/octobot_flow_client.py b/packages/node/octobot_node/scheduler/octobot_flow_client.py index 574c0cabcf..3cfd579438 100644 --- a/packages/node/octobot_node/scheduler/octobot_flow_client.py +++ b/packages/node/octobot_node/scheduler/octobot_flow_client.py @@ -16,9 +16,9 @@ import typing import dataclasses import json -import logging import octobot_commons.dataclasses +import octobot_commons.logging import octobot_node.errors as errors import octobot_node.scheduler.workflows_util as workflows_util @@ -149,7 +149,7 @@ async def run(self) -> None: self.priority_user_actions or automation_job.automation_state.automation.actions_dag.get_executable_actions() ) - logging.getLogger(self.__class__.__name__).info(f"Running automation actions: {selected_actions}") + octobot_commons.logging.get_logger(self.__class__.__name__).info(f"Running automation actions: {selected_actions}") executed_actions = await automation_job.run() self.after_execution_state = automation_job.automation_state post_execution_state_dump = automation_job.dump() diff --git a/packages/node/octobot_node/scheduler/workflows/automation_workflow.py b/packages/node/octobot_node/scheduler/workflows/automation_workflow.py index 210c4ad025..ca2c04a230 100644 --- a/packages/node/octobot_node/scheduler/workflows/automation_workflow.py +++ b/packages/node/octobot_node/scheduler/workflows/automation_workflow.py @@ -199,6 +199,12 @@ async def execute_iteration(inputs: dict, actions_update: typing.Optional[dict]) has_next_actions_override = True next_iteration_description_override = parsed_inputs.task.content next_iteration_description_metadata_override = parsed_inputs.task.content_metadata + except Exception as err: + # log propagated errors to also associate them to the automation's error tracking + AutomationWorkflow.get_logger(parsed_inputs).exception( + err, True, f"Error while running automation job: {err}" + ) + raise if result.processed_actions: if latest_step := AutomationWorkflow._get_actions_summary(result.processed_actions, minimal=True): executed_step = latest_step diff --git a/packages/node/tests/scheduler/test_octobot_flow_client_lib.py b/packages/node/tests/scheduler/test_octobot_flow_client.py similarity index 100% rename from packages/node/tests/scheduler/test_octobot_flow_client_lib.py rename to packages/node/tests/scheduler/test_octobot_flow_client.py diff --git a/packages/node/tests/scheduler/workflows/test_automation_workflow.py b/packages/node/tests/scheduler/workflows/test_automation_workflow.py index c9f542354b..c1c0f67202 100644 --- a/packages/node/tests/scheduler/workflows/test_automation_workflow.py +++ b/packages/node/tests/scheduler/workflows/test_automation_workflow.py @@ -128,6 +128,27 @@ def _apply_octobot_actions_job_result_template( target.should_stop = template.should_stop +def _assert_iteration_job_errors_logged( + mock_logger: mock.Mock, + raised_exception: BaseException, + *, + iteration_failure_count: int, + expect_workflow_interrupted_log: bool = False, +) -> None: + expected_call_count = iteration_failure_count + (1 if expect_workflow_interrupted_log else 0) + assert mock_logger.exception.call_count == expected_call_count + for call_index in range(iteration_failure_count): + logged_exception, publish_error, error_message = mock_logger.exception.call_args_list[call_index][0] + assert isinstance(logged_exception, type(raised_exception)) + assert str(logged_exception) == str(raised_exception) + assert publish_error is True + assert error_message == f"Error while running automation job: {logged_exception}" + if expect_workflow_interrupted_log: + assert "Interrupted workflow: unexpected critical error: " in str( + mock_logger.exception.call_args_list[-1][0][2] + ) + + def _octobot_actions_job_mock_class( *, run_on_result: typing.Callable[[octobot_flow_client.OctoBotActionsJobResult], typing.Any] | None = None, @@ -452,7 +473,20 @@ async def test_execute_automation( assert parsed_output.error == expected_error_status assert parsed_output.error_message == expected_error_message assert run_mock.await_count == expected_run_await_count - mock_logger.exception.assert_called_once() + if expected_run_await_count > 1: + _assert_iteration_job_errors_logged( + mock_logger, + raised_exception, + iteration_failure_count=expected_run_await_count, + expect_workflow_interrupted_log=True, + ) + else: + _assert_iteration_job_errors_logged( + mock_logger, + raised_exception, + iteration_failure_count=1, + expect_workflow_interrupted_log=True, + ) mock_process.assert_not_called() @@ -567,6 +601,40 @@ async def test_execute_iteration_missing_trading_signal_sets_no_trading_signal_e assert parsed_progress_status.error == octobot_flow.enums.ActionErrorStatus.NO_TRADING_SIGNAL.value assert parsed_progress_status.error_message == error_message + @pytest.mark.asyncio + @required_imports + async def test_execute_iteration_logs_and_reraises_when_octobot_actions_job_fails( + self, import_automation_workflow, task + ): + task.content = json.dumps({"params": {"ACTIONS": "trade", "EXCHANGE_FROM": "binance", + "ORDER_SYMBOL": "ETH/BTC", "ORDER_AMOUNT": 1, "ORDER_TYPE": "market", + "ORDER_SIDE": "BUY", "SIMULATED_PORTFOLIO": {"BTC": 1}}}) + inputs = params.AutomationWorkflowInputs(task=task, execution_time=0).to_dict(include_default_values=False) + run_error = RuntimeError("automation failed") + mock_octobot_actions_job_class, _ = _octobot_actions_job_mock_class( + run_side_effect=run_error, + ) + mock_logger = mock.Mock() + automation_workflow = octobot_node.scheduler.workflows.automation_workflow.AutomationWorkflow + + with mock.patch.object( + octobot_flow_client, + "OctoBotActionsJob", + mock_octobot_actions_job_class, + ), mock.patch.object( + automation_workflow, + "get_logger", + return_value=mock_logger, + ): + with pytest.raises(RuntimeError, match="automation failed"): + await automation_workflow.execute_iteration(inputs, None) + + mock_logger.exception.assert_called_once_with( + run_error, + True, + f"Error while running automation job: {run_error}", + ) + @pytest.mark.asyncio @required_imports async def test_execute_iteration_authentication_error_sets_postponed_iteration( @@ -1680,7 +1748,11 @@ async def run_with_retries_then_apply_success(*args, **kwargs) -> None: assert parsed_output == _parse_automation_workflow_output(wf_status.output) assert run_mock.await_count == max_attempts - mock_logger.exception.assert_not_called() + _assert_iteration_job_errors_logged( + mock_logger, + RuntimeError("simulated transient failure"), + iteration_failure_count=max_attempts - 1, + ) @pytest.mark.asyncio @required_imports @@ -1745,8 +1817,12 @@ async def test_execute_automation_execute_iteration_exhausts_retries_when_octobo assert wf_status.output == workflow_result assert run_mock.await_count == max_attempts - mock_logger.exception.assert_called_once() - assert "Interrupted workflow: unexpected critical error: " in str(mock_logger.exception.call_args[0][2]) + _assert_iteration_job_errors_logged( + mock_logger, + RuntimeError("persistent failure"), + iteration_failure_count=max_attempts, + expect_workflow_interrupted_log=True, + ) @pytest.mark.asyncio @required_imports