From 4ccd0fb4ac656f6702c09098b539c562c5e0ded6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?=
Date: Sat, 13 Dec 2025 09:28:38 +0100
Subject: [PATCH 1/3] Message truncation

---
 src/component.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/component.py b/src/component.py
index aeb7725..7521c6e 100644
--- a/src/component.py
+++ b/src/component.py
@@ -21,6 +21,16 @@
 from subprocess_runner import SubprocessRunner
 from venv_manager import VenvManager
 
+MAX_MESSAGE_LENGTH = 3500
+MAX_DETAIL_LENGTH = 50000
+
+
+def truncate_message(message: str, max_length: int, suffix: str = "... [truncated]") -> str:
+    """Truncate a message to max_length, adding suffix if truncated."""
+    if len(message) <= max_length:
+        return message
+    return message[: max_length - len(suffix)] + suffix
+
 
 class Component(ComponentBase):
     """
@@ -85,13 +95,16 @@ def execute_script_file(self, file_path: Path):
             logging.info("Executing script:\n%s", self.script_excerpt(script))
             args = ["uv", "run", str(file_path)]
             SubprocessRunner.run(args, "Script executed successfully.", "Script execution failed.")
+        except UserException:
+            raise
         except Exception as err:
             _, _, tb = sys.exc_info()
             stack_len = len(traceback.extract_tb(tb)[4:])
             stack_trace_records = self._get_stack_trace_records(*sys.exc_info(), -stack_len, chain=True)
             stack_cropped = "\n".join(stack_trace_records)
-
-            raise UserException(f"Script failed. {err}. Detail: {stack_cropped}") from err
+            error_msg = truncate_message(str(err), MAX_MESSAGE_LENGTH)
+            detail = truncate_message(stack_cropped, MAX_DETAIL_LENGTH)
+            raise UserException(f"Script failed. {error_msg}", detail) from err
 
     @staticmethod
     def _get_stack_trace_records(etype, value, tb, limit=None, chain=True):
@@ -158,10 +171,11 @@ def get_repository_files(self):
         # this triggers the run method by default and is controlled by the configuration.action parameter
         comp.execute_action()
     except UserException as exc:
-        detail = ""
+        error_msg = truncate_message(str(exc.args[0]) if exc.args else str(exc), MAX_MESSAGE_LENGTH)
+        logging.error(error_msg)
         if len(exc.args) > 1:
-            detail = exc.args[1]
-        logging.exception(exc, extra={"full_message": detail})
+            detail = truncate_message(str(exc.args[1]), MAX_DETAIL_LENGTH)
+            logging.error("Error details:", extra={"full_message": detail})
         exit(1)
     except Exception as exc:
         logging.exception(exc)

From 00f81f394e1dc4daf34c162f9c5c83fca13ca790 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?=
Date: Sat, 13 Dec 2025 10:18:50 +0100
Subject: [PATCH 2/3] WIP - Buffer flusher

---
 src/subprocess_runner.py | 71 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 62 insertions(+), 9 deletions(-)

diff --git a/src/subprocess_runner.py b/src/subprocess_runner.py
index 0e1682c..7a90ebd 100644
--- a/src/subprocess_runner.py
+++ b/src/subprocess_runner.py
@@ -1,9 +1,61 @@
 import logging
 import subprocess
 import threading
+import time
+from collections import deque
 
 from keboola.component.exceptions import UserException
 
+BUFFER_FLUSH_INTERVAL = 0.5
+MAX_BUFFER_SIZE = 50000
+MAX_STDERR_LINES = 1000
+
+
+class LogBuffer:
+    """Thread-safe buffer for batching log messages."""
+
+    def __init__(self, prefix: str = "", flush_interval: float = BUFFER_FLUSH_INTERVAL):
+        self._buffer: list[str] = []
+        self._buffer_size = 0
+        self._lock = threading.Lock()
+        self._prefix = prefix
+        self._flush_interval = flush_interval
+        self._last_flush = time.time()
+
+    def add_line(self, line: str) -> None:
+        """Add a line to the buffer, flushing if needed."""
+        with self._lock:
+            self._buffer.append(line)
+            self._buffer_size += len(line) + 1
+            if self._should_flush():
+                self._flush_unlocked()
+
+    def _should_flush(self) -> bool:
+        """Check if buffer should be flushed based on size or time."""
+        if self._buffer_size >= MAX_BUFFER_SIZE:
+            return True
+        if time.time() - self._last_flush >= self._flush_interval:
+            return True
+        return False
+
+    def _flush_unlocked(self) -> None:
+        """Flush buffer to log (must hold lock)."""
+        if not self._buffer:
+            return
+        content = "\n".join(self._buffer)
+        if self._prefix:
+            logging.info("%s:\n%s", self._prefix, content)
+        else:
+            logging.info(content)
+        self._buffer = []
+        self._buffer_size = 0
+        self._last_flush = time.time()
+
+    def flush(self) -> None:
+        """Flush any remaining content in the buffer."""
+        with self._lock:
+            self._flush_unlocked()
+
 
 class SubprocessRunner:
     @staticmethod
@@ -20,33 +72,34 @@ def run(
             text=True,
         )
 
-        stderr_output = []
+        stderr_output: deque[str] = deque(maxlen=MAX_STDERR_LINES)
+        stdout_buffer = LogBuffer()
+        stderr_buffer = LogBuffer(prefix="Command stderr")
 
         def read_stderr():
             if process.stderr:
                 for line in iter(process.stderr.readline, ""):
-                    stderr_output.append(line.strip())
-                    logging.info("Command stderr: %s", line.strip())
+                    stripped = line.strip()
+                    stderr_output.append(stripped)
+                    stderr_buffer.add_line(stripped)
                 process.stderr.close()
+            stderr_buffer.flush()
 
-        # Start stderr reader thread
         stderr_thread = threading.Thread(target=read_stderr)
         stderr_thread.start()
 
-        # Read stdout in main thread
-        stdout_lines = []
         if process.stdout:
             for line in iter(process.stdout.readline, ""):
-                stdout_lines.append(line.strip())
-                logging.info(line.strip())
+                stdout_buffer.add_line(line.strip())
             process.stdout.close()
+        stdout_buffer.flush()
 
         stderr_thread.join()
         process.wait()
         stderr_str = "\n".join(stderr_output) if stderr_output else "Unknown error."
         if process.returncode != 0:
             raise UserException(f"{err_message} Log in event detail.", stderr_str)
-        elif stderr_str:
+        elif stderr_output:
             logging.info("%s Full log in detail.", ok_message, extra={"full_message": stderr_str})
         else:
             logging.info(ok_message)

From ac4db2cf1e2a2f2369059837f4f93de5ba70eaa9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?=
Date: Sat, 13 Dec 2025 10:27:59 +0100
Subject: [PATCH 3/3] Typo fix

---
 src/subprocess_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/subprocess_runner.py b/src/subprocess_runner.py
index 7a90ebd..f6e68f2 100644
--- a/src/subprocess_runner.py
+++ b/src/subprocess_runner.py
@@ -61,7 +61,7 @@ class SubprocessRunner:
     @staticmethod
     def run(
         args: list[str],
-        ok_message: str = "Command finished sucessfully.",
+        ok_message: str = "Command finished successfully.",
         err_message: str = "Command failed.",
     ):
         logging.debug("Running command: %s", " ".join(args))