Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions swift/megatron/arguments/megatron_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ class MegatronArguments(RLHFMegatronArgumentsMixin, MegatronTunerMixin):
problem_type: Literal['regression', 'single_label_classification', 'multi_label_classification'] = None
save_strategy: Literal['steps', 'epoch'] = 'steps'
callbacks: List[str] = field(default_factory=list)
nsys_profile_start: int = -1 # 1-based; 0 = disabled
nsys_profile_end: int = -1
profiler_type: str = 'none' # nsys or torch

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It is recommended to use Literal for profiler_type to provide better type safety and validation of the allowed values ('none', 'nsys', 'torch').

Suggested change
profiler_type: str = 'none' # nsys or torch
profiler_type: Literal['none', 'nsys', 'torch'] = 'none' # nsys or torch

profile_rank: Optional[List[int]] = None # global ranks to profile; None = all ranks

@staticmethod
def load_args_config(ckpt_dir: Optional[str]) -> Dict[str, Any]:
Expand Down Expand Up @@ -754,6 +758,9 @@ def __post_init__(self):
self.gradient_accumulation_fusion = False
self.callbacks += ['print', 'default_flow']
self.callbacks += self.report_to
if self.profiler_type != 'none':
cb = 'torch_profiler' if self.profiler_type == 'torch' else 'nsys'
self.callbacks.append(cb)
Comment on lines +761 to +763

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current logic for adding profiler callbacks is a bit loose. If profiler_type is set to an unexpected string (other than 'none' or 'torch'), it will default to adding the 'nsys' callback. It's better to be explicit.

Suggested change
if self.profiler_type != 'none':
cb = 'torch_profiler' if self.profiler_type == 'torch' else 'nsys'
self.callbacks.append(cb)
if self.profiler_type == 'torch':
self.callbacks.append('torch_profiler')
elif self.profiler_type == 'nsys':
self.callbacks.append('nsys')

if self.save_total_limit is not None:
if self.async_save:
raise ValueError('async_save is not supported with save_total_limit.')
Expand Down
3 changes: 3 additions & 0 deletions swift/megatron/callbacks/mapping.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Copyright (c) ModelScope Contributors. All rights reserved.
from .default_flow import DefaultFlowCallback
from .print import PrintCallback
from .profiler import NsysCallback, TorchProfilerCallback
from .swanlab import SwanlabCallback
from .tensorboard import TensorboardCallback
from .wandb import WandbCallback

megatron_callbacks_map = {
'print': PrintCallback,
'default_flow': DefaultFlowCallback,
'nsys': NsysCallback,
'torch_profiler': TorchProfilerCallback,
'swanlab': SwanlabCallback,
'wandb': WandbCallback,
'tensorboard': TensorboardCallback,
Expand Down
105 changes: 105 additions & 0 deletions swift/megatron/callbacks/profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright (c) ModelScope Contributors. All rights reserved.
import os
import torch
import torch.profiler

from swift.megatron.callbacks.base import MegatronCallback


class NsysCallback(MegatronCallback):
"""Profile steps [nsys_profile_start, nsys_profile_end] via cudaProfilerStart/Stop.

Requires nsys launched with --start-later --capture-range=cudaProfilerApi.
profile_rank: list of global ranks to profile; None = all ranks.
"""

def __init__(self, trainer):
super().__init__(trainer)
self.start_step = getattr(self.args, 'nsys_profile_start', -1)
self.end_step = getattr(self.args, 'nsys_profile_end', -1)
self._local_rank = int(os.environ.get('LOCAL_RANK', 0))
self._global_rank = int(os.environ.get('RANK', 0))
self._profile_ranks = getattr(self.args, 'profile_rank', None)
self._profiling = False

def _should_profile(self):
return self._profile_ranks is None or self._global_rank in self._profile_ranks

def on_step_begin(self):
if not self._should_profile():
return
step = self.state.iteration + 1
if step == self.start_step and not self._profiling:
print(f'[nsys] cudaProfilerStart at step {step} (local_rank={self._local_rank})', flush=True)
torch.cuda.check_error(torch.cuda.cudart().cudaProfilerStart())
self._profiling = True

def on_step_end(self):
if not self._should_profile():
return
step = self.state.iteration
if self._profiling and step >= self.end_step:
print(f'[nsys] cudaProfilerStop after step {step} (local_rank={self._local_rank})', flush=True)
torch.cuda.check_error(torch.cuda.cudart().cudaProfilerStop())
self._profiling = False


class TorchProfilerCallback(MegatronCallback):
"""Profile steps [nsys_profile_start, nsys_profile_end] via torch.profiler.

profile_rank: list of global ranks to profile; None = all ranks.
Saves TensorBoard traces to {tensorboard_dir}/torch_profiler/rank{R}_node{N}/.
Step numbers are 1-based.
"""

def __init__(self, trainer):
super().__init__(trainer)
self.start_step = getattr(self.args, 'nsys_profile_start', 5)
self.end_step = getattr(self.args, 'nsys_profile_end', 5)
Comment on lines +57 to +58

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The default values in getattr (5) are inconsistent with the defaults defined in MegatronArguments (-1) and those used in NsysCallback (-1). While MegatronArguments provides a default, it's safer and clearer to use the same fallback value here.

Suggested change
self.start_step = getattr(self.args, 'nsys_profile_start', 5)
self.end_step = getattr(self.args, 'nsys_profile_end', 5)
self.start_step = getattr(self.args, 'nsys_profile_start', -1)
self.end_step = getattr(self.args, 'nsys_profile_end', -1)

self._local_rank = int(os.environ.get('LOCAL_RANK', 0))
self._global_rank = int(os.environ.get('RANK', 0))
self._node_rank = int(os.environ.get('NODE_RANK', 0))
self._profile_ranks = getattr(self.args, 'profile_rank', None)
self._prof = None
self._trace_dir = None

def _should_profile(self):
return self._profile_ranks is None or self._global_rank in self._profile_ranks

def on_train_begin(self):
if not self._should_profile():
return
wait = max(0, self.start_step - 1)
active = max(1, self.end_step - self.start_step + 1)
base_dir = self.args.output_dir
trace_dir = os.path.join(base_dir, 'torch_profiler', f'rank{self._local_rank}_node{self._node_rank}')
os.makedirs(trace_dir, exist_ok=True)
Comment on lines +70 to +76

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are a few issues here:

  1. If nsys_profile_start is set to -1 (the default), the current logic will calculate wait=0 and active=1, causing the Torch profiler to run for the first step even when it should be disabled. You should explicitly check if profiling is enabled (e.g., start_step > 0).
  2. self.args.output_dir can be None. If so, os.path.join will raise a TypeError. Consider providing a default directory (e.g., '.').
        if not self._should_profile() or self.start_step <= 0:
            return
        wait = self.start_step - 1
        active = self.end_step - self.start_step + 1
        if active <= 0:
            return
        base_dir = self.args.output_dir or '.'
        trace_dir = os.path.join(base_dir, 'torch_profiler', f'rank{self._local_rank}_node{self._node_rank}')
        os.makedirs(trace_dir, exist_ok=True)

self._prof = torch.profiler.profile(
schedule=torch.profiler.schedule(wait=wait, warmup=0, active=active, repeat=1),
record_shapes=True,
profile_memory=True,
with_stack=True,
with_flops=True,
)
self._trace_dir = trace_dir
self._prof.__enter__()
print(
f'[torch_profiler] started rank={self._local_rank} node={self._node_rank} '
f'wait={wait} active={active} trace_dir={trace_dir}',
flush=True)

def on_step_end(self):
if self._prof is not None:
self._prof.step()

def on_train_end(self):
if self._prof is not None:
self._prof.__exit__(None, None, None)
chrome_path = os.path.join(self._trace_dir, 'chrome_trace.json')
self._prof.export_chrome_trace(chrome_path)
print(
f'[torch_profiler] trace saved rank={self._local_rank} node={self._node_rank} '
f'chrome={chrome_path}',
flush=True)
self._prof = None
self._trace_dir = None
Loading