Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,9 @@ checkpoints

# MAS (madragonse)
*mastest*

*.csv
tmp/
data_long_ctx/
data_test/
checkpoint/
8 changes: 4 additions & 4 deletions configs/_misc/default.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@

infrastructure:
metric_logger:
project_name: pmtest/llm-random
heavy_metrics_calculation_interval: 100
new_neptune_job: true
new_wandb_job: true
type: neptune
name: default
type: wandb
wandb_entity: ideas_cv
project_name: llm-random-test
tags:
- nano
- new_wandb_job

git:
remote_name: cemetery
Expand Down
10 changes: 5 additions & 5 deletions configs/tiny_local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defaults:
- _dataset@_here_: local_dummy
- _checkpoints@_here_: none
- _misc@_here_: default
- _eval@_here_: basic
# - _eval@_here_: basic

common:
sequence_length: 16
batch_size: 2

trainer:
gradient_accumulation_steps: 1
n_steps: 100
n_steps: 201
learning_rate: 1e-3

checkpoint:
Expand All @@ -29,6 +29,6 @@ infrastructure:
- local
- tiny

evaluator:
limit: 1
device: cpu
# evaluator:
# limit: 1
# device: cpu
31 changes: 5 additions & 26 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import logging
from hydra.utils import instantiate
import logging
from neptune.integrations.python_logger import NeptuneHandler
from src.core.checkpointing import (
load_checkpoint_from_file,
load_training_state,
get_full_checkpoint_path,
)
from src.core.metric_loggers import NeptuneLogger, WandbLogger, get_metric_logger
from src.core.metric_loggers import WandbLogger, get_metric_logger
from src.core.model import Residual
import platform

Expand Down Expand Up @@ -61,9 +60,7 @@ def upload_config_file(metric_logger):
slurm_array_task_id = os.environ.get("SLURM_ARRAY_TASK_ID")
file_path = f"generated_configs/config_{slurm_array_task_id}.yaml"
if slurm_array_task_id is not None and os.path.exists(file_path):
metric_logger.run["yaml_config"].upload(
f"generated_configs/config_{slurm_array_task_id}.yaml"
)
metric_logger.run.save(file_path)


def check_env_vars():
Expand Down Expand Up @@ -179,8 +176,8 @@ def log_environs(metric_logger):
]

environs = os.environ
for environ_key in scrap_keys:
metric_logger.run[f"job/{environ_key}"] = str(environs.get(environ_key))
env_dict = {f"job/{k}": str(environs.get(k)) for k in scrap_keys}
metric_logger.run.config.update(env_dict)


def get_device():
Expand Down Expand Up @@ -219,27 +216,9 @@ def initialize_training_components(cfg: OmegaConf, metric_logger=None):
full_config=cfg,
)

# Other loggers do not have `run` method
if isinstance(metric_logger, NeptuneLogger):
npt_handler = NeptuneHandler(run=metric_logger.run)
logger.addHandler(npt_handler)

learning_rate, exp_lr = solve_config_lr(cfg.trainer.learning_rate)

if isinstance(metric_logger, NeptuneLogger) and (
training_state["run_id"] is None
or cfg.infrastructure.metric_logger.new_neptune_job
):
metric_logger.run["job_config"] = cfg
upload_config_file(metric_logger)
log_environs(metric_logger)
metric_logger.run[f"job/full_save_checkpoints_path"] = get_full_checkpoint_path(
cfg.trainer.checkpoint.save.path
)
metric_logger.run["learning_rate"] = learning_rate
metric_logger.run["exp_lr"] = exp_lr

elif isinstance(metric_logger, WandbLogger) and (
if isinstance(metric_logger, WandbLogger) and (
training_state["run_id"] is None
or cfg.infrastructure.metric_logger.new_wandb_job
):
Expand Down
3,629 changes: 1,565 additions & 2,064 deletions pixi.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ python = "==3.12.3"
[pypi-dependencies]
transformers = "==4.55.0"
datasets = "==3.1.0"
neptune = "==1.13.0"
submitit = "==1.5.2"
hydra-core = "==1.3.2"
hydra-submitit-launcher = "==1.2.0"
Expand Down
5 changes: 0 additions & 5 deletions run_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,6 @@ def submit_experiment(
cemetery_dir = cfg.infrastructure.cemetery_experiments_dir
connection.run(f"mkdir -p {cemetery_dir}")

if "NEPTUNE_API_TOKEN" in os.environ:
connection.config["run"]["env"]["NEPTUNE_API_TOKEN"] = os.environ[
"NEPTUNE_API_TOKEN"
]

if "WANDB_API_KEY" in os.environ:
connection.config["run"]["env"]["WANDB_API_KEY"] = os.environ[
"WANDB_API_KEY"
Expand Down
6 changes: 2 additions & 4 deletions src/core/checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from torch.distributed.checkpoint.state_dict import get_state_dict, set_state_dict
import torch.distributed.checkpoint as dcp

from src.core.metric_loggers import NeptuneLogger, WandbLogger
from src.core.metric_loggers import WandbLogger
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,9 +50,7 @@ def save_training_state(
metric_logger=None,
):
run_id = None
if isinstance(metric_logger, NeptuneLogger):
run_id = metric_logger.run["sys/id"].fetch()
elif isinstance(metric_logger, WandbLogger):
if isinstance(metric_logger, WandbLogger):
run_id = metric_logger.run.id

path = step_checkpoint_path(save_config.path, step)
Expand Down
6 changes: 3 additions & 3 deletions src/core/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def eval(self):
self.log_eval(results)

def log_eval(self, eval_results: dict):
"""Log evaluation results to Neptune."""
"""Log evaluation results."""
for task_name, metrics in eval_results["results"].items():
for metric_name, value in metrics.items():
clean_metric_name = metric_name.replace(",none", "")
self.metric_logger.run[f"eval/{task_name}/{clean_metric_name}"] = value
self.metric_logger.log(f"eval/{task_name}/{clean_metric_name}", value)

self.metric_logger.run["eval/limit"] = eval_results["config"]["limit"]
self.metric_logger.log("eval/limit", eval_results["config"]["limit"])
108 changes: 39 additions & 69 deletions src/core/metric_loggers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import statistics
import neptune
from omegaconf import OmegaConf
import wandb
import torch
Expand All @@ -20,6 +19,7 @@ def __init__(self, config: Optional[OmegaConf] = None):
)
self.accumulators = {}
self.step = 0
self.tokens = 0

@property
def _should_log_heavy_metrics(self) -> bool:
Expand All @@ -30,7 +30,7 @@ def _should_log_heavy_metrics(self) -> bool:
)

@abstractmethod
def log(self, name, step, value):
def log(self, name, value):
pass

def accumulate_metrics(self, layer_name, calculate_fn, metrics, transform_fn=None):
Expand All @@ -43,16 +43,22 @@ def accumulate_metrics(self, layer_name, calculate_fn, metrics, transform_fn=Non
else:
self.accumulators[layer_name].append(metrics)

def flush_accumulated_metrics(self, step):
def flush_accumulated_metrics(self):
if self._should_log_heavy_metrics:
for name, accumulator in self.accumulators.items():
for metric, result in accumulator.calculate().items():
self.log(f"steps/{name}/{metric}", step, result)
self.log(f"{name}/{metric}", result)
accumulator.reset()

def flush(self):
pass

def set_step(self, step):
self.step = step

def set_tokens(self, tokens):
self.tokens = tokens


class MetricAccumulator:
def __init__(self, calculate_fn, metrics):
Expand All @@ -76,30 +82,32 @@ def reset(self):


class DummyLogger(MetricLogger):
def log(self, _name, _step, _value):
def log(self, _name, _value):
pass


class NeptuneLogger(MetricLogger):
def __init__(self, run, rank, config=None):
super().__init__(config)
self.run = run
self.rank = rank

def log(self, name, step, value):
if self.rank == 0:
self.run[name].append(value=value, step=step)


class WandbLogger(MetricLogger):
def __init__(self, run, should_log, config=None):
super().__init__(config)
self.run = run
self.should_log = should_log
self._pending = {}

if self.should_log and self.run is not None:
wandb.define_metric("steps/*", step_metric="step")
wandb.define_metric("tokens/*", step_metric="token_count")

def log(self, name, step, value):
def log(self, name, value):
if self.should_log:
self.run.log({name: value}, step=step)
self._pending[f"steps/{name}"] = value
self._pending[f"tokens/{name}"] = value

def flush(self):
if self.should_log and self._pending:
self._pending["step"] = self.step
self._pending["token_count"] = self.tokens
self.run.log(self._pending)
self._pending = {}


class StdoutLogger(MetricLogger):
Expand All @@ -108,19 +116,21 @@ def __init__(self, config=None):
self.rank = os.environ.get("RANK", 0)
logger.info("Logging to stdout.")

def log(self, name, step, value):
logger.info(f"[device:{self.rank}] on step:{step} -> {name}: {value}")
def log(self, name, value):
logger.info(
f"[device:{self.rank}] step:{self.step} tokens:{self.tokens} -> {name}: {value}"
)


class RecorderLogger(MetricLogger):
def __init__(self, config=None):
super().__init__(config)
self.data = {}

def log(self, name, step, value):
def log(self, name, value):
if name not in self.data:
self.data[name] = []
self.data[name].append((value, step))
self.data[name].append((value, self.step))

def clear(self):
self.data = {}
Expand All @@ -146,45 +156,8 @@ def get_metric_logger(
full_config: Optional[OmegaConf] = None,
):
_metric_logger = None
if metric_logger_config.type == "neptune":
neptune_run_id = (
None if metric_logger_config.new_neptune_job else tracker_run_id
)
rank = int(os.environ["RANK"])

if rank == 0:
neptune_logger = neptune.init_run(
project=metric_logger_config.project_name,
name=metric_logger_config.name,
tags=list(metric_logger_config.tags),
with_id=neptune_run_id,
)
_metric_logger = NeptuneLogger(neptune_logger, rank, metric_logger_config)

if int(os.environ["WORLD_SIZE"]) > 1:
run_id_container = [None]
if neptune_run_id is None:
neptune_run_id = neptune_logger["sys/id"].fetch()

run_id_container[0] = neptune_run_id
dist.broadcast_object_list(run_id_container, src=0)
else:
run_id_container = [neptune_run_id]
dist.broadcast_object_list(run_id_container, src=0)
neptune_run_id = run_id_container[0]

neptune_logger = neptune.init_run(
project=metric_logger_config.project_name,
with_id=neptune_run_id,
capture_hardware_metrics=False,
name=metric_logger_config.name,
tags=list(metric_logger_config.tags),
)
_metric_logger = NeptuneLogger(neptune_logger, rank, metric_logger_config)
elif metric_logger_config.type == "wandb":
if os.environ.get("WORLD_SIZE") != os.environ.get("LOCAL_WORLD_SIZE"):
# TODO: Implement W&B multinode logging (https://docs.wandb.ai/models/track/log/distributed-training)
raise NotImplementedError("W&B multinode logging is not implemented yet.")
if metric_logger_config.type == "wandb":
wandb_run_id = None if metric_logger_config.new_wandb_job else tracker_run_id
rank = int(os.environ.get("RANK", 0))
if rank == 0:
Expand Down Expand Up @@ -229,22 +202,19 @@ def __init__(self, average_tail_len, name):
self.tail_len = average_tail_len
self.metric_stack = []

def log(self, mlogger: MetricLogger, step, metric_val):
do_log = False
def log(self, mlogger: MetricLogger, metric_val):
self.metric_stack.append(metric_val)
while len(self.metric_stack) > self.tail_len:
self.metric_stack.pop(0)
do_log = True
if do_log:
mlogger.log(self.name, step, statistics.mean(self.metric_stack))
if len(self.metric_stack) >= self.tail_len:
mlogger.log(self.name, statistics.mean(self.metric_stack))
self.metric_stack = []


class AveDiffMetric(AveMetric):
def __init__(self, average_tail_len, name, first_metric_val):
super().__init__(average_tail_len, name)
self.last_metric_val = first_metric_val

def log(self, mlogger, step, metric_val):
def log(self, mlogger, metric_val):
metric_val_diff = metric_val - self.last_metric_val
self.last_metric_val = metric_val
super().log(mlogger, step, metric_val_diff)
super().log(mlogger, metric_val_diff)
Loading
Loading