Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
aca194f
Refactor agent output handlers
val500 Apr 16, 2025
73653f8
Added tests for new handlers
val500 Apr 22, 2025
6423926
Replace tmpdir fixture with pytest tmp_path
val500 Apr 24, 2025
6e42e47
Remove log buffer field
val500 Apr 30, 2025
c4599ed
Change log endpoint in agent code
val500 Apr 30, 2025
6d89bac
Add docstrings to Log Handlers
val500 Apr 30, 2025
338bce9
Change pass to NotImplementedError
val500 Apr 30, 2025
731ac00
Remove post_output and post_serial functions
val500 Apr 30, 2025
e1823a2
Create LogType in common module and use it in agent
val500 May 1, 2025
bcd6729
Change LogEndpointInput to a dataclass
val500 May 1, 2025
f6a21da
Ruff linting fixes
val500 May 1, 2025
70d5dc4
Changed NORMAL_OUTPUT to STANDARD_OUTPUT
val500 May 5, 2025
1468283
Formatting
val500 May 5, 2025
50d07d8
Formatting
val500 May 6, 2025
aac28bf
Added logging handlers and changed logging endpoints
val500 Apr 24, 2025
1b4f4ab
Added tests for logging handlers
val500 Apr 24, 2025
cd471f8
Added phase querying to GET endpoint
val500 May 5, 2025
e728e2b
Modify agent to only send status as results
val500 May 6, 2025
bd43c79
Change schema for results endpoint and use log handler
val500 May 8, 2025
2a9c166
Add tests to agent for new results format
val500 May 8, 2025
68accc9
Fix formatting
val500 May 12, 2025
5fcc334
Add tests for results endpoint
val500 May 12, 2025
3d3a2e9
conserver temp
val500 May 15, 2025
582fa30
test_serial
val500 May 15, 2025
9e63445
temp
val500 May 19, 2025
cedb5fd
f
val500 May 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions agent/src/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import shutil
import tempfile
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List
from urllib.parse import urljoin
Expand All @@ -27,12 +28,23 @@
from influxdb.exceptions import InfluxDBClientError
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from testflinger_common.enums import LogType

from testflinger_agent.errors import TFServerError

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class LogEndpointInput:
"""Schema for Testflinger Log endpoints."""

fragment_number: int
timestamp: str
phase: str
log_data: str


class TestflingerClient:
__test__ = False
"""This prevents pytest from trying to run this class as a test."""
Expand Down Expand Up @@ -135,33 +147,6 @@ def check_job_state(self, job_id):
if job_data:
return job_data.get("job_state")

def repost_job(self, job_data):
"""Resubmit the job to the testflinger server with the same id.

:param job_id:
id for the job on which we want to post results
"""
job_uri = urljoin(self.server, "/v1/job")
job_id = job_data.get("job_id")
logger.info("Resubmitting job: %s", job_id)
job_output = """
There was an unrecoverable error while running this stage. Your job
will attempt to be automatically resubmitted back to the queue.
Resubmitting job: {}\n""".format(job_id)
self.post_live_output(job_id, job_output)
try:
job_request = self.session.post(job_uri, json=job_data)
except requests.exceptions.RequestException as exc:
logger.error(exc)
raise TFServerError("other exception") from exc
if not job_request:
logger.error(
"Unable to re-post job to: %s (error: %d)",
job_uri,
job_request.status_code,
)
raise TFServerError(job_request.status_code)

def post_job_state(self, job_id, phase):
"""Update the job_state on the testflinger server."""
try:
Expand Down Expand Up @@ -252,6 +237,9 @@ def transmit_job_outcome(self, rundir):
logger.info("Submitting job outcome for job: %s", job_id)
with open(outcome_file) as f:
data = json.load(f)
# Only include status in posted results
data.pop("output", None)
data.pop("serial", None)
data["job_state"] = "complete"
self.post_result(job_id, data)
# Remove the outcome file so we don't retransmit
Expand Down Expand Up @@ -297,20 +285,25 @@ def save_artifacts(self, rundir, job_id):
else:
shutil.rmtree(artifacts_dir)

def post_live_output(self, job_id, data):
"""Post output data to the testflinger server for this job.
def post_log(
self,
job_id: str,
log_input: LogEndpointInput,
log_type: LogType,
):
"""Post log data to the testflinger server for this job.

:param job_id:
id for the job on which we want to post results
:param data:
string with latest output data
:param job_id
id for the job
:param log_input
Dataclass with all of the keys for the log endpoint
:param log_type
Enum of different log types the server accepts
"""
output_uri = urljoin(
self.server, "/v1/result/{}/output".format(job_id)
)
endpoint = urljoin(self.server, f"/v1/result/{job_id}/log/{log_type}")
try:
job_request = self.session.post(
output_uri, data=data.encode("utf-8"), timeout=60
endpoint, json=asdict(log_input), timeout=60
)
except requests.exceptions.RequestException as exc:
logger.error(exc)
Expand Down
86 changes: 77 additions & 9 deletions agent/src/testflinger_agent/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,90 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

from .client import TestflingerClient
from abc import ABC, abstractmethod
from datetime import datetime, timezone

from testflinger_common.enums import LogType

from .client import LogEndpointInput, TestflingerClient

class LiveOutputHandler:
def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

class LogHandler(ABC):
"""Abstract callable class that receives live log updates."""

@abstractmethod
def __call__(self, data: str):
self.client.post_live_output(self.job_id, data)
raise NotImplementedError


class FileLogHandler(LogHandler):
"""
Implementation of LogHandler that writes live log updates
to a file.
"""

class LogUpdateHandler:
def __init__(self, log_file: str):
self.log_file = log_file
def __init__(self, filename: str):
self.log_file = filename

def __call__(self, data: str):
with open(self.log_file, "a") as log:
log.write(data)


class EndpointLogHandler(LogHandler):
"""
Abstract class that writes live log updates to a generic endpoint
in Testflinger server.
"""

def __init__(self, client: TestflingerClient, job_id: str, phase: str):
self.fragment_number = 0
self.client = client
self.phase = phase
self.job_id = job_id

@abstractmethod
def write_to_endpoint(self, data: LogEndpointInput):
raise NotImplementedError

def __call__(self, data: str):
log_input = LogEndpointInput(
self.fragment_number,
datetime.now(timezone.utc).isoformat(),
self.phase,
data,
)
self.write_to_endpoint(log_input)
self.fragment_number += 1

def write_from_file(self, filename: str, chunk_size: int = 1024):
"""Write logs to endpoint from a file chunking by chunk_size."""
try:
with open(filename, "r") as log:
while True:
data = log.read(chunk_size)
if not data:
break
self(data)
except FileNotFoundError:
pass


class OutputLogHandler(EndpointLogHandler):
"""
Implementation of EndpointLogHandler that writes logs to the output
endpoint in Testflinger server.
"""

def write_to_endpoint(self, data: LogEndpointInput):
self.client.post_log(self.job_id, data, LogType.STANDARD_OUTPUT)


class SerialLogHandler(EndpointLogHandler):
"""
Implementation of EndpointLogHandler that writes logs to the serial
endpoint in Testflinger server.
"""

def write_to_endpoint(self, data: LogEndpointInput):
self.client.post_log(self.job_id, data, LogType.SERIAL_OUTPUT)
92 changes: 73 additions & 19 deletions agent/src/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

import copy
import json
import logging
import os
import signal
import time

from testflinger_agent.errors import TFServerError

from .handlers import LiveOutputHandler, LogUpdateHandler
from .handlers import FileLogHandler, OutputLogHandler, SerialLogHandler
from .runner import CommandRunner, RunnerEvents
from .stop_condition_checkers import (
GlobalTimeoutChecker,
Expand All @@ -45,6 +47,30 @@ def __init__(self, job_data, client):
self.job_data = job_data
self.job_id = job_data.get("job_id")
self.phase = "unknown"
self.conserver_runner = None
self.command_runner = None

def cleanup_runners(self):
"""
Reset termination signal handler and runs cleanup function on a set of
runners.
"""
# Resets termination signal behavior to default
signal.signal(signal.SIGTERM, signal.SIG_DFL)
if self.conserver_runner is not None:
self.conserver_runner.cleanup()
if self.command_runner is not None:
self.command_runner.cleanup()

def _start_conserver_capture(self):
"""Start capturing serial logs from the conserver server."""
if "conserver_address" in self.client.config:
self.conserver_runner.run_async(
(
f"console -M {self.client.config['conserver_address']} "
f"{self.client.config['agent_id']} -s"
)
)

def run_test_phase(self, phase, rundir):
"""Run the specified test phase in rundir.
Expand Down Expand Up @@ -83,28 +109,45 @@ def run_test_phase(self, phase, rundir):
results_file = os.path.join(rundir, "testflinger-outcome.json")
output_log = os.path.join(rundir, phase + ".log")
serial_log = os.path.join(rundir, phase + "-serial.log")
serial_log_conserver = os.path.join(
rundir, phase + "-serial-conserver.log"
)

logger.info("Running %s_command: %s", phase, cmd)
runner = CommandRunner(cwd=rundir, env=self.client.config)
output_log_handler = LogUpdateHandler(output_log)
live_output_handler = LiveOutputHandler(self.client, self.job_id)
runner.register_output_handler(output_log_handler)
runner.register_output_handler(live_output_handler)
self.command_runner = CommandRunner(cwd=rundir, env=self.client.config)
self.conserver_runner = CommandRunner(
cwd=rundir, env=self.client.config
)
output_file_handler = FileLogHandler(output_log)
live_output_handler = OutputLogHandler(self.client, self.job_id, phase)
serial_output_handler = SerialLogHandler(
copy.deepcopy(self.client), self.job_id, phase
)
serial_file_handler = FileLogHandler(serial_log_conserver)

self.command_runner.register_output_handler(output_file_handler)
self.command_runner.register_output_handler(live_output_handler)
self.conserver_runner.register_output_handler(serial_file_handler)
self.conserver_runner.register_output_handler(serial_output_handler)

# Reserve phase uses a separate timeout handler
if phase != "reserve":
global_timeout_checker = GlobalTimeoutChecker(
self.get_global_timeout()
)
runner.register_stop_condition_checker(global_timeout_checker)
self.command_runner.register_stop_condition_checker(
global_timeout_checker
)

# We only need to check for output timeouts during the test phase
if phase == "test":
output_timeout_checker = OutputTimeoutChecker(
self.get_output_timeout()
)
runner.register_stop_condition_checker(output_timeout_checker)
runner.subscribe_event(
self.command_runner.register_stop_condition_checker(
output_timeout_checker
)
self.command_runner.subscribe_event(
RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update
)

Expand All @@ -113,16 +156,25 @@ def run_test_phase(self, phase, rundir):
job_cancelled_checker = JobCancelledChecker(
self.client, self.job_id
)
runner.register_stop_condition_checker(job_cancelled_checker)
self.command_runner.register_stop_condition_checker(
job_cancelled_checker
)

for line in self.banner(
"Starting testflinger {} phase on {}".format(phase, node)
):
runner.run(f"echo '{line}'")
self.command_runner.run(f"echo '{line}'")
try:
# Set exit_event to fail for this phase in case of an exception
exit_event = f"{phase}_fail"
exitcode, exit_event, exit_reason = runner.run(cmd)

# Cleanup all running processes on sigterm
signal.signal(
signal.SIGTERM, lambda signum, frame: self.cleanup_runners()
)
self._start_conserver_capture()
exitcode, exit_event, exit_reason = self.command_runner.run(cmd)
self.conserver_runner.kill_async()
# make sure the exit code is within the expected 0-255 range
# (this also handles negative numbers)
exitcode = exitcode % 256
Expand All @@ -131,6 +183,9 @@ def run_test_phase(self, phase, rundir):
exitcode = 100
exit_reason = str(exc) # noqa: F841 - ignore this until it's used
finally:
# Write serial log file generated in device connector to
# the serial log endpoint if the file exists
serial_output_handler.write_from_file(serial_log)
self._update_phase_results(
results_file, phase, exitcode, output_log, serial_log
)
Expand Down Expand Up @@ -160,14 +215,13 @@ def _update_phase_results(
with open(results_file, "r+") as results:
outcome_data = json.load(results)
if os.path.exists(output_log):
outcome_data[phase + "_output"] = read_truncated(
output_log, size=max_log_size
)
phase_outputs = outcome_data.setdefault("output", {})
phase_outputs[phase] = read_truncated(output_log, max_log_size)
if os.path.exists(serial_log):
outcome_data[phase + "_serial"] = read_truncated(
serial_log, max_log_size
)
outcome_data[phase + "_status"] = exitcode
phase_serials = outcome_data.setdefault("serial", {})
phase_serials[phase] = read_truncated(serial_log, max_log_size)
phase_status = outcome_data.setdefault("status", {})
phase_status[phase] = exitcode
results.seek(0)
json.dump(outcome_data, results)

Expand Down
Loading