5 changes: 5 additions & 0 deletions pytest.ini
@@ -3,3 +3,8 @@
pythonpath = src
filterwarnings =
ignore::DeprecationWarning:jupyter_client.connect

addopts = -ra
python_files = test_*.py
python_classes = Test*
python_functions = test_*
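
For reference, the resulting pytest.ini would read roughly as below. This is a sketch assembled from the visible hunk; the [pytest] section header is assumed, since it is not shown here. The -ra option adds a short recap of every non-passing outcome to the terminal summary, and the three python_* settings spell out pytest's default discovery patterns explicitly.

[pytest]
pythonpath = src
filterwarnings =
    ignore::DeprecationWarning:jupyter_client.connect

addopts = -ra
python_files = test_*.py
python_classes = Test*
python_functions = test_*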
8 changes: 1 addition & 7 deletions src/plf/__init__.py
@@ -24,11 +24,5 @@


"""
__all__ = [
"context",
"experiment",
"utils",
"lab",
"danger"
]

__all__ = ["context", "experiment", "utils", "lab", "danger"]
162 changes: 72 additions & 90 deletions src/plf/_pipeline.py
@@ -9,16 +9,12 @@

import traceback

from .utils import (
load_component,
hash_args,
get_invalid_loc_queries,
Db,
Component)
from .utils import load_component, hash_args, get_invalid_loc_queries, Db, Component

from ._transfer_utils import TransferContext
from .context import get_shared_data, set_shared_data


class CompsDict(TypedDict):
"""
Typed dictionary describing the pipeline's loaded components.
@@ -41,12 +37,12 @@ def __init__(self, pplid=None):
"""
Initialize the pipeline with default settings and empty components.
"""
self._paths = ['config']
self._paths = ["config"]
self.settings = get_shared_data()

self.pplid = None
self.workflow = None

self.cnfg = None
self._prepared = False
self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")
@@ -78,46 +74,43 @@ def _save_config(self) -> None:
)

def get_path(
self,
of: str,
pplid: Optional[str] = None,
args: Optional[Dict] = None
self, of: str, pplid: Optional[str] = None, args: Optional[Dict] = None
) -> str:
"""
Generate a standardized file path for various experiment artifacts.

Constructs and returns a file path based on the type of file (`of`), experiment ID,
epoch number, and batch index, where applicable. Automatically creates necessary
directories if they do not exist.

Parameters
----------
of : str
The type of file to retrieve the path for. Supported values:
- "config": Configuration file path.
- "weight": Model weights file path.
- "gradient": Saved gradients file path.
- "history": Training history file path.
- "quick": Quick config file path.
pplid : str, optional
Experiment ID. If not provided, uses the currently set `self.pplid`.
epoch : int, optional
Epoch number. Required for weight and gradient file paths.
For weights, if not specified, the best epoch from config is used.
batch : int, optional
Batch index, required for gradient file paths.

Returns
-------
str
Full path to the specified artifact as a string with forward slashes.

Raises
------
ValueError
If `pplid` is not set or invalid.
If required parameters (`epoch`, `batch`) are missing for gradient paths.
If the `of` argument is not one of the supported values.
Generate a standardized file path for various experiment artifacts.

Constructs and returns a file path based on the type of file (`of`), experiment ID,
epoch number, and batch index, where applicable. Automatically creates necessary
directories if they do not exist.

Parameters
----------
of : str
The type of file to retrieve the path for. Supported values:
- "config": Configuration file path.
- "weight": Model weights file path.
- "gradient": Saved gradients file path.
- "history": Training history file path.
- "quick": Quick config file path.
pplid : str, optional
Experiment ID. If not provided, uses the currently set `self.pplid`.
epoch : int, optional
Epoch number. Required for weight and gradient file paths.
For weights, if not specified, the best epoch from config is used.
batch : int, optional
Batch index, required for gradient file paths.

Returns
-------
str
Full path to the specified artifact as a string with forward slashes.

Raises
------
ValueError
If `pplid` is not set or invalid.
If required parameters (`epoch`, `batch`) are missing for gradient paths.
If the `of` argument is not one of the supported values.
"""
pplid = pplid or self.pplid
if not pplid:
@@ -130,10 +123,10 @@ def get_path(

else:
if self.workflow is None:
self.workflow = self.load_component(**self.cnfg['workflow'])
self.workflow = self.load_component(**self.cnfg["workflow"])
path = self.workflow.get_path(of=of, pplid=pplid, args=args)

path = base_path / path
path = base_path / path
path = path.as_posix()
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
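
For orientation, a hypothetical usage sketch of get_path (illustrative only, not part of this diff): the enclosing class is referred to here as Pipeline, although its actual name is not visible in these hunks, and the experiment id "exp-001" is invented. load() and the supported "config" value of `of` are taken from the code and docstring above.

# Hypothetical usage sketch; assumes the shared settings (e.g. data_path) are already configured
ppl = Pipeline()
ppl.load("exp-001")                    # loads the saved config JSON into self.cnfg and sets self.pplid
cfg_path = ppl.get_path(of="config")   # creates parent directories, returns a forward-slash path
print(cfg_path)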
@@ -149,36 +142,32 @@ def load(self, pplid: str, prepare: bool = False):
with open(cfg_path, encoding="utf-8") as f:
self.cnfg = json.load(f)
self.pplid = pplid


if prepare:
self.prepare()






def reset(self):
"""
Reset the pipeline to its initial, unloaded state.
"""
self.pplid = None

self.settings = get_shared_data()
self.cnfg = None
self._prepared = False
self.workflow = None
self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")

def load_component(self,loc: str, args: Optional[Dict[str, Any]] = None, setup: bool = True):
def load_component(
self, loc: str, args: Optional[Dict[str, Any]] = None, setup: bool = True
):
# if lab_role does not exist, add the key to the JSON file at self.settings['settings_path']
if self.settings.get("lab_role") != "base":
Tsx = TransferContext()

loc = Tsx.map_loc(loc, pplid=self.pplid)

comp = load_component(loc=loc, args=args, setup=setup)
comp = load_component(loc=loc, args=args, setup=setup)
comp.P = self
return comp

@@ -214,10 +203,7 @@ def verify(self, *, pplid: str = None, cnfg: Dict = None) -> Union[str, bool]:
if len(result) > 0:
return pplid
elif cnfg:
args = {
'workflow':cnfg['workflow'],
'args': cnfg['args']
}
args = {"workflow": cnfg["workflow"], "args": cnfg["args"]}
args_hash = hash_args(args)
rows = self.__db.query(
"SELECT pplid FROM ppls WHERE args_hash =? LIMIT 1", (args_hash,)
@@ -236,7 +222,7 @@ def _check_args(self, cnfg):
t = self.verify(cnfg=cnfg)
if t:
raise ValueError(f"same configuration is already exists in: {t}")

def new(
self,
pplid: Optional[str] = None,
@@ -279,31 +265,28 @@ def new(
if self.verify(pplid=pplid):
raise ValueError(f"{pplid} is already exists try different id")
self._check_args(args)
t = {
"pplid": pplid,
**args
}
t = {"pplid": pplid, **args}

self.pplid = pplid
self.cnfg = t

try:
self.workflow = self.load_component(**args['workflow'])
self.workflow.new(args['args'])
self.workflow = self.load_component(**args["workflow"])
self.workflow.new(args["args"])
except:
traceback.print_exc()

raise

self.__db.execute(
"INSERT INTO ppls (pplid, args_hash) VALUES (?, ?)",
(pplid, hash_args(args)),
)

self._save_config()
self._save_config()
if prepare:
self.prepare()

def prepare(self) -> None:
"""
Prepare the experiment by loading model, optimizer, metrics, loss, and data loaders.
@@ -333,15 +316,12 @@ def prepare(self) -> None:

self.cnfg = Tsx.map_cnfg(self.cnfg)


self.workflow = self.load_component(**self.cnfg['workflow'])
self.workflow = self.load_component(**self.cnfg["workflow"])
self._prepared = self.workflow.prepare()



except:
traceback.print_exc()

def run(self) -> None:

if not self._prepared:
@@ -380,56 +360,58 @@ def is_running(self):
if rows:
return rows[0][0]
return False
@property

@property
def should_running(self):
rows = self.__db.query(
"SELECT parity FROM runnings WHERE pplid = ?", (self.pplid,)
)
if rows and rows[0][0]=='stop':
if rows and rows[0][0] == "stop":
return False
return True

def stop_running(self):
logid = self.is_running()
if logid:
self.__db.execute(
"UPDATE runnings SET parity = ? WHERE logid = ?", ('stop', logid)
"UPDATE runnings SET parity = ? WHERE logid = ?", ("stop", logid)
)
print(
f"ppid:{self.pplid} will be stopped at logid:{logid} after current iteration"
)
print(f"ppid:{self.pplid} will be stopped at logid:{logid} after current iteration")
else:
print("it is not running anywhere")

@property
def paths(self):
artifs = list(self._paths)  # copy so the += below does not mutate self._paths
if self.workflow is None and self.pplid:
self.workflow = self.load_component(**self.cnfg['workflow'])
self.workflow = self.load_component(**self.cnfg["workflow"])
if self.workflow:
artifs += self.workflow.paths
return artifs

def clean(self):
if self.cnfg==None:
if self.cnfg == None:
print("Empty Pipeline")
return
try:
if self.workflow is None:
self.workflow = self.load_component(**self.cnfg['workflow'])
self.workflow = self.load_component(**self.cnfg["workflow"])
self.workflow.clean()
except:
traceback.print_exc()

def status(self):
if self.cnfg==None:
if self.cnfg == None:
print("Empty Pipeline")
return
try:
if self.workflow is None:
self.workflow = self.load_component(**self.cnfg['workflow'])
self.workflow = self.load_component(**self.cnfg["workflow"])
return self.workflow.status()
except:
traceback.print_exc()

from copy import deepcopy

from copy import deepcopy