Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
acd2822
Initial integration of a Slurm-based scheduler
Mar 6, 2025
e63a818
Upgrading actions
Mar 6, 2025
2973d9f
Fixing documentation build
Mar 6, 2025
32f90eb
Lint fixes
Mar 6, 2025
8c145d3
Avoiding collision with 'stop' being propagated through websocket wra…
Mar 6, 2025
d1ee314
Passing resource requests through to Slurm
Mar 18, 2025
cb49473
Exporting Feature
Mar 18, 2025
7421015
Changing the way Slurm jobs are launched to allow for more fine-grain…
Mar 27, 2025
5f48e47
Refining Slurm job launching
Mar 27, 2025
e4dad6c
Further refinements
Mar 28, 2025
bcb061d
Removing unnecessary tee
Mar 28, 2025
30cf170
Adding a Python logging adapter
Apr 3, 2025
a7df73c
Adding a method to lookup the parent layer's address
Apr 15, 2025
49c093d
Adding missing 'extend_env' flag to JobGroup
Apr 15, 2025
0201baa
Adding a process statistics reporter
Apr 15, 2025
07450d3
Extra logging messages
Apr 16, 2025
b70b351
Restructuring dependencies
Apr 16, 2025
d98601e
Making hub group optional
Apr 16, 2025
3ad878c
Fixing duplicate dependency on uwsgi
Apr 16, 2025
63819c4
Adding a teardown method to ProcessStats
Apr 16, 2025
81961ab
Normalising handling of CPU and memory resources
Apr 17, 2025
2c57c8a
Missing brackets
Apr 17, 2025
1a29903
Corrections
Apr 17, 2025
2640348
Truncating memory decimal places
Apr 17, 2025
89cb5e8
Debugging issue with log handler
Apr 17, 2025
e5477b8
Removing debug prints
Apr 17, 2025
1faabdf
Logged items now included in messages.log
Apr 17, 2025
f72927c
Swallowing extra arguments
Apr 17, 2025
a57a4dd
Correcting logging of process extra usage
Apr 17, 2025
2c1c334
Including logging hierarchy
Apr 17, 2025
092de06
Removing debug prints
Apr 17, 2025
dbe5b82
Adjusting log message format written to file
Apr 17, 2025
639bc96
Fixing lint issues
Intuity Jun 20, 2025
e12ae75
Fixing various test bugs
Intuity Jun 20, 2025
76d81a1
Fixing remaining testcases
Intuity Jun 23, 2025
06053a7
Removing duplicate aiohttp dependency
Intuity Jun 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: 3.11
- name: Install Poetry
Expand All @@ -43,9 +43,9 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: 3.11
- name: Install Poetry
Expand Down Expand Up @@ -78,12 +78,12 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
Expand Down
11 changes: 11 additions & 0 deletions examples/job_logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
!Job
ident: test_job
env:
test_key_a: hey
test_key_b: you
command: python3
args:
- examples/log_test.py
resources:
- !Cores [1]
- !Memory [30, MB]
25 changes: 25 additions & 0 deletions examples/log_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging
from time import sleep

from gator.adapters.logging import GatorHandler

if __name__ == "__main__":
    # Demonstration script: install GatorHandler as the only handler on the
    # root logger, then emit records at several severities and logger depths
    # with pauses in between.
    logging.basicConfig(
        handlers=[GatorHandler()],
        level="NOTSET",
        format="%(message)s",
        datefmt="[%X]",
    )
    log = logging.getLogger("log_test")

    log.debug("A debug message!")
    sleep(1)

    # A deeply nested child logger (its name becomes 'log_test.a.b.c.d')
    deep_child = log.getChild("a.b.c.d")
    deep_child.info("Hello world!")
    sleep(1)

    # A burst of 30 records from a single child logger, 0.2s apart
    burst_log = log.getChild("b")
    for pass_number in range(30):
        burst_log.info(f"Pass {pass_number}")
        sleep(0.2)

    warn_log = log.getChild("c")
    warn_log.warning("A warning message!")
    sleep(1)

    log.error("An error message!")
    sleep(1)

    log.info("DONE")
26 changes: 17 additions & 9 deletions gator/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

from . import launch, launch_progress
from .common.logger import MessageLimits
from .scheduler import LocalScheduler
from .common.ws_wrapper import WebsocketWrapper
from .scheduler import LocalScheduler, SlurmScheduler
from .specs import Spec
from .specs.common import SpecError

Expand Down Expand Up @@ -54,7 +55,7 @@
@click.option(
"--scheduler",
default="local",
type=click.Choice(("local",), case_sensitive=False),
type=click.Choice(("local", "slurm"), case_sensitive=False),
help="Select the scheduler to use for launching jobs",
show_default=True,
)
Expand Down Expand Up @@ -101,7 +102,10 @@ def main(
)
tracking.mkdir(parents=True, exist_ok=True)
# Select the right scheduler
sched = {"local": LocalScheduler}.get(scheduler.lower())
sched = {
"local": LocalScheduler,
"slurm": SlurmScheduler,
}.get(scheduler.lower())
# Break apart scheduler options as '<KEY>=<VALUE>'
sched_opts = {}
for arg in sched_arg:
Expand All @@ -115,6 +119,7 @@ def main(
key, val = arg.split("=")
sched_opts[key.strip()] = val.strip()
# Launch with optional progress tracking
exit_code = 0
try:
summary = asyncio.run(
(launch_progress if progress else launch).launch(
Expand All @@ -136,6 +141,8 @@ def main(
),
)
)
if not summary.passed:
exit_code = 1
except SpecError as e:
console_file = (Path(tracking) / "error.log").open("a") if parent else None
con = Console(file=console_file)
Expand All @@ -146,18 +153,19 @@ def main(
if hasattr(e.obj, "jobs"):
e.obj.jobs = ["..."]
con.log(Spec.dump([e.obj]))
sys.exit(1)
exit_code = 1
except BaseException:
console_file = (Path(tracking) / "error.log").open("a") if parent else None
con = Console(file=console_file)
con.log(traceback.format_exc())
if verbose:
con.print_exception(show_locals=True)
sys.exit(1)

if summary.passed:
sys.exit(0)
sys.exit(1)
exit_code = 1
finally:
# Stop active websocket wrappers (may be left over if an exception occurs)
asyncio.run(WebsocketWrapper.stop_all())
# Forward an exception code
sys.exit(exit_code)


if __name__ == "__main__":
Expand Down
13 changes: 13 additions & 0 deletions gator/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2023, Peter Birch, mailto:peter@lightlogic.co.uk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
45 changes: 45 additions & 0 deletions gator/adapters/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2023, Peter Birch, mailto:peter@lightlogic.co.uk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from datetime import datetime

from .parent import Parent


class GatorHandler(logging.Handler):
"""
Custom handler for Python logging to redirect messages via Gator's logging
API such that severities are correctly recorded.

:param ws_address: Optional websocket address for the parent tier, otherwise
it will be read from the GATOR_PARENT environment variable
"""

def __init__(self, ws_address: str | None = None):
super().__init__()
self._parent = Parent(ws_address)
self._do_log("INFO", "Log fowarding via GatorHandler", "root")

def _do_log(self, severity: str, message: str, hierarchy: str):
self._parent.post(
"log",
timestamp=datetime.now().timestamp(),
hierarchy=hierarchy,
severity=severity,
message=message,
)

def emit(self, record: logging.LogRecord):
self._do_log(record.levelname, record.getMessage(), record.name)
115 changes: 115 additions & 0 deletions gator/adapters/parent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2023, Peter Birch, mailto:peter@lightlogic.co.uk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import json
import logging
import os
import sys
from queue import SimpleQueue
from threading import Event, Thread

from websockets.exceptions import ConnectionClosed
from websockets.sync.client import connect


class TeardownMarker:
pass


class Parent:
"""
Thread based wrapper around the Gator websocket interface

:param ws_address: Optional websocket address for the parent tier, otherwise
it will be read from the GATOR_PARENT environment variable
"""

def __init__(self, ws_address: str | None = None):
self._ws_address = ws_address or Parent.get_parent_address()
assert self._ws_address, (
"Websocket address for parent process is not set and could not be "
"determined from the environment"
)
self._rx_q = SimpleQueue[dict[str, str]]
self._tx_q = SimpleQueue[TeardownMarker | dict[str, str]]()
self._teardown_evt = Event()
self._ws_thread = Thread(target=self._manage_ws, daemon=True)
self._ws_thread.start()
atexit.register(self._teardown_at_exit)

@staticmethod
def get_parent_address() -> str | None:
return os.environ.get("GATOR_PARENT", None)

def post(self, action, **payload):
self._tx_q.put(
{
"action": action,
"posted": True,
"payload": payload,
}
)

def receive(self) -> dict[str, str]:
return self._rx_q.get()

def _manage_ws(self):
idx = 0

def _receiver(ws, rx_q: SimpleQueue[dict[str, str]]):
try:
for packet in ws:
rx_q.put(json.loads(packet))
except ConnectionClosed:
pass

rx_thread = None
try:
with connect(
f"ws://{self._ws_address}",
logger=(logger := logging.getLogger("gator_ws")),
) as ws:
# Disable log propagation to avoid recursive forwarding
logger.propagate = False
# Setup a receiving thread
rx_thread = Thread(target=_receiver, daemon=True, args=(ws, self._rx_q))
rx_thread.start()
# Transmit until a teardown is inserted
while True:
packet = self._tx_q.get()
# Check if the process wants us to teardown
if isinstance(packet, TeardownMarker):
break
# Otherwise log the message
ws.send(json.dumps(packet))
idx += 1
except ConnectionClosed:
pass
# Wait for the receiver thread to end
rx_thread.join()
# Set the teardown event to signify a clean exit
self._teardown_evt.set()

def _teardown_at_exit(self):
self._teardown()

def _teardown(self):
self._tx_q.put(TeardownMarker())
if not self._teardown_evt.wait(timeout=10):
print(
"Gator timed out waiting for the websocket thread to teardown, "
"some packets may have been missed!",
file=sys.stderr,
)
42 changes: 42 additions & 0 deletions gator/adapters/pstats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2023, Peter Birch, mailto:peter@lightlogic.co.uk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

from .parent import Parent


class ProcessStats:
"""
Custom process statistics gathering for operations that Gator cannot normally
track, for example launched Docker containers.

:param ws_address: Optional websocket address for the parent tier, otherwise
it will be read from the GATOR_PARENT environment variable
"""

def __init__(self, ws_address: str | None = None):
super().__init__()
self._parent = Parent(ws_address)

def record(self, cpu_perc: float, memory: float):
self._parent.post(
"extra_usage",
timestamp=datetime.now().timestamp(),
cpu_perc=cpu_perc,
memory=memory,
)

def teardown(self):
self._parent._teardown()
2 changes: 1 addition & 1 deletion gator/babysitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import sys
from pathlib import Path

with Path(f"log_{socket.gethostname()}_{os.getpid()}.log").open("w", encoding="utf-8") as fh:
with Path(f"log_{socket.getfqdn()}_{os.getpid()}.log").open("w", encoding="utf-8") as fh:
fh.write(f"Starting process with arguments: {sys.argv[1:]}\n")
fh.flush()
proc = subprocess.Popen(
Expand Down
1 change: 1 addition & 0 deletions gator/common/db_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ async def get_messages(self, after: int = 0, limit: int = 10) -> ApiMessagesResp
uid=cast(int, x.db_uid),
severity=int(x.severity),
message=x.message,
hierarchy=x.hierarchy,
timestamp=int(x.timestamp.timestamp()),
)
for x in msgs
Expand Down
Loading