diff --git a/bec_lib/bec_lib/file_utils.py b/bec_lib/bec_lib/file_utils.py index 44a1cdee6..9d3687128 100644 --- a/bec_lib/bec_lib/file_utils.py +++ b/bec_lib/bec_lib/file_utils.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +import time import warnings from typing import TYPE_CHECKING @@ -163,13 +164,19 @@ def compile_file_components( return (file_path_component, file_extension) -def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True) -> str: +def get_full_path( + scan_status_msg: ScanStatusMessage, + name: str, + create_dir: bool = True, + log_if_dir_does_not_exist: bool = True, +) -> str: """Get the full file path for a given scan status message and additional name. Args: scan_status_msg (ScanStatusMessage): Scan status message name (str): Additional name (i.e. device name) to add to the file path create_dir (bool, optional): Create the directory if it does not exist. Defaults to True. + log_if_dir_does_not_exist (bool, optional): Log a warning if the directory does not exist. Defaults to True. """ if name == "": @@ -192,10 +199,33 @@ def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: boo # Compile full file path full_path = f"{file_base_path}_{name}.{file_extension}" if create_dir: + if log_if_dir_does_not_exist and not os.path.exists(os.path.dirname(full_path)): + logger.warning(f"Directory {os.path.dirname(full_path)} does not exist. Creating it.") os.makedirs(os.path.dirname(full_path), exist_ok=True) return full_path +def wait_for_directory(path: str, timeout: float = 10.0, interval: float = 0.1) -> None: + """ + Wait for a directory to be created. + + Args: + path (str): Path to the directory to wait for. + timeout (float, optional): Maximum time to wait in seconds. Defaults to 10. + interval (float, optional): Time to wait between checks in seconds. Defaults to 0.1. + + Raises: + FileWriterError: If the timeout is reached before the directory is created. + + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if os.path.isdir(path): + return + time.sleep(interval) + raise FileWriterError(f"Timeout reached while waiting for directory {path} to be created.") + + class FileWriter: """FileWriter for creating file paths and directories for services and devices.""" diff --git a/bec_lib/tests/test_file_utils.py b/bec_lib/tests/test_file_utils.py index cbb8b908e..07e339975 100644 --- a/bec_lib/tests/test_file_utils.py +++ b/bec_lib/tests/test_file_utils.py @@ -2,6 +2,8 @@ # pylint: skip-file import os +import threading +import time from unittest import mock import pytest @@ -15,6 +17,7 @@ LogWriter, compile_file_components, get_full_path, + wait_for_directory, ) from bec_lib.messages import ScanStatusMessage from bec_lib.tests.utils import ConnectorMock @@ -259,6 +262,30 @@ def test_compile_file_components_valid_paths(kwargs, expected_path, description) assert file_path == expected_path, description +def test_wait_for_directory_returns_when_directory_appears(tmpdir): + """wait_for_directory should stop polling once the directory exists.""" + dir_path = tmpdir.join("created-later") + + def _create_directory(): + time.sleep(0.02) + dir_path.mkdir() + + creator = threading.Thread(target=_create_directory) + creator.start() + try: + wait_for_directory(str(dir_path), timeout=1.0, interval=0.01) + finally: + creator.join() + + +def test_wait_for_directory_raises_on_timeout(tmpdir): + """wait_for_directory should raise when the directory never appears.""" + dir_path = tmpdir.join("never-created") + + with pytest.raises(FileWriterError, match="Timeout reached while waiting for directory"): + wait_for_directory(str(dir_path), timeout=0.05, interval=0.01) + + @pytest.mark.parametrize( "scan_info", [ diff --git a/bec_server/bec_server/file_writer/file_writer_manager.py b/bec_server/bec_server/file_writer/file_writer_manager.py index 0fc61a44c..4a5349e59 100644 --- a/bec_server/bec_server/file_writer/file_writer_manager.py +++ b/bec_server/bec_server/file_writer/file_writer_manager.py @@ -251,7 +251,7 @@ def update_scan_storage_with_status(self, msg: messages.ScanStatusMessage) -> No if status == "open" and not scan_storage.start_time: scan_storage.start_time = msg.content.get("timestamp") scan_storage.async_writer = AsyncWriter( - get_full_path(scan_status_msg=msg, name="master"), + get_full_path(scan_status_msg=msg, name="master", log_if_dir_does_not_exist=False), scan_id=scan_id, scan_number=msg.scan_number, connector=self.connector, @@ -391,7 +391,11 @@ def write_file(self, scan_id: str) -> None: start_time = time.time() try: - file_path = get_full_path(scan_status_msg=storage.status_msg, name=file_suffix) + file_path = get_full_path( + scan_status_msg=storage.status_msg, + name=file_suffix, + log_if_dir_does_not_exist=False, + ) successful = True # If we've already written device data, we need to append to the file