From d7b74da72d33914d7ee48e5623ca176259ed3bc6 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Tue, 30 Sep 2025 16:10:40 -0700 Subject: [PATCH 1/7] ran "curl -fsSL https://elastic.co/start-local | sh -s -- --esonly" - from https://github.com/elastic/start-local?tab=readme-ov-file#install-only-elasticsearch --- elastic-start-local/config/telemetry.yml | 2 + elastic-start-local/docker-compose.yml | 36 ++++++++++++++++ elastic-start-local/start.sh | 47 +++++++++++++++++++++ elastic-start-local/stop.sh | 8 ++++ elastic-start-local/uninstall.sh | 54 ++++++++++++++++++++++++ 5 files changed, 147 insertions(+) create mode 100644 elastic-start-local/config/telemetry.yml create mode 100644 elastic-start-local/docker-compose.yml create mode 100755 elastic-start-local/start.sh create mode 100755 elastic-start-local/stop.sh create mode 100755 elastic-start-local/uninstall.sh diff --git a/elastic-start-local/config/telemetry.yml b/elastic-start-local/config/telemetry.yml new file mode 100644 index 00000000..d5e02584 --- /dev/null +++ b/elastic-start-local/config/telemetry.yml @@ -0,0 +1,2 @@ +start-local: + version: 0.11.0 diff --git a/elastic-start-local/docker-compose.yml b/elastic-start-local/docker-compose.yml new file mode 100644 index 00000000..bed681cb --- /dev/null +++ b/elastic-start-local/docker-compose.yml @@ -0,0 +1,36 @@ +services: + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:${ES_LOCAL_VERSION} + container_name: ${ES_LOCAL_CONTAINER_NAME} + volumes: + - dev-elasticsearch:/usr/share/elasticsearch/data + ports: + - 127.0.0.1:${ES_LOCAL_PORT}:9200 + environment: + - discovery.type=single-node + - ELASTIC_PASSWORD=${ES_LOCAL_PASSWORD} + - xpack.security.enabled=true + - xpack.security.http.ssl.enabled=false + - xpack.license.self_generated.type=trial + - xpack.ml.use_auto_machine_memory_percent=true + - ES_JAVA_OPTS=${ES_LOCAL_JAVA_OPTS} + - cluster.routing.allocation.disk.watermark.low=${ES_LOCAL_DISK_SPACE_REQUIRED} + - 
cluster.routing.allocation.disk.watermark.high=${ES_LOCAL_DISK_SPACE_REQUIRED} + - cluster.routing.allocation.disk.watermark.flood_stage=${ES_LOCAL_DISK_SPACE_REQUIRED} + - "_JAVA_OPTIONS=-XX:UseSVE=0" + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + [ + "CMD-SHELL", + "curl --output /dev/null --silent --head --fail -u elastic:${ES_LOCAL_PASSWORD} http://elasticsearch:9200", + ] + interval: 10s + timeout: 10s + retries: 30 + +volumes: + dev-elasticsearch: diff --git a/elastic-start-local/start.sh b/elastic-start-local/start.sh new file mode 100755 index 00000000..722f2594 --- /dev/null +++ b/elastic-start-local/start.sh @@ -0,0 +1,47 @@ +#!/bin/sh +# Start script for start-local +# More information: https://github.com/elastic/start-local +set -eu + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +cd "${SCRIPT_DIR}" +today=$(date +%s) +. ./.env +# Check disk space +available_gb=$(($(df -k / | awk 'NR==2 {print $4}') / 1024 / 1024)) +required=$(echo "${ES_LOCAL_DISK_SPACE_REQUIRED}" | grep -Eo '[0-9]+') +if [ "$available_gb" -lt "$required" ]; then + echo "----------------------------------------------------------------------------" + echo "WARNING: Disk space is below the ${required} GB limit. Elasticsearch will be" + echo "executed in read-only mode. Please free up disk space to resolve this issue." + echo "----------------------------------------------------------------------------" + echo "Press ENTER to confirm." + # shellcheck disable=SC2034 + read -r line +fi +if [ -z "${ES_LOCAL_LICENSE:-}" ] && [ "$today" -gt 1761865480 ]; then + echo "---------------------------------------------------------------------" + echo "The one-month trial period has expired. 
You can continue using the" + echo "Free and open Basic license or request to extend the trial for" + echo "another 30 days using this form:" + echo "https://www.elastic.co/trialextension" + echo "---------------------------------------------------------------------" + echo "For more info about the license: https://www.elastic.co/subscriptions" + echo + echo "Updating the license..." + docker compose up --wait elasticsearch >/dev/null 2>&1 + result=$(curl -s -X POST "${ES_LOCAL_URL}/_license/start_basic?acknowledge=true" -H "Authorization: ApiKey ${ES_LOCAL_API_KEY}" -o /dev/null -w '%{http_code}\n') + if [ "$result" = "200" ]; then + echo "✅ Basic license successfully installed" + echo "ES_LOCAL_LICENSE=basic" >> .env + else + echo "Error: I cannot update the license" + result=$(curl -s -X GET "${ES_LOCAL_URL}" -H "Authorization: ApiKey ${ES_LOCAL_API_KEY}" -o /dev/null -w '%{http_code}\n') + if [ "$result" != "200" ]; then + echo "Elasticsearch is not running." + fi + exit 1 + fi + echo +fi +docker compose up --wait diff --git a/elastic-start-local/stop.sh b/elastic-start-local/stop.sh new file mode 100755 index 00000000..d0918615 --- /dev/null +++ b/elastic-start-local/stop.sh @@ -0,0 +1,8 @@ +#!/bin/sh +# Stop script for start-local +# More information: https://github.com/elastic/start-local +set -eu + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +cd "${SCRIPT_DIR}" +docker compose stop diff --git a/elastic-start-local/uninstall.sh b/elastic-start-local/uninstall.sh new file mode 100755 index 00000000..95bf375c --- /dev/null +++ b/elastic-start-local/uninstall.sh @@ -0,0 +1,54 @@ +#!/bin/sh +# Uninstall script for start-local +# More information: https://github.com/elastic/start-local +set -eu + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +ask_confirmation() { + echo "Do you confirm? (yes/no)" + read -r answer + case "$answer" in + yes|y|Y|Yes|YES) + return 0 # true + ;; + no|n|N|No|NO) + return 1 # false + ;; + *) + echo "Please answer yes or no." 
+ ask_confirmation # Ask again if the input is invalid + ;; + esac +} + +cd "${SCRIPT_DIR}" +if [ ! -e "docker-compose.yml" ]; then + echo "Error: I cannot find the docker-compose.yml file" + echo "I cannot uninstall start-local." +fi +if [ ! -e ".env" ]; then + echo "Error: I cannot find the .env file" + echo "I cannot uninstall start-local." +fi +echo "This script will uninstall start-local." +echo "All data will be deleted and cannot be recovered." +if ask_confirmation; then + docker compose rm -fsv + docker compose down -v + rm docker-compose.yml .env uninstall.sh start.sh stop.sh config/telemetry.yml + if [ -z "$(ls -A config)" ]; then + rm -d config + fi + echo + echo "Do you want to remove the following Docker images?" + echo "- docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64" + if ask_confirmation; then + if docker rmi "docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64" >/dev/null 2>&1; then + echo "Image docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64 removed successfully" + else + echo "Failed to remove image docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64. It might be in use." 
+ fi + fi + echo "Start-local successfully removed" +fi From b99fcc5ec17bf33984c7fb7113f983bc1b388f7c Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Tue, 30 Sep 2025 16:15:03 -0700 Subject: [PATCH 2/7] actually don't check-in elastic-start-local --- .gitignore | 2 + elastic-start-local/config/telemetry.yml | 2 - elastic-start-local/docker-compose.yml | 36 ---------------- elastic-start-local/start.sh | 47 --------------------- elastic-start-local/stop.sh | 8 ---- elastic-start-local/uninstall.sh | 54 ------------------------ 6 files changed, 2 insertions(+), 147 deletions(-) delete mode 100644 elastic-start-local/config/telemetry.yml delete mode 100644 elastic-start-local/docker-compose.yml delete mode 100755 elastic-start-local/start.sh delete mode 100755 elastic-start-local/stop.sh delete mode 100755 elastic-start-local/uninstall.sh diff --git a/.gitignore b/.gitignore index 42d2b831..352d0b55 100644 --- a/.gitignore +++ b/.gitignore @@ -239,3 +239,5 @@ package-lock.json package.json tau2-bench *.err + +elastic-start-local/ diff --git a/elastic-start-local/config/telemetry.yml b/elastic-start-local/config/telemetry.yml deleted file mode 100644 index d5e02584..00000000 --- a/elastic-start-local/config/telemetry.yml +++ /dev/null @@ -1,2 +0,0 @@ -start-local: - version: 0.11.0 diff --git a/elastic-start-local/docker-compose.yml b/elastic-start-local/docker-compose.yml deleted file mode 100644 index bed681cb..00000000 --- a/elastic-start-local/docker-compose.yml +++ /dev/null @@ -1,36 +0,0 @@ -services: - elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:${ES_LOCAL_VERSION} - container_name: ${ES_LOCAL_CONTAINER_NAME} - volumes: - - dev-elasticsearch:/usr/share/elasticsearch/data - ports: - - 127.0.0.1:${ES_LOCAL_PORT}:9200 - environment: - - discovery.type=single-node - - ELASTIC_PASSWORD=${ES_LOCAL_PASSWORD} - - xpack.security.enabled=true - - xpack.security.http.ssl.enabled=false - - xpack.license.self_generated.type=trial - - 
xpack.ml.use_auto_machine_memory_percent=true - - ES_JAVA_OPTS=${ES_LOCAL_JAVA_OPTS} - - cluster.routing.allocation.disk.watermark.low=${ES_LOCAL_DISK_SPACE_REQUIRED} - - cluster.routing.allocation.disk.watermark.high=${ES_LOCAL_DISK_SPACE_REQUIRED} - - cluster.routing.allocation.disk.watermark.flood_stage=${ES_LOCAL_DISK_SPACE_REQUIRED} - - "_JAVA_OPTIONS=-XX:UseSVE=0" - ulimits: - memlock: - soft: -1 - hard: -1 - healthcheck: - test: - [ - "CMD-SHELL", - "curl --output /dev/null --silent --head --fail -u elastic:${ES_LOCAL_PASSWORD} http://elasticsearch:9200", - ] - interval: 10s - timeout: 10s - retries: 30 - -volumes: - dev-elasticsearch: diff --git a/elastic-start-local/start.sh b/elastic-start-local/start.sh deleted file mode 100755 index 722f2594..00000000 --- a/elastic-start-local/start.sh +++ /dev/null @@ -1,47 +0,0 @@ -#!/bin/sh -# Start script for start-local -# More information: https://github.com/elastic/start-local -set -eu - -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" -cd "${SCRIPT_DIR}" -today=$(date +%s) -. ./.env -# Check disk space -available_gb=$(($(df -k / | awk 'NR==2 {print $4}') / 1024 / 1024)) -required=$(echo "${ES_LOCAL_DISK_SPACE_REQUIRED}" | grep -Eo '[0-9]+') -if [ "$available_gb" -lt "$required" ]; then - echo "----------------------------------------------------------------------------" - echo "WARNING: Disk space is below the ${required} GB limit. Elasticsearch will be" - echo "executed in read-only mode. Please free up disk space to resolve this issue." - echo "----------------------------------------------------------------------------" - echo "Press ENTER to confirm." - # shellcheck disable=SC2034 - read -r line -fi -if [ -z "${ES_LOCAL_LICENSE:-}" ] && [ "$today" -gt 1761865480 ]; then - echo "---------------------------------------------------------------------" - echo "The one-month trial period has expired. 
You can continue using the" - echo "Free and open Basic license or request to extend the trial for" - echo "another 30 days using this form:" - echo "https://www.elastic.co/trialextension" - echo "---------------------------------------------------------------------" - echo "For more info about the license: https://www.elastic.co/subscriptions" - echo - echo "Updating the license..." - docker compose up --wait elasticsearch >/dev/null 2>&1 - result=$(curl -s -X POST "${ES_LOCAL_URL}/_license/start_basic?acknowledge=true" -H "Authorization: ApiKey ${ES_LOCAL_API_KEY}" -o /dev/null -w '%{http_code}\n') - if [ "$result" = "200" ]; then - echo "✅ Basic license successfully installed" - echo "ES_LOCAL_LICENSE=basic" >> .env - else - echo "Error: I cannot update the license" - result=$(curl -s -X GET "${ES_LOCAL_URL}" -H "Authorization: ApiKey ${ES_LOCAL_API_KEY}" -o /dev/null -w '%{http_code}\n') - if [ "$result" != "200" ]; then - echo "Elasticsearch is not running." - fi - exit 1 - fi - echo -fi -docker compose up --wait diff --git a/elastic-start-local/stop.sh b/elastic-start-local/stop.sh deleted file mode 100755 index d0918615..00000000 --- a/elastic-start-local/stop.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -# Stop script for start-local -# More information: https://github.com/elastic/start-local -set -eu - -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" -cd "${SCRIPT_DIR}" -docker compose stop diff --git a/elastic-start-local/uninstall.sh b/elastic-start-local/uninstall.sh deleted file mode 100755 index 95bf375c..00000000 --- a/elastic-start-local/uninstall.sh +++ /dev/null @@ -1,54 +0,0 @@ -#!/bin/sh -# Uninstall script for start-local -# More information: https://github.com/elastic/start-local -set -eu - -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" - -ask_confirmation() { - echo "Do you confirm? (yes/no)" - read -r answer - case "$answer" in - yes|y|Y|Yes|YES) - return 0 # true - ;; - no|n|N|No|NO) - return 1 # false - ;; - *) - echo "Please answer yes or no." 
- ask_confirmation # Ask again if the input is invalid - ;; - esac -} - -cd "${SCRIPT_DIR}" -if [ ! -e "docker-compose.yml" ]; then - echo "Error: I cannot find the docker-compose.yml file" - echo "I cannot uninstall start-local." -fi -if [ ! -e ".env" ]; then - echo "Error: I cannot find the .env file" - echo "I cannot uninstall start-local." -fi -echo "This script will uninstall start-local." -echo "All data will be deleted and cannot be recovered." -if ask_confirmation; then - docker compose rm -fsv - docker compose down -v - rm docker-compose.yml .env uninstall.sh start.sh stop.sh config/telemetry.yml - if [ -z "$(ls -A config)" ]; then - rm -d config - fi - echo - echo "Do you want to remove the following Docker images?" - echo "- docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64" - if ask_confirmation; then - if docker rmi "docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64" >/dev/null 2>&1; then - echo "Image docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64 removed successfully" - else - echo "Failed to remove image docker.elastic.co/elasticsearch/elasticsearch:9.1.4-arm64. It might be in use." - fi - fi - echo "Start-local successfully removed" -fi From 9d66e5f47020a1c978e0153378c9ca7f27ab3096 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Tue, 30 Sep 2025 19:56:05 -0700 Subject: [PATCH 3/7] Add Elasticsearch setup and configuration to RemoteRolloutProcessor - Introduced setup method in RemoteRolloutProcessor to initialize Elasticsearch if not disabled. - Added ElasticSearchConfig model for managing Elasticsearch configuration. - Implemented logic to parse environment variables from a .env file and start Elasticsearch if necessary. - Updated evaluation_test to call rollout_processor.setup() for proper initialization. - Modified RolloutProcessor to include a setup method for potential overrides in subclasses. 
--- .gitignore | 2 - eval_protocol/pytest/evaluation_test.py | 2 + .../pytest/remote_rollout_processor.py | 64 +++++++++- eval_protocol/pytest/rollout_processor.py | 4 + .../types/remote_rollout_processor.py | 10 ++ eval_protocol/utils/subprocess_utils.py | 118 ++++++++++++++++++ 6 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 eval_protocol/utils/subprocess_utils.py diff --git a/.gitignore b/.gitignore index 352d0b55..42d2b831 100644 --- a/.gitignore +++ b/.gitignore @@ -239,5 +239,3 @@ package-lock.json package.json tau2-bench *.err - -elastic-start-local/ diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 8a468843..4b475738 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -367,6 +367,8 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bo exception_handler_config=exception_handler_config, ) + rollout_processor.setup() + async def execute_run(run_idx: int, config: RolloutProcessorConfig): nonlocal all_results diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index e7369ce8..d573e43b 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -1,16 +1,23 @@ import asyncio +import subprocess import time from typing import Any, Dict, List, Optional, Callable +from dotenv import load_dotenv import requests +from eval_protocol.directory_utils import find_eval_protocol_dir from eval_protocol.models import EvaluationRow, Status from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader -from eval_protocol.types.remote_rollout_processor import InitRequest, RolloutMetadata +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig, InitRequest, RolloutMetadata from .rollout_processor import RolloutProcessor from .types import RolloutProcessorConfig +import logging 
+ import os +logger = logging.getLogger(__name__) + class RemoteRolloutProcessor(RolloutProcessor): """ @@ -27,6 +34,8 @@ def __init__( poll_interval: float = 1.0, timeout_seconds: float = 120.0, output_data_loader: Callable[[str], DynamicDataLoader], + disable_elastic_search: bool = False, + elastic_search_config: Optional[ElasticSearchConfig] = None, ): # Prefer constructor-provided configuration. These can be overridden via # config.kwargs at call time for backward compatibility. @@ -37,6 +46,58 @@ def __init__( self._poll_interval = poll_interval self._timeout_seconds = timeout_seconds self._output_data_loader = output_data_loader + self._disable_elastic_search = disable_elastic_search + self._elastic_search_config = elastic_search_config + + def setup(self) -> None: + if self._disable_elastic_search: + logger.info("Elasticsearch is disabled, skipping setup") + return + logger.info("Setting up Elasticsearch") + self._elastic_search_config = self._setup_elastic_search() + logger.info("Elasticsearch setup complete") + + def _parse_elastic_env_file(self, env_file_path: str) -> ElasticSearchConfig: + """Parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file.""" + loaded = load_dotenv(env_file_path) + if not loaded: + raise RuntimeError("Failed to load .env file") + api_key = os.getenv("ES_LOCAL_API_KEY") + url = os.getenv("ES_LOCAL_URL") + if not url or not api_key: + raise RuntimeError("Failed to parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file") + return ElasticSearchConfig(url=url, api_key=api_key) + + def _setup_elastic_search(self) -> ElasticSearchConfig: + eval_protocol_dir = find_eval_protocol_dir() + elastic_start_local_dir = os.path.join(eval_protocol_dir, "elastic-start-local") + env_file_path = os.path.join(elastic_start_local_dir, ".env") + + # if elastic-start-local directory exists, return the config + if os.path.exists(elastic_start_local_dir): + # run start.sh in the elastic-start-local directory + from eval_protocol.utils.subprocess_utils 
import run_script_and_wait + + run_script_and_wait( + script_name="start.sh", + working_directory=elastic_start_local_dir, + inherit_stdout=True, + ) + return self._parse_elastic_env_file(env_file_path) + + # run Elasticsearch start-local script: "curl -fsSL https://elastic.co/start-local | sh -s -- --esonly" + process = subprocess.Popen( + ["sh", "-c", "curl -fsSL https://elastic.co/start-local | sh -s -- --esonly"], + cwd=eval_protocol_dir, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + returncode = process.wait() + if returncode != 0: + raise RuntimeError("Failed to start Elasticsearch") + + return self._parse_elastic_env_file(env_file_path) def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -119,6 +180,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: tools=row.tools, metadata=meta, model_base_url=model_base_url, + elastic_search_config=self._elastic_search_config, ) # Fire-and-poll diff --git a/eval_protocol/pytest/rollout_processor.py b/eval_protocol/pytest/rollout_processor.py index 313f1768..95fbfa1b 100644 --- a/eval_protocol/pytest/rollout_processor.py +++ b/eval_protocol/pytest/rollout_processor.py @@ -10,6 +10,10 @@ class RolloutProcessor(ABC): Abstract base class for all rollout processor strategies. """ + def setup(self) -> None: + """Setup resources. Override in subclasses if setup is needed. Executed once per invocation.""" + pass + @abstractmethod def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> list[asyncio.Task[EvaluationRow]]: """Process evaluation rows and return async tasks. 
Must be implemented by subclasses.""" diff --git a/eval_protocol/types/remote_rollout_processor.py b/eval_protocol/types/remote_rollout_processor.py index 817d1c3f..422b8401 100644 --- a/eval_protocol/types/remote_rollout_processor.py +++ b/eval_protocol/types/remote_rollout_processor.py @@ -7,6 +7,15 @@ from eval_protocol.models import Message, Status +class ElasticSearchConfig(BaseModel): + """ + Configuration for Elasticsearch. + """ + + url: str + api_key: str + + class RolloutMetadata(BaseModel): """Metadata for rollout execution.""" @@ -21,6 +30,7 @@ class InitRequest(BaseModel): """Request model for POST /init endpoint.""" model: str + elastic_search_config: Optional[ElasticSearchConfig] = None messages: Optional[List[Message]] = None tools: Optional[List[Dict[str, Any]]] = None diff --git a/eval_protocol/utils/subprocess_utils.py b/eval_protocol/utils/subprocess_utils.py new file mode 100644 index 00000000..277eaf87 --- /dev/null +++ b/eval_protocol/utils/subprocess_utils.py @@ -0,0 +1,118 @@ +"""Cross-platform subprocess utilities for running scripts and commands.""" + +import os +import platform +import subprocess +from typing import Optional + + +def run_script_cross_platform( + script_name: str, + working_directory: str, + capture_output: bool = True, + print_output: bool = False, + inherit_stdout: bool = False, +) -> subprocess.Popen: + """ + Run a script in a cross-platform manner. 
+ + Args: + script_name: Name of the script to run (e.g., "start.sh") + working_directory: Directory to run the script in + capture_output: Whether to capture stdout/stderr + print_output: Whether to print output in real-time + inherit_stdout: Whether to inherit stdout from parent process + + Returns: + subprocess.Popen object for the running process + + Raises: + RuntimeError: If the script fails to start or execute + """ + script_path = os.path.join(working_directory, script_name) + + if not os.path.exists(script_path): + raise FileNotFoundError(f"Script not found: {script_path}") + + # Determine stdout handling + if inherit_stdout: + stdout = None # Inherit from parent process + stderr = subprocess.STDOUT # Still capture stderr + elif capture_output: + stdout = subprocess.PIPE + stderr = subprocess.STDOUT + else: + stdout = None + stderr = None + + if platform.system() == "Windows": + # On Windows, use cmd.exe to run the script + cmd = ["cmd.exe", "/c", script_name] + process = subprocess.Popen( + cmd, + cwd=working_directory, + stdout=stdout, + stderr=stderr, + text=True, + ) + else: + # On Unix-like systems, make executable and run with proper shebang + os.chmod(script_path, 0o755) + + # Use the full path to the script with shell=True + process = subprocess.Popen( + script_path, + stdout=stdout, + stderr=stderr, + text=True, + shell=True, + ) + + # Print output in real-time if requested + if print_output and capture_output and process.stdout: + for line in process.stdout: + print(line, end="") + + return process + + +def run_script_and_wait( + script_name: str, + working_directory: str, + print_output: bool = False, + inherit_stdout: bool = False, + timeout: Optional[int] = None, +) -> int: + """ + Run a script and wait for it to complete. 
+ + Args: + script_name: Name of the script to run + working_directory: Directory to run the script in + print_output: Whether to print output in real-time + inherit_stdout: Whether to inherit stdout from parent process + timeout: Maximum time to wait for the script to complete + + Returns: + Return code of the script + + Raises: + RuntimeError: If the script fails to execute + subprocess.TimeoutExpired: If the script times out + """ + process = run_script_cross_platform( + script_name=script_name, + working_directory=working_directory, + capture_output=print_output and not inherit_stdout, + print_output=print_output, + inherit_stdout=inherit_stdout, + ) + + try: + returncode = process.wait(timeout=timeout) + if returncode != 0: + raise RuntimeError(f"Script '{script_name}' failed with return code {returncode}") + return returncode + except subprocess.TimeoutExpired: + process.kill() + raise From 8b7e9cf721861f6e6dfb9b5daaca10e076d43aa4 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 1 Oct 2025 09:35:13 -0700 Subject: [PATCH 4/7] Refactor Elasticsearch setup in RemoteRolloutProcessor - Removed the previous logic for parsing environment variables and starting Elasticsearch directly. - Introduced a dedicated ElasticsearchSetup module to handle Elasticsearch initialization. - Updated the _setup_elastic_search method to utilize the new setup module for improved clarity and maintainability. 
--- eval_protocol/pytest/elasticsearch_setup.py | 127 ++++++++++++++++++ .../pytest/remote_rollout_processor.py | 47 +------ 2 files changed, 131 insertions(+), 43 deletions(-) create mode 100644 eval_protocol/pytest/elasticsearch_setup.py diff --git a/eval_protocol/pytest/elasticsearch_setup.py b/eval_protocol/pytest/elasticsearch_setup.py new file mode 100644 index 00000000..57f833fb --- /dev/null +++ b/eval_protocol/pytest/elasticsearch_setup.py @@ -0,0 +1,127 @@ +import os +import subprocess +import tempfile +import logging +from typing import Optional + +from dotenv import load_dotenv +from eval_protocol.directory_utils import find_eval_protocol_dir +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig + +logger = logging.getLogger(__name__) + + +class ElasticsearchSetupError(Exception): + """Exception raised when Elasticsearch setup fails.""" + + pass + + +class ElasticsearchSetup: + """Handles Elasticsearch setup with retry logic for existing containers.""" + + def __init__(self): + self.eval_protocol_dir = find_eval_protocol_dir() + + def setup_elasticsearch(self) -> ElasticSearchConfig: + """ + Set up Elasticsearch, handling both local and remote scenarios. + Returns the ElasticSearchConfig for the running instance. 
+ """ + elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local") + env_file_path = os.path.join(elastic_start_local_dir, ".env") + + # If elastic-start-local directory exists, use local script + if os.path.exists(elastic_start_local_dir): + return self._setup_local_elasticsearch(elastic_start_local_dir, env_file_path) + + # Otherwise, use remote curl command with retry logic + return self._setup_remote_elasticsearch(env_file_path) + + def _setup_local_elasticsearch(self, elastic_start_local_dir: str, env_file_path: str) -> ElasticSearchConfig: + """Set up Elasticsearch using local start.sh script.""" + from eval_protocol.utils.subprocess_utils import run_script_and_wait + + run_script_and_wait( + script_name="start.sh", + working_directory=elastic_start_local_dir, + inherit_stdout=True, + ) + return self._parse_elastic_env_file(env_file_path) + + def _setup_remote_elasticsearch(self, env_file_path: str) -> ElasticSearchConfig: + """Set up Elasticsearch using remote curl command with retry logic.""" + max_retries = 2 + for attempt in range(max_retries): + # Use a temporary file to capture output while also showing it in parent stdout + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file: + temp_file_path = temp_file.name + + try: + # Run the command and tee output to both stdout and temp file + # Use set -o pipefail to ensure we get the return code of the first failing command + process = subprocess.Popen( + [ + "sh", + "-c", + f"set -o pipefail; curl -fsSL https://elastic.co/start-local | sh -s -- --esonly | tee {temp_file_path}", + ], + cwd=self.eval_protocol_dir, + ) + returncode = process.wait() + + # Read the captured output + with open(temp_file_path, "r") as f: + stdout = f.read() + + if returncode == 0: + return self._parse_elastic_env_file(env_file_path) + + # Check if container is already running and handle it + if self._handle_existing_elasticsearch_container(stdout): + logger.info(f"Retrying Elasticsearch 
setup (attempt {attempt + 1}/{max_retries})") + continue + + # If we get here, it's a different error + raise ElasticsearchSetupError( + f"Failed to start Elasticsearch (attempt {attempt + 1}/{max_retries}): {stdout}" + ) + + finally: + # Clean up the temporary file + try: + os.unlink(temp_file_path) + except OSError: + pass + + raise ElasticsearchSetupError(f"Failed to start Elasticsearch after {max_retries} attempts") + + def _handle_existing_elasticsearch_container(self, output: str) -> bool: + """ + Check if the curl command output indicates that the Elasticsearch container is already running. + If so, stop the existing container and return True to indicate a retry is needed. + """ + if "docker stop es-local-dev" in output: + logger.info("Elasticsearch container 'es-local-dev' is already running. Stopping it...") + try: + subprocess.run(["docker", "stop", "es-local-dev"], check=True, capture_output=True, text=True) + logger.info("Successfully stopped existing Elasticsearch container") + return True # Indicate retry is needed + except subprocess.CalledProcessError as e: + logger.warning(f"Failed to stop existing container: {e}") + return False + return False + + def _parse_elastic_env_file(self, env_file_path: str) -> ElasticSearchConfig: + """Parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file.""" + loaded = load_dotenv(env_file_path) + if not loaded: + raise ElasticsearchSetupError("Failed to load .env file") + + api_key = os.getenv("ES_LOCAL_API_KEY") + url = os.getenv("ES_LOCAL_URL") + + if not url or not api_key: + raise ElasticsearchSetupError("Failed to parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file") + + return ElasticSearchConfig(url=url, api_key=api_key) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index d573e43b..3a507aae 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -1,17 +1,15 @@ import asyncio 
-import subprocess import time from typing import Any, Dict, List, Optional, Callable -from dotenv import load_dotenv import requests -from eval_protocol.directory_utils import find_eval_protocol_dir from eval_protocol.models import EvaluationRow, Status from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig, InitRequest, RolloutMetadata from .rollout_processor import RolloutProcessor from .types import RolloutProcessorConfig +from .elasticsearch_setup import ElasticsearchSetup import logging import os @@ -57,47 +55,10 @@ def setup(self) -> None: self._elastic_search_config = self._setup_elastic_search() logger.info("Elasticsearch setup complete") - def _parse_elastic_env_file(self, env_file_path: str) -> ElasticSearchConfig: - """Parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file.""" - loaded = load_dotenv(env_file_path) - if not loaded: - raise RuntimeError("Failed to load .env file") - api_key = os.getenv("ES_LOCAL_API_KEY") - url = os.getenv("ES_LOCAL_URL") - if not url or not api_key: - raise RuntimeError("Failed to parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file") - return ElasticSearchConfig(url=url, api_key=api_key) - def _setup_elastic_search(self) -> ElasticSearchConfig: - eval_protocol_dir = find_eval_protocol_dir() - elastic_start_local_dir = os.path.join(eval_protocol_dir, "elastic-start-local") - env_file_path = os.path.join(elastic_start_local_dir, ".env") - - # if elastic-start-local directory exists, return the config - if os.path.exists(elastic_start_local_dir): - # run start.sh in the elastic-start-local directory - from eval_protocol.utils.subprocess_utils import run_script_and_wait - - run_script_and_wait( - script_name="start.sh", - working_directory=elastic_start_local_dir, - inherit_stdout=True, - ) - return self._parse_elastic_env_file(env_file_path) - - # run Elasticsearch start-local script: "curl -fsSL 
https://elastic.co/start-local | sh -s -- --esonly" - process = subprocess.Popen( - ["sh", "-c", "curl -fsSL https://elastic.co/start-local | sh -s -- --esonly"], - cwd=eval_protocol_dir, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - ) - returncode = process.wait() - if returncode != 0: - raise RuntimeError("Failed to start Elasticsearch") - - return self._parse_elastic_env_file(env_file_path) + """Set up Elasticsearch using the dedicated setup module.""" + setup = ElasticsearchSetup() + return setup.setup_elasticsearch() def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] From 388d23bfa377dccfce141db852ab41871632518d Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 1 Oct 2025 09:37:51 -0700 Subject: [PATCH 5/7] Refactor Elasticsearch setup methods in ElasticsearchSetup class - Renamed and updated methods for clarity: _setup_local_elasticsearch to _setup_existing_docker_elasticsearch and _setup_remote_elasticsearch to _setup_initialized_docker_elasticsearch. - Improved comments to better describe the purpose of each setup method. - Enhanced the logic for initializing Elasticsearch with Docker, ensuring clearer handling of existing and new setups. 
--- eval_protocol/pytest/elasticsearch_setup.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/eval_protocol/pytest/elasticsearch_setup.py b/eval_protocol/pytest/elasticsearch_setup.py index 57f833fb..c7705b3f 100644 --- a/eval_protocol/pytest/elasticsearch_setup.py +++ b/eval_protocol/pytest/elasticsearch_setup.py @@ -31,15 +31,17 @@ def setup_elasticsearch(self) -> ElasticSearchConfig: elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local") env_file_path = os.path.join(elastic_start_local_dir, ".env") - # If elastic-start-local directory exists, use local script + # If elastic-start-local directory exists, use existing Docker script if os.path.exists(elastic_start_local_dir): - return self._setup_local_elasticsearch(elastic_start_local_dir, env_file_path) + return self._setup_existing_docker_elasticsearch(elastic_start_local_dir, env_file_path) - # Otherwise, use remote curl command with retry logic - return self._setup_remote_elasticsearch(env_file_path) + # Otherwise, initialize Docker setup from scratch + return self._setup_initialized_docker_elasticsearch(env_file_path) - def _setup_local_elasticsearch(self, elastic_start_local_dir: str, env_file_path: str) -> ElasticSearchConfig: - """Set up Elasticsearch using local start.sh script.""" + def _setup_existing_docker_elasticsearch( + self, elastic_start_local_dir: str, env_file_path: str + ) -> ElasticSearchConfig: + """Set up Elasticsearch using existing Docker start.sh script.""" from eval_protocol.utils.subprocess_utils import run_script_and_wait run_script_and_wait( @@ -49,8 +51,8 @@ def _setup_local_elasticsearch(self, elastic_start_local_dir: str, env_file_path ) return self._parse_elastic_env_file(env_file_path) - def _setup_remote_elasticsearch(self, env_file_path: str) -> ElasticSearchConfig: - """Set up Elasticsearch using remote curl command with retry logic.""" + def _setup_initialized_docker_elasticsearch(self, env_file_path: 
str) -> ElasticSearchConfig: + """Set up Elasticsearch by initializing Docker setup from scratch with retry logic.""" max_retries = 2 for attempt in range(max_retries): # Use a temporary file to capture output while also showing it in parent stdout From 282000908e527d3a59636ed0acf33ecb9050c676 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 1 Oct 2025 09:38:51 -0700 Subject: [PATCH 6/7] Add PID tracking to langfuse_row in RemoteRolloutProcessor - Introduced a new attribute 'pid' to langfuse_row to facilitate detection of stopped evaluations. - Updated comments to clarify the purpose of the new attribute in relation to logging status updates. --- eval_protocol/pytest/remote_rollout_processor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 3a507aae..4fd442e4 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -220,6 +220,11 @@ def _load_data(): langfuse_row.input_metadata.dataset_info = row.input_metadata.dataset_info langfuse_row.eval_metadata = row.eval_metadata langfuse_row.ground_truth = row.ground_truth + + # this is useful to detect stopped evaluations so we can update + # the status in the logs server + langfuse_row.pid = row.pid + return langfuse_row else: raise ValueError("RemoteRolloutProcessor's output_data_loader should return exactly one row.") From 5496adad8c4cbf7823bc4d2c77c7f6b8be9c405a Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 1 Oct 2025 11:14:58 -0700 Subject: [PATCH 7/7] Add Elasticsearch logging and index management functionality - Introduced ElasticsearchDirectHttpHandler for asynchronous logging to Elasticsearch. - Added ElasticsearchIndexManager for managing index creation and mapping configuration. - Updated ElasticsearchSetup to create logging indices with proper mappings. - Enhanced ElasticSearchConfig model to include index_name attribute. 
class ElasticsearchDirectHttpHandler(logging.Handler):
    """Logging handler that ships log records to Elasticsearch over HTTP.

    Each record is serialized to a JSON document and POSTed to
    ``<base_url>/<index_name>/_doc`` from a single background worker thread,
    so emitting a record never blocks the calling thread on network I/O.
    """

    def __init__(self, elasticsearch_config: "ElasticSearchConfig") -> None:
        """Configure the handler from a config exposing url, api_key and index_name."""
        super().__init__()
        self.base_url: str = elasticsearch_config.url.rstrip("/")
        self.index_name: str = elasticsearch_config.index_name
        self.api_key: str = elasticsearch_config.api_key
        self.url: str = f"{self.base_url}/{self.index_name}/_doc"
        self.formatter: logging.Formatter = logging.Formatter()
        self._executor = None
        # Guards lazy creation of the executor: a handler may be invoked from
        # several threads, and unsynchronized lazy init could leak a second
        # ThreadPoolExecutor.
        self._executor_lock = threading.Lock()

        # Parse URL to determine if we should verify SSL
        parsed_url = urlparse(elasticsearch_config.url)
        self.verify_ssl = parsed_url.scheme == "https"

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record by scheduling it for async transmission."""
        try:
            # Create a proper ISO 8601 timestamp. Bug fix: the previous code
            # formatted the naive *local* time but still appended the "Z"
            # (UTC) designator, mislabeling every timestamp on machines not
            # running in UTC. Convert explicitly to UTC instead.
            timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            data: Dict[str, Any] = {
                "@timestamp": timestamp,
                "level": record.levelname,
                "message": record.getMessage(),
                "logger_name": record.name,
                # Add other relevant record attributes if needed
            }

            # Schedule the HTTP request to run asynchronously
            self._schedule_async_send(data, record)
        except Exception as e:
            self.handleError(record)
            print(f"Error preparing log for Elasticsearch: {e}")

    def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Schedule an async task to send the log data to Elasticsearch."""
        # Double-checked locking: the fast path skips the lock once the
        # executor exists; the lock prevents two threads racing to create it.
        if self._executor is None:
            with self._executor_lock:
                if self._executor is None:
                    self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="elasticsearch-logger")

        # Submit the HTTP request to the thread pool
        future = self._executor.submit(self._send_to_elasticsearch, data, record)

        # Add error handling callback
        future.add_done_callback(lambda f: self._handle_async_result(f, record))

    def _send_to_elasticsearch(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Send data to Elasticsearch (runs in thread pool).

        Any exception propagates into the future and is reported by the
        done-callback in _handle_async_result (the previous try/except that
        only re-raised was a no-op and has been removed).
        """
        response: requests.Response = requests.post(
            self.url,
            headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"},
            data=json.dumps(data),
            verify=self.verify_ssl,  # If using HTTPS, verify SSL certificate
        )
        response.raise_for_status()  # Raise an exception for HTTP errors

    def _handle_async_result(self, future, record: logging.LogRecord) -> None:
        """Handle the result of the async send operation."""
        try:
            future.result()  # This will raise any exception that occurred
        except Exception as e:
            self.handleError(record)
            # You might want to log this error to a file or console
            # to prevent a logging loop.
            if hasattr(e, "response") and getattr(e, "response", None) is not None:
                print(f"Error sending log to Elasticsearch: {e}")
                print(f"Response content: {getattr(e, 'response').text}")
            else:
                print(f"Error sending log to Elasticsearch: {e}")

    def close(self) -> None:
        """Clean up resources when the handler is closed."""
        super().close()
        if self._executor:
            self._executor.shutdown(wait=True)
+ """ + if self._mapping_created: + return True + + try: + # Check if index exists and has correct mapping + if self._index_exists_with_correct_mapping(): + self._mapping_created = True + return True + + # If index exists but has wrong mapping, delete and recreate it + if self.index_exists(): + print(f"Warning: Index {self.index_name} exists with incorrect mapping. Deleting and recreating...") + if not self.delete_index(): + print(f"Warning: Failed to delete existing index {self.index_name}") + return False + + # Create index with proper mapping + mapping = self._get_logging_mapping() + response = requests.put( + self.index_url, + headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"}, + json=mapping, + verify=self.verify_ssl, + ) + + if response.status_code in [200, 201]: + self._mapping_created = True + return True + else: + print(f"Warning: Failed to create index mapping: {response.status_code} - {response.text}") + return False + + except Exception as e: + print(f"Warning: Failed to create index mapping: {e}") + return False + + def _index_exists_with_correct_mapping(self) -> bool: + """Check if index exists and has the correct @timestamp mapping. + + Returns: + bool: True if index exists with correct mapping, False otherwise. 
+ """ + try: + # Check if index exists + response = requests.head( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + + if response.status_code != 200: + return False + + # Check if mapping is correct + mapping_response = requests.get( + f"{self.index_url}/_mapping", + headers={"Authorization": f"ApiKey {self.api_key}"}, + verify=self.verify_ssl, + ) + + if mapping_response.status_code != 200: + return False + + mapping_data = mapping_response.json() + return self._has_correct_timestamp_mapping(mapping_data) + + except Exception: + return False + + def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool: + """Check if the mapping has @timestamp as a date field. + + Args: + mapping_data: Elasticsearch mapping response data + + Returns: + bool: True if @timestamp is correctly mapped as date field + """ + try: + return ( + self.index_name in mapping_data + and "mappings" in mapping_data[self.index_name] + and "properties" in mapping_data[self.index_name]["mappings"] + and "@timestamp" in mapping_data[self.index_name]["mappings"]["properties"] + and mapping_data[self.index_name]["mappings"]["properties"]["@timestamp"].get("type") == "date" + ) + except (KeyError, TypeError): + return False + + def _get_logging_mapping(self) -> Dict[str, Any]: + """Get the standard mapping for logging data. + + Returns: + Dict containing the index mapping configuration + """ + return { + "mappings": { + "properties": { + "@timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, + "level": {"type": "keyword"}, + "message": {"type": "text"}, + "logger_name": {"type": "keyword"}, + } + } + } + + def delete_index(self) -> bool: + """Delete the managed index. + + Returns: + bool: True if index was deleted successfully, False otherwise. 
+ """ + try: + response = requests.delete( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + if response.status_code in [200, 404]: # 404 means index doesn't exist, which is fine + self._mapping_created = False + return True + else: + print(f"Warning: Failed to delete index: {response.status_code} - {response.text}") + return False + except Exception as e: + print(f"Warning: Failed to delete index: {e}") + return False + + def index_exists(self) -> bool: + """Check if the index exists. + + Returns: + bool: True if index exists, False otherwise. + """ + try: + response = requests.head( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + return response.status_code == 200 + except Exception: + return False + + def get_index_stats(self) -> Optional[Dict[str, Any]]: + """Get statistics about the index. + + Returns: + Dict containing index statistics, or None if failed + """ + try: + response = requests.get( + f"{self.index_url}/_stats", + headers={"Authorization": f"ApiKey {self.api_key}"}, + verify=self.verify_ssl, + ) + if response.status_code == 200: + return response.json() + return None + except Exception: + return None diff --git a/eval_protocol/pytest/elasticsearch_setup.py b/eval_protocol/pytest/elasticsearch_setup.py index c7705b3f..1f3af3fc 100644 --- a/eval_protocol/pytest/elasticsearch_setup.py +++ b/eval_protocol/pytest/elasticsearch_setup.py @@ -7,6 +7,7 @@ from dotenv import load_dotenv from eval_protocol.directory_utils import find_eval_protocol_dir from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig +from eval_protocol.logging.elasticsearch_index_manager import ElasticsearchIndexManager logger = logging.getLogger(__name__) @@ -23,20 +24,31 @@ class ElasticsearchSetup: def __init__(self): self.eval_protocol_dir = find_eval_protocol_dir() - def setup_elasticsearch(self) -> ElasticSearchConfig: + def setup_elasticsearch(self, 
index_name: str = "default-logs") -> ElasticSearchConfig: """ Set up Elasticsearch, handling both local and remote scenarios. - Returns the ElasticSearchConfig for the running instance. + + Args: + index_name: Name of the Elasticsearch index to use for logging + + Returns: + ElasticSearchConfig for the running instance with the specified index name. """ elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local") env_file_path = os.path.join(elastic_start_local_dir, ".env") # If elastic-start-local directory exists, use existing Docker script if os.path.exists(elastic_start_local_dir): - return self._setup_existing_docker_elasticsearch(elastic_start_local_dir, env_file_path) + config = self._setup_existing_docker_elasticsearch(elastic_start_local_dir, env_file_path) + else: + # Otherwise, initialize Docker setup from scratch + config = self._setup_initialized_docker_elasticsearch(env_file_path) + + # Create the logging index with proper mapping + self.create_logging_index(index_name) - # Otherwise, initialize Docker setup from scratch - return self._setup_initialized_docker_elasticsearch(env_file_path) + # Return config with the specified index name + return ElasticSearchConfig(url=config.url, api_key=config.api_key, index_name=index_name) def _setup_existing_docker_elasticsearch( self, elastic_start_local_dir: str, env_file_path: str @@ -126,4 +138,30 @@ def _parse_elastic_env_file(self, env_file_path: str) -> ElasticSearchConfig: if not url or not api_key: raise ElasticsearchSetupError("Failed to parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file") - return ElasticSearchConfig(url=url, api_key=api_key) + return ElasticSearchConfig(url=url, api_key=api_key, index_name="default-logs") + + def create_logging_index(self, index_name: str) -> bool: + """Create an Elasticsearch index with proper mapping for logging data. 
+ + Args: + index_name: Name of the index to create + + Returns: + bool: True if index was created successfully, False otherwise. + """ + try: + # Get the config from the .env file + config = self._parse_elastic_env_file(self._get_env_file_path()) + + # Create index manager and set up mapping + index_manager = ElasticsearchIndexManager(config.url, index_name, config.api_key) + return index_manager.create_logging_index_mapping() + + except Exception as e: + logger.error(f"Failed to create logging index {index_name}: {e}") + return False + + def _get_env_file_path(self) -> str: + """Get the path to the .env file.""" + elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local") + return os.path.join(elastic_start_local_dir, ".env") diff --git a/eval_protocol/types/remote_rollout_processor.py b/eval_protocol/types/remote_rollout_processor.py index 422b8401..e8ccdf75 100644 --- a/eval_protocol/types/remote_rollout_processor.py +++ b/eval_protocol/types/remote_rollout_processor.py @@ -14,6 +14,7 @@ class ElasticSearchConfig(BaseModel): url: str api_key: str + index_name: str class RolloutMetadata(BaseModel): diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py new file mode 100644 index 00000000..8b7eb892 --- /dev/null +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -0,0 +1,196 @@ +import os +import logging +import time +import requests +import pytest +from urllib.parse import urlparse + +from eval_protocol.logging.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler +from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig + + +@pytest.fixture +def elasticsearch_config(): + """Set up Elasticsearch and return configuration.""" + import time + + index_name = f"test-logs-{int(time.time())}" + setup = ElasticsearchSetup() + config = 
@pytest.fixture
def elasticsearch_config():
    """Set up Elasticsearch and return configuration."""
    # Unique per-run index name so test runs never collide.
    index_name = f"test-logs-{int(time.time())}"
    setup = ElasticsearchSetup()
    return setup.setup_elasticsearch(index_name)


@pytest.fixture
def elasticsearch_handler(elasticsearch_config: ElasticSearchConfig):
    """Create and configure ElasticsearchDirectHttpHandler."""
    handler = ElasticsearchDirectHttpHandler(elasticsearch_config)
    handler.setLevel(logging.INFO)
    return handler


@pytest.fixture
def test_logger(elasticsearch_handler, elasticsearch_config):
    """Set up a test logger with the Elasticsearch handler."""
    # Make sure the index backing this handler exists before logging to it.
    ElasticsearchSetup().create_logging_index(elasticsearch_handler.index_name)

    logger = logging.getLogger("test_elasticsearch_logger")
    logger.setLevel(logging.INFO)
    # Start from a clean handler list, then attach only ours.
    logger.handlers.clear()
    logger.addHandler(elasticsearch_handler)
    # Prevent propagation to avoid duplicate logs
    logger.propagate = False
    return logger


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
def test_elasticsearch_direct_http_handler_sends_logs(
    elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger
):
    """Test that ElasticsearchDirectHttpHandler successfully sends logs to Elasticsearch."""
    # Unique message so this run cannot match documents from earlier runs.
    test_message = f"Test log message at {time.time()}"
    test_logger.info(test_message)

    # Give Elasticsearch a moment to process the document
    time.sleep(3)

    # Build the _search endpoint from the configured base URL.
    parsed_url = urlparse(elasticsearch_config.url)
    search_url = f"{parsed_url.scheme}://{parsed_url.netloc}/{elasticsearch_config.index_name}/_search"

    # Match our message and sort newest-first so the top hit is ours.
    query_body = {
        "query": {"match": {"message": test_message}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": 1,
    }

    response = requests.post(
        search_url,
        headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
        json=query_body,
        verify=parsed_url.scheme == "https",
    )

    if response.status_code != 200:
        print(f"Elasticsearch search failed with status {response.status_code}")
        print(f"Response: {response.text}")
        response.raise_for_status()

    search_results = response.json()

    assert "hits" in search_results, "Search response should contain 'hits'"
    assert "total" in search_results["hits"], "Search hits should contain 'total'"

    # Elasticsearch 7+ wraps the count in {"value": ...}; ES 6 returns a bare int.
    total_hits = search_results["hits"]["total"]
    total_count = total_hits["value"] if isinstance(total_hits, dict) else total_hits
    assert total_count > 0, f"Expected to find at least 1 log message, but found {total_count}"

    hits = search_results["hits"]["hits"]
    assert len(hits) > 0, "Expected at least one hit"

    found_document = hits[0]["_source"]
    assert found_document["message"] == test_message, (
        f"Expected message '{test_message}', got '{found_document['message']}'"
    )
    assert found_document["level"] == "INFO", f"Expected level 'INFO', got '{found_document['level']}'"
    assert found_document["logger_name"] == "test_elasticsearch_logger", (
        f"Expected logger name 'test_elasticsearch_logger', got '{found_document['logger_name']}'"
    )
    assert "@timestamp" in found_document, "Expected document to contain '@timestamp' field"

    print(f"Successfully verified log message in Elasticsearch: {test_message}")
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
def test_elasticsearch_direct_http_handler_sorts_logs_chronologically(
    elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger
):
    """Test that logs can be sorted chronologically by timestamp."""
    # Emit several messages with staggered timestamps.
    sent_messages = []
    for i in range(3):
        message = f"Chronological test message {i} at {time.time()}"
        sent_messages.append(message)
        test_logger.info(message)
        time.sleep(0.1)  # Small delay to ensure different timestamps

    # Give Elasticsearch time to process all documents
    time.sleep(2)

    # Build the _search endpoint from the configured base URL.
    parsed_url = urlparse(elasticsearch_config.url)
    search_url = f"{parsed_url.scheme}://{parsed_url.netloc}/{elasticsearch_config.index_name}/_search"

    # Fetch every message with our prefix, oldest first.
    query_body = {
        "query": {"match_phrase_prefix": {"message": "Chronological test message"}},
        "sort": [{"@timestamp": {"order": "asc"}}],  # Ascending order (oldest first)
        "size": 10,
    }

    response = requests.post(
        search_url,
        headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
        json=query_body,
        verify=parsed_url.scheme == "https",
    )

    if response.status_code != 200:
        print(f"Elasticsearch search failed with status {response.status_code}")
        print(f"Response: {response.text}")
        response.raise_for_status()

    hits = response.json()["hits"]["hits"]
    assert len(hits) >= 3, f"Expected at least 3 messages, found {len(hits)}"

    found_messages = [hit["_source"]["message"] for hit in hits]
    found_timestamps = [hit["_source"]["@timestamp"] for hit in hits]

    # Every message we sent must have been indexed.
    for test_message in sent_messages:
        assert test_message in found_messages, f"Expected message '{test_message}' not found in results"

    # Verify timestamps are in ascending order (chronological)
    for earlier, later in zip(found_timestamps, found_timestamps[1:]):
        assert earlier <= later, f"Timestamps not in chronological order: {earlier} > {later}"

    print(f"Successfully verified chronological sorting of {len(hits)} log messages")
    print(f"Timestamps in order: {found_timestamps}")