Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typing
import dataclasses

import octobot_commons.dsl_interpreter
import octobot_commons.logging

import octobot_flow.entities.actions.action_details as action_details
import octobot_flow.enums
Expand Down Expand Up @@ -93,7 +95,21 @@ def read_dependency_result(
)
value = dependency_action.result
if dependency.result_path:
value = _navigate_dict_path(value, dependency.result_path)
try:
value = _navigate_dict_path(value, dependency.result_path)
except octobot_flow.errors.ActionDependencyError as err:
dependency_dict = dataclasses.asdict(dependency)
result_keys = (
list(value) if isinstance(value, dict)
else octobot_commons.logging.get_private_minimized_message_if_necessary(value)
)
error_message = (
f"Impossible to resolve dependency {dependency_dict} "
f"result path {dependency.result_path} from action {dependency_action.id} "
f"with action result shape: {result_keys}"
) + f": {err}"
# add as much info as possible to the error message
raise octobot_flow.errors.ActionDependencyError(error_message) from err
return value

def _dependency_values_for_dynamic_dependencies(
Expand Down
16 changes: 11 additions & 5 deletions packages/flow/octobot_flow/jobs/automation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import octobot_commons.json_util as json_util
import octobot_commons.logging as common_logging
import octobot.community
import octobot.community.wallet_backend.errors as wallet_backend_errors
import octobot_commons.profiles.profile_data as profile_data_import

import octobot_copy.constants as copy_constants
Expand Down Expand Up @@ -413,12 +414,17 @@ async def _emit_trading_signals(
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository(
maybe_community_repository
)
await trading_signals_repository.insert_trading_signal(
octobot_flow.entities.TradingSignal(
strategy_id=automation.metadata.strategy_id, account=account
try:
await trading_signals_repository.insert_trading_signal(
octobot_flow.entities.TradingSignal(
strategy_id=automation.metadata.strategy_id, account=account
)
)
)

except wallet_backend_errors.WalletNotFoundError as err:
self._logger.error(
f"Skipping trading signal emission: {err}"
)

def _get_actions_to_execute(self) -> tuple[list[octobot_flow.entities.AbstractActionDetails], bool]:
if pending_priority_actions := self._get_pending_priority_actions():
return pending_priority_actions, True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import mock
import pytest

import octobot_commons.constants as commons_constants
import octobot_commons.logging

import octobot_flow.entities.actions.action_details as action_details
import octobot_flow.entities.actions.actions_dependencies as actions_dependencies
Expand Down Expand Up @@ -188,6 +190,70 @@ def test_raises_when_dependency_action_errored(self):
with pytest.raises(flow_errors.ActionDependencyError):
resolver.read_dependency_result(dependency)

def test_raises_extended_error_when_result_path_key_missing(self):
dependency_action = action_details.DSLScriptActionDetails(
id="action_source",
dsl_script="exchange_action()",
dependencies=[],
)
dependency_action.complete(result={"nested": {"order_id": "abc-123"}})
dependency = action_details.ActionDependency(
action_id="action_source",
parameter="exchange_order_id",
result_path=["nested", "missing_key"],
)
resolver = actions_dependencies.ActionsDependenciesResolver(_actions_by_id(dependency_action))

with mock.patch.object(
octobot_commons.logging,
"get_private_minimized_message_if_necessary",
) as minimize_message_mock:
with pytest.raises(flow_errors.ActionDependencyError) as error_info:
resolver.read_dependency_result(dependency)

minimize_message_mock.assert_not_called()
error_message = str(error_info.value)
assert "Impossible to resolve dependency" in error_message
assert "action_id': 'action_source'" in error_message
assert "result_path': ['nested', 'missing_key']" in error_message
assert "from action action_source" in error_message
assert "with action result shape: ['nested']" in error_message
assert "missing key: missing_key" in error_message
assert isinstance(error_info.value.__cause__, flow_errors.ActionDependencyError)

def test_raises_extended_error_when_result_path_list_index_out_of_range(self):
dependency_action = action_details.DSLScriptActionDetails(
id="action_source",
dsl_script="exchange_action()",
dependencies=[],
)
dependency_action.complete(result=[{"id": "a"}])
list_result = dependency_action.result
dependency = action_details.ActionDependency(
action_id="action_source",
parameter="exchange_order_id",
result_path=["5"],
)
resolver = actions_dependencies.ActionsDependenciesResolver(_actions_by_id(dependency_action))

with mock.patch.object(
octobot_commons.logging,
"get_private_minimized_message_if_necessary",
return_value="minimized-list-shape",
) as minimize_message_mock:
with pytest.raises(flow_errors.ActionDependencyError) as error_info:
resolver.read_dependency_result(dependency)

minimize_message_mock.assert_called_once_with(list_result)
error_message = str(error_info.value)
assert "Impossible to resolve dependency" in error_message
assert "action_id': 'action_source'" in error_message
assert "result_path': ['5']" in error_message
assert "from action action_source" in error_message
assert "with action result shape: minimized-list-shape" in error_message
assert "list index 5 out of range" in error_message
assert isinstance(error_info.value.__cause__, flow_errors.ActionDependencyError)


class TestActionsDependenciesResolverFilledAllDependencies:
def test_returns_true_when_all_dependencies_completed(self):
Expand Down
93 changes: 93 additions & 0 deletions packages/flow/tests/jobs/test_automation_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import mock
import pytest
import time

import octobot.community.wallet_backend.errors as wallet_backend_errors
import octobot_copy.constants as copy_constants
import octobot_protocol.models as protocol_models

import octobot_flow.entities
import octobot_flow.errors
import octobot_flow.jobs.automation_job as automation_job_module
import octobot_flow.logic.actions
import octobot_flow.logic.configuration
import octobot_flow.repositories.community

from tests.functionnal_tests import auth_details, global_state


STRATEGY_ID = "test-strategy-id"


def _minimal_automation_job() -> automation_job_module.AutomationJob:
automation_state = {
"automation": {
"metadata": {"automation_id": "automation_1"},
"actions_dag": {"actions": []},
}
}
auth_details = octobot_flow.entities.UserAuthentication(wallet_address="0xtest")
return automation_job_module.AutomationJob(automation_state, [], [], auth_details)


def _minimal_copied_account() -> protocol_models.CopiedAccount:
return protocol_models.CopiedAccount(
version=copy_constants.COPIED_ACCOUNT_VERSION,
updated_at=time.time(),
copied_assets=[],
)


class TestValidateInput:
@pytest.mark.asyncio
async def test_not_automations_configured(
self,
global_state: dict,
auth_details: octobot_flow.entities.UserAuthentication,
):
global_state["automation"] = {}
with pytest.raises(octobot_flow.errors.NoAutomationError):
async with automation_job_module.AutomationJob(global_state, [], [], auth_details):
pass


class TestEmitTradingSignals:
@pytest.mark.asyncio
async def test_skips_emission_and_logs_when_wallet_not_found(self):
automation_job = _minimal_automation_job()
automation = automation_job.automation_state.automation
automation.metadata.strategy_id = STRATEGY_ID
automation.exchange_account_elements = octobot_flow.entities.ExchangeAccountElements()
fetched_dependencies = octobot_flow.entities.FetchedDependencies()
community_repository = mock.Mock()
wallet_error = wallet_backend_errors.WalletNotFoundError("Wallet not found")
insert_trading_signal_mock = mock.AsyncMock(side_effect=wallet_error)
copied_account = _minimal_copied_account()

with mock.patch.object(
octobot_flow.repositories.community.TradingSignalsRepository,
"from_community_repository",
return_value=mock.Mock(insert_trading_signal=insert_trading_signal_mock),
), mock.patch.object(
octobot_flow.logic.configuration,
"infer_reference_market",
return_value="USDT",
), mock.patch.object(
octobot_flow.logic.actions,
"reference_exchange_elements_to_account",
return_value=copied_account,
), mock.patch.object(
automation_job._logger,
"error",
) as error_log_mock:
await automation_job._emit_trading_signals(
community_repository,
automation,
fetched_dependencies,
)

insert_trading_signal_mock.assert_awaited_once()
emitted_signal = insert_trading_signal_mock.await_args.args[0]
assert emitted_signal.strategy_id == STRATEGY_ID
assert emitted_signal.account is copied_account
error_log_mock.assert_called_once_with(f"Skipping trading signal emission: {wallet_error}")
15 changes: 0 additions & 15 deletions packages/flow/tests/jobs/test_automations_job.py

This file was deleted.

4 changes: 2 additions & 2 deletions packages/node/octobot_node/scheduler/octobot_flow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import typing
import dataclasses
import json
import logging

import octobot_commons.dataclasses
import octobot_commons.logging

import octobot_node.errors as errors
import octobot_node.scheduler.workflows_util as workflows_util
Expand Down Expand Up @@ -149,7 +149,7 @@ async def run(self) -> None:
self.priority_user_actions
or automation_job.automation_state.automation.actions_dag.get_executable_actions()
)
logging.getLogger(self.__class__.__name__).info(f"Running automation actions: {selected_actions}")
octobot_commons.logging.get_logger(self.__class__.__name__).info(f"Running automation actions: {selected_actions}")
executed_actions = await automation_job.run()
self.after_execution_state = automation_job.automation_state
post_execution_state_dump = automation_job.dump()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,22 @@ async def execute_automation(inputs: dict) -> typing.Optional[str]:
)
return json.dumps(output.to_dict(include_default_values=False)) if output else None

@staticmethod
def _should_retry(error: BaseException) -> bool:
return not isinstance(error, (
# workflow stopping errors
errors.WorkflowError,
octobot_flow.errors.ConfigurationError,
))

@staticmethod
@SCHEDULER.INSTANCE.step(
name="execute_iteration",
retries_allowed=True,
interval_seconds=constants.AUTOMATION_WORKFLOW_RETRY_INTERVAL_SECONDS,
max_attempts=constants.AUTOMATION_WORKFLOW_MAX_ITERATION_RETRIES,
backoff_rate=constants.AUTOMATION_WORKFLOW_BACKOFF_RATE,
should_retry=_should_retry,
)
async def execute_iteration(inputs: dict, actions_update: typing.Optional[dict]) -> dict:
"""
Expand Down Expand Up @@ -190,6 +199,12 @@ async def execute_iteration(inputs: dict, actions_update: typing.Optional[dict])
has_next_actions_override = True
next_iteration_description_override = parsed_inputs.task.content
next_iteration_description_metadata_override = parsed_inputs.task.content_metadata
except Exception as err:
# log propagated errors to also associate them to the automation's error tracking
AutomationWorkflow.get_logger(parsed_inputs).exception(
err, True, f"Error while running automation job: {err}"
)
raise
if result.processed_actions:
if latest_step := AutomationWorkflow._get_actions_summary(result.processed_actions, minimal=True):
executed_step = latest_step
Expand Down
Loading
Loading