Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion packages/commons/octobot_commons/os_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,27 @@ def tcp_port_is_free(bind_host: str, port: int) -> bool:
return True


_HOST_WIDE_LISTENER_PROBE_HOSTS = ("0.0.0.0", "127.0.0.1")


def tcp_port_has_listener_on_host(port: int) -> bool:
"""
:return: True if any TCP listener on this machine uses ``port``
"""
try:
for connection in psutil.net_connections(kind="tcp"):
if connection.status != psutil.CONN_LISTEN:
continue
local_address = connection.laddr
if local_address is not None and local_address.port == port:
return True
except (psutil.AccessDenied, psutil.Error):
for probe_host in _HOST_WIDE_LISTENER_PROBE_HOSTS:
if not tcp_port_is_free(probe_host, port):
return True
return False


def find_first_free_listen_port_after_base(
bind_host_for_probe: str,
listen_port_base: int,
Expand All @@ -202,13 +223,15 @@ def find_first_free_listen_port_after_base(
) -> int:
"""
First offset where ``listen_port_base + offset`` is TCP-free on ``bind_host_for_probe``
(optional: require ``paired_listen_port_base + offset`` free as well, same scan step).
and not in use by any listener on this host.
Returns ``listen_port``.
"""
for offset_from_base in range(max_offset):
listen_port = listen_port_base + offset_from_base
if blocklist and listen_port in blocklist:
continue
if tcp_port_has_listener_on_host(listen_port):
continue
if not tcp_port_is_free(bind_host_for_probe, listen_port):
continue
return listen_port
Expand Down
41 changes: 37 additions & 4 deletions packages/commons/tests/test_os_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,60 @@ def test_returns_true_after_listener_released(self):
assert os_util.tcp_port_is_free("127.0.0.1", bound_port) is True


class TestTcpPortHasListenerOnHost:
def test_returns_true_when_listener_holds_port(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
listener.bind(("127.0.0.1", 0))
listener.listen(1)
bound_port = listener.getsockname()[1]
assert os_util.tcp_port_has_listener_on_host(bound_port) is True

def test_returns_false_after_listener_released(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
listener.bind(("127.0.0.1", 0))
bound_port = listener.getsockname()[1]
assert os_util.tcp_port_has_listener_on_host(bound_port) is False


class TestFindFirstFreeListenPortAfterBase:
def test_returns_base_when_free(self):
with mock.patch.object(os_util, "tcp_port_is_free", return_value=True):
with mock.patch.object(os_util, "tcp_port_has_listener_on_host", return_value=False), mock.patch.object(
os_util, "tcp_port_is_free", return_value=True
):
listen_port = os_util.find_first_free_listen_port_after_base("127.0.0.1", 50000)
assert listen_port == 50000

def test_skips_until_first_free_port(self):
with mock.patch.object(os_util, "tcp_port_is_free", side_effect=[False, False, True]):
with mock.patch.object(os_util, "tcp_port_has_listener_on_host", return_value=False), mock.patch.object(
os_util, "tcp_port_is_free", side_effect=[False, False, True]
):
listen_port = os_util.find_first_free_listen_port_after_base("127.0.0.1", 50100)
assert listen_port == 50102

def test_skips_blocklisted_ports(self):
with mock.patch.object(os_util, "tcp_port_is_free", return_value=True):
with mock.patch.object(os_util, "tcp_port_has_listener_on_host", return_value=False), mock.patch.object(
os_util, "tcp_port_is_free", return_value=True
):
listen_port = os_util.find_first_free_listen_port_after_base(
"127.0.0.1",
50200,
blocklist=[50200],
)
assert listen_port == 50201

def test_skips_port_with_host_wide_listener(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
listener.bind(("127.0.0.1", 0))
base_port = listener.getsockname()[1]
listener.listen(1)
listen_port = os_util.find_first_free_listen_port_after_base(
"127.0.0.1", base_port, max_offset=5
)
assert listen_port == base_port + 1

def test_raises_when_scan_exhausted(self):
with mock.patch.object(os_util, "tcp_port_is_free", return_value=False):
with mock.patch.object(os_util, "tcp_port_has_listener_on_host", return_value=False), mock.patch.object(
os_util, "tcp_port_is_free", return_value=False
):
with pytest.raises(ValueError, match="No free listen port"):
os_util.find_first_free_listen_port_after_base("127.0.0.1", 50300, max_offset=2)
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
EXPECTED_PROCESS_BOT_DUMP_INTERVAL_SEC = 5.0

# Same as `waiting_time=` in run_octobot_process(...) DSL for this file's tests.
WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC = 2.0
WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC = 2
RECALL_SCHEDULE_TOLERANCE_SEC = 1.5

EXCHANGE_BINANCEUS = "binanceus"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders(
run_dsl = (
"run_octobot_process("
f"{user_folder!r}, {repr(profile_2x2)}, "
"waiting_time=2.0, ping_timeout=30.0)"
f"waiting_time={octobot_process_functional_shared.WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC}, ping_timeout=30.0)"
)
run_action = {
"id": octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT,
Expand Down Expand Up @@ -231,7 +231,7 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders(
new_run_dsl = (
"run_octobot_process("
f"{user_folder!r}, {repr(profile_3x3)}, "
"waiting_time=2.0, ping_timeout=30.0)"
f"waiting_time={octobot_process_functional_shared.WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC}, ping_timeout=30.0)"
)
update_config_priority_action = {
"id": ACTION_ID_UPDATE_AUTOMATION_CONFIGURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import asyncio
import json
import os
import pathlib
import shutil
import time
import typing
import uuid

import mock
import octobot.constants as octobot_app_constants
import octobot_commons.configuration as configuration_module
import octobot_commons.constants as common_constants
import octobot_commons.dsl_interpreter as dsl_interpreter
import octobot_commons.process_util as process_util
import octobot_node.constants as octobot_node_constants
import pytest
Expand Down Expand Up @@ -43,7 +46,7 @@ async def test_run_octobot_process_lifecycle_grid_trading(
run_dsl = (
"run_octobot_process("
f"{user_folder!r}, {repr(octobot_process_functional_shared.GRID_BINANCEUS_PROFILE_DATA)}, "
"waiting_time=2.0, ping_timeout=30.0)"
f"waiting_time={octobot_process_functional_shared.WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC}, ping_timeout=30.0)"
)
run_action = {
"id": octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT,
Expand Down Expand Up @@ -298,3 +301,210 @@ async def test_run_octobot_process_lifecycle_grid_trading(
shutil.rmtree(user_root_guess, ignore_errors=True)
if os.path.isdir(log_folder_guess):
shutil.rmtree(log_folder_guess, ignore_errors=True)


async def test_run_octobot_process_lifecycle_default_config_no_profile_data(
init_action: dict,
monkeypatch: pytest.MonkeyPatch,
):
if not os.path.isfile(os.path.join(os.getcwd(), "start.py")):
pytest.skip("start.py missing: run pytest with cwd set to the OctoBot project root")

non_trading_profile_json = os.path.join(
os.getcwd(),
common_constants.USER_FOLDER,
common_constants.PROFILES_FOLDER,
"non-trading",
common_constants.PROFILE_CONFIG_FILE,
)
if not os.path.isfile(non_trading_profile_json):
pytest.skip("non-trading profile missing under OctoBot user/profiles")

monkeypatch.setenv(octobot_app_constants.ENV_PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS, "5")

user_folder = f"functionnal_tests/octlife_default_{uuid.uuid4().hex[:12]}"
exchange_auth = [
{
"internal_name": octobot_process_functional_shared.EXCHANGE_BINANCEUS,
"api_key": "functional-default-config-key",
"api_secret": "functional-default-config-secret",
"sandboxed": True,
"exchange_type": common_constants.CONFIG_EXCHANGE_SPOT,
}
]
run_dsl = (
f"run_octobot_process({user_folder!r}, "
f"exchange_auth_data={dsl_interpreter.format_parameter_value(exchange_auth)}, "
f"waiting_time={octobot_process_functional_shared.WAITING_TIME_RUN_OCTOBOT_PROCESS_SEC}, ping_timeout=30.0)"
)
run_action = {
"id": octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT,
"dsl_script": run_dsl,
"dependencies": [{"action_id": octobot_process_functional_shared.ACTION_ID_INIT}],
}
stop_automation_action = {
"id": octobot_process_functional_shared.ACTION_ID_STOP_AUTOMATION,
"dsl_script": "stop_automation()",
"dependencies": [{"action_id": octobot_process_functional_shared.ACTION_ID_INIT}],
}

popen_calls = {"count": 0}
tracked_spawn_managed = (
octobot_process_functional_shared._make_tracked_spawn_managed_with_forward_terminal_output(
process_util.spawn_managed_subprocess,
popen_calls,
)
)

user_root_guess = os.path.normpath(
os.path.join(
os.getcwd(),
*common_constants.USER_AUTOMATIONS_FOLDER.split("/"),
*user_folder.replace("\\", "/").split("/"),
)
)
log_folder_guess = os.path.normpath(
os.path.join(
os.getcwd(),
*octobot_node_constants.AUTOMATION_LOGS_FOLDER.split("/"),
*[segment for segment in user_folder.replace("\\", "/").split("/") if segment],
)
)

try:
with (
functionnal_tests.mocked_community_authentication(),
functionnal_tests.mocked_community_repository(),
mock.patch.object(
process_util,
"spawn_managed_subprocess",
side_effect=tracked_spawn_managed,
),
):
state = functionnal_tests.automation_state_dict(
functionnal_tests.resolved_actions([init_action])
)
async with octobot_flow.jobs.AutomationJob(state, [], [], {}) as init_job:
await init_job.run()
state = init_job.dump()

async with octobot_flow.jobs.AutomationJob(state, [], [], {}) as job:
job.automation_state.upsert_automation_actions(
functionnal_tests.resolved_actions([run_action])
)
state = job.dump()

deadline = time.monotonic() + octobot_process_functional_shared.GLOBAL_START_TIMEOUT_SEC
inner: typing.Optional[dict] = None
async with octobot_flow.jobs.AutomationJob(state, [], [], {}) as first_poll:
await first_poll.run()
octobot_process_functional_shared._assert_run_octobot_process_recall_scheduled_to_in_dump(
first_poll.dump()
)
first_run = octobot_process_functional_shared._get_action_by_id(
first_poll, octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT
)
assert first_run is not None
inner = octobot_process_functional_shared._recall_inner_from_dsl_action(first_run)
state = first_poll.dump()
if not (inner and inner.get("init_state_ok") is True):
while time.monotonic() < deadline:
await asyncio.sleep(octobot_process_functional_shared.SLEEP_BETWEEN_JOB_POLLS_SEC)
async with octobot_flow.jobs.AutomationJob(state, [], [], {}) as poll_job:
await poll_job.run()
octobot_process_functional_shared._assert_run_octobot_process_recall_scheduled_to_in_dump(
poll_job.dump()
)
run_details = octobot_process_functional_shared._get_action_by_id(
poll_job, octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT
)
assert run_details is not None
inner = octobot_process_functional_shared._recall_inner_from_dsl_action(run_details)
if inner and inner.get("init_state_ok") is True:
state = poll_job.dump()
break
state = poll_job.dump()
else:
pytest.fail(
f"OctoBot did not become ready (init_state_ok) within "
f"{octobot_process_functional_shared.GLOBAL_START_TIMEOUT_SEC}s"
)

assert inner is not None
assert inner.get("pid"), "expected child pid in ensure state"
assert popen_calls["count"] >= 1
child_pid = int(inner["pid"])
assert process_util.pid_is_running(child_pid)

user_root = pathlib.Path(inner["user_root"])
assert inner.get("profile_id") == "non-trading"
root_cfg = json.loads((user_root / common_constants.CONFIG_FILE).read_text(encoding="utf-8"))
exchange_cfg = root_cfg[common_constants.CONFIG_EXCHANGES][
octobot_process_functional_shared.EXCHANGE_BINANCEUS
]
exchange_auth_entry = exchange_auth[0]
stored_api_key = exchange_cfg[common_constants.CONFIG_EXCHANGE_KEY]
stored_api_secret = exchange_cfg[common_constants.CONFIG_EXCHANGE_SECRET]
assert configuration_module.decrypt(stored_api_key) == exchange_auth_entry["api_key"]
assert configuration_module.decrypt(stored_api_secret) == exchange_auth_entry["api_secret"]
profile_json_path = (
user_root
/ common_constants.PROFILES_FOLDER
/ "non-trading"
/ common_constants.PROFILE_CONFIG_FILE
)
assert profile_json_path.is_file()

state_path = os.path.normpath(
os.path.join(
inner["user_root"],
octobot_app_constants.PROCESS_BOT_STATE_FILE_NAME,
)
)
assert os.path.isfile(state_path)
with open(state_path, encoding="utf-8") as process_state_file:
file_metadata_payload = json.load(process_state_file)
process_metadata = process_bot_state_import.Metadata.from_dict(
file_metadata_payload["metadata"]
)
assert isinstance(process_metadata, process_bot_state_import.Metadata)
assert isinstance(process_metadata.updated_at, (int, float))
assert isinstance(process_metadata.next_updated_at, (int, float))
assert process_metadata.updated_at <= time.time()
assert process_metadata.next_updated_at >= process_metadata.updated_at
assert abs(
(process_metadata.next_updated_at - process_metadata.updated_at)
- octobot_process_functional_shared.EXPECTED_PROCESS_BOT_DUMP_INTERVAL_SEC
) < 1.0

priority_actions = functionnal_tests.resolved_actions([stop_automation_action])
async with octobot_flow.jobs.AutomationJob(state, priority_actions, [], {}) as stop_phase:
await stop_phase.run()
octobot_process_functional_shared._assert_run_octobot_process_recall_scheduled_to_in_dump(
stop_phase.dump(),
assert_delay_matches_waiting_time=False,
)
assert stop_phase.automation_state.automation.post_actions.stop_automation is True
run_stopped = octobot_process_functional_shared._get_action_by_id(
stop_phase, octobot_process_functional_shared.ACTION_ID_RUN_OCTOBOT
)
assert run_stopped is not None
assert isinstance(run_stopped.result, dict)
assert run_stopped.result.get("status") in ("stopped", "already_stopped")

process_deadline = time.monotonic() + octobot_process_functional_shared.CHILD_STOP_WAIT_SEC
while time.monotonic() < process_deadline:
if not process_util.pid_is_running(child_pid):
break
await asyncio.sleep(0.5)
else:
pytest.fail(
f"expected child pid {child_pid} to be stopped after stop_automation/execution_stop "
f"within {octobot_process_functional_shared.CHILD_STOP_WAIT_SEC}s"
)

finally:
if os.path.isdir(user_root_guess):
shutil.rmtree(user_root_guess, ignore_errors=True)
if os.path.isdir(log_folder_guess):
shutil.rmtree(log_folder_guess, ignore_errors=True)
8 changes: 8 additions & 0 deletions packages/node/octobot_node/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
USER_ACTION_WORKFLOW_MAX_ITERATION_RETRIES = int(os.getenv("USER_ACTION_WORKFLOW_MAX_ITERATION_RETRIES", 6))
USER_ACTION_WORKFLOW_BACKOFF_RATE = float(os.getenv("USER_ACTION_WORKFLOW_BACKOFF_RATE", 2))

# run_octobot_process DSL recall interval and init-state timeout for child OctoBot spawns.
RUN_OCTOBOT_PROCESS_WAITING_TIME_SECONDS = float(
os.getenv("RUN_OCTOBOT_PROCESS_WAITING_TIME_SECONDS", 60.0)
)
RUN_OCTOBOT_PROCESS_PING_TIMEOUT_SECONDS = float(
os.getenv("RUN_OCTOBOT_PROCESS_PING_TIMEOUT_SECONDS", 150.0)
)

TASKS_ENCRYPTION_ENV_VARS = [
"TASKS_SERVER_RSA_PRIVATE_KEY",
"TASKS_SERVER_ECDSA_PRIVATE_KEY",
Expand Down
Loading
Loading