diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 70b02fc6a9..8a97c67c7d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -126,7 +126,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest] + os: [ubuntu-latest, macos-latest] steps: - uses: actions/checkout@v4 - name: Set up Go diff --git a/runner/cmd/shim/main.go b/runner/cmd/shim/main.go index ba6249490d..3ca57a0ab6 100644 --- a/runner/cmd/shim/main.go +++ b/runner/cmd/shim/main.go @@ -204,7 +204,7 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error) } var dcgmExporter *dcgm.DCGMExporter - var dcgmWrapper *dcgm.DCGMWrapper + var dcgmWrapper dcgm.DCGMWrapperInterface if common.GetGpuVendor() == common.GpuVendorNvidia { dcgmExporterPath, err := dcgm.GetDCGMExporterExecPath(ctx) diff --git a/runner/internal/shim/api/server.go b/runner/internal/shim/api/server.go index 62052e6b8f..e012a0b76c 100644 --- a/runner/internal/shim/api/server.go +++ b/runner/internal/shim/api/server.go @@ -29,14 +29,14 @@ type ShimServer struct { runner TaskRunner dcgmExporter *dcgm.DCGMExporter - dcgmWrapper *dcgm.DCGMWrapper + dcgmWrapper dcgm.DCGMWrapperInterface version string } func NewShimServer( ctx context.Context, address string, version string, - runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper *dcgm.DCGMWrapper, + runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper dcgm.DCGMWrapperInterface, ) *ShimServer { r := api.NewRouter() s := &ShimServer{ diff --git a/runner/internal/shim/dcgm/wrapper.go b/runner/internal/shim/dcgm/wrapper.go index 749060128c..bc1979a858 100644 --- a/runner/internal/shim/dcgm/wrapper.go +++ b/runner/internal/shim/dcgm/wrapper.go @@ -1,13 +1,5 @@ package dcgm -import ( - "errors" - "fmt" - "sync" - - godcgm "github.com/NVIDIA/go-dcgm/pkg/dcgm" -) - type HealthStatus string const ( @@ -30,88 +22,8 @@ type Health struct { Incidents []HealthIncident `json:"incidents"` } -// DCGMWrapper is a wrapper around go-dcgm (which, in turn, is a wrapper around libdcgm.so) -type DCGMWrapper struct { - group godcgm.GroupHandle - healthCheckEnabled bool - - mu *sync.Mutex -} - -// NewDCGMWrapper initializes and starts DCGM in the specific mode: -// - If address is empty, then libdcgm starts embedded hostengine within the current process. -// This is the main mode. -// - If address is not empty, then libdcgm connects to already running nv-hostengine service via TCP. -// This mode is useful for debugging, e.g., one can start nv-hostengine via systemd and inject -// errors via dcgmi: -// - systemctl start nvidia-dcgm.service -// - dcgmi test --inject --gpuid 0 -f 202 -v 99999 -// -// Note: embedded hostengine is started in AUTO operation mode, which means that -// the library handles periodic tasks by itself executing them in additional threads. -func NewDCGMWrapper(address string) (*DCGMWrapper, error) { - var err error - if address == "" { - _, err = godcgm.Init(godcgm.Embedded) - } else { - // "address is a unix socket filename (1) or a TCP/IP address (0)" - _, err = godcgm.Init(godcgm.Standalone, address, "0") - } - if err != nil { - return nil, fmt.Errorf("failed to initialize or start DCGM: %w", err) - } - return &DCGMWrapper{ - group: godcgm.GroupAllGPUs(), - mu: new(sync.Mutex), - }, nil -} - -func (w *DCGMWrapper) Shutdown() error { - if err := godcgm.Shutdown(); err != nil { - return fmt.Errorf("failed to shut down DCGM: %w", err) - } - return nil -} - -func (w *DCGMWrapper) EnableHealthChecks() error { - w.mu.Lock() - defer w.mu.Unlock() - if w.healthCheckEnabled { - return errors.New("health check system already enabled") - } - if err := godcgm.HealthSet(w.group, godcgm.DCGM_HEALTH_WATCH_ALL); err != nil { - return fmt.Errorf("failed to configure health watches: %w", err) - } - // "On the first call, stateful information about all of the enabled watches within a group - // is created but no error results are provided. On subsequent calls, any error information - // will be returned." - if _, err := godcgm.HealthCheck(w.group); err != nil { - return fmt.Errorf("failed to initialize health watches state: %w", err) - } - w.healthCheckEnabled = true - return nil -} - -func (w *DCGMWrapper) GetHealth() (Health, error) { - health := Health{} - if !w.healthCheckEnabled { - return health, errors.New("health check system is not enabled") - } - response, err := godcgm.HealthCheck(w.group) - if err != nil { - return health, fmt.Errorf("failed to fetch health status: %w", err) - } - health.OverallHealth = int(response.OverallHealth) - health.Incidents = make([]HealthIncident, 0, len(response.Incidents)) - for _, incident := range response.Incidents { - health.Incidents = append(health.Incidents, HealthIncident{ - System: int(incident.System), - Health: int(incident.Health), - ErrorMessage: incident.Error.Message, - ErrorCode: int(incident.Error.Code), - EntityGroupID: int(incident.EntityInfo.EntityGroupId), - EntityID: int(incident.EntityInfo.EntityId), - }) - } - return health, nil +type DCGMWrapperInterface interface { + Shutdown() error + EnableHealthChecks() error + GetHealth() (Health, error) } diff --git a/runner/internal/shim/dcgm/wrapper_darwin.go b/runner/internal/shim/dcgm/wrapper_darwin.go new file mode 100644 index 0000000000..590e9e9bd0 --- /dev/null +++ b/runner/internal/shim/dcgm/wrapper_darwin.go @@ -0,0 +1,9 @@ +//go:build darwin + +package dcgm + +import "errors" + +func NewDCGMWrapper(address string) (DCGMWrapperInterface, error) { + return nil, errors.New("macOS is not supported") +} diff --git a/runner/internal/shim/dcgm/wrapper_linux.go b/runner/internal/shim/dcgm/wrapper_linux.go new file mode 100644 index 0000000000..23d9ad5b48 --- /dev/null +++ b/runner/internal/shim/dcgm/wrapper_linux.go @@ -0,0 +1,97 @@ +//go:build linux + +package dcgm + +import ( + "errors" + "fmt" + "sync" + + godcgm "github.com/NVIDIA/go-dcgm/pkg/dcgm" +) + +// DCGMWrapper is a wrapper around go-dcgm (which, in turn, is a wrapper around libdcgm.so) +type DCGMWrapper struct { + group godcgm.GroupHandle + healthCheckEnabled bool + + mu *sync.Mutex +} + +// NewDCGMWrapper initializes and starts DCGM in the specific mode: +// - If address is empty, then libdcgm starts embedded hostengine within the current process. +// This is the main mode. +// - If address is not empty, then libdcgm connects to already running nv-hostengine service via TCP. +// This mode is useful for debugging, e.g., one can start nv-hostengine via systemd and inject +// errors via dcgmi: +// - systemctl start nvidia-dcgm.service +// - dcgmi test --inject --gpuid 0 -f 202 -v 99999 +// +// Note: embedded hostengine is started in AUTO operation mode, which means that +// the library handles periodic tasks by itself executing them in additional threads. +func NewDCGMWrapper(address string) (*DCGMWrapper, error) { + var err error + if address == "" { + _, err = godcgm.Init(godcgm.Embedded) + } else { + // "address is a unix socket filename (1) or a TCP/IP address (0)" + _, err = godcgm.Init(godcgm.Standalone, address, "0") + } + if err != nil { + return nil, fmt.Errorf("failed to initialize or start DCGM: %w", err) + } + return &DCGMWrapper{ + group: godcgm.GroupAllGPUs(), + mu: new(sync.Mutex), + }, nil +} + +func (w *DCGMWrapper) Shutdown() error { + if err := godcgm.Shutdown(); err != nil { + return fmt.Errorf("failed to shut down DCGM: %w", err) + } + return nil +} + +func (w *DCGMWrapper) EnableHealthChecks() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.healthCheckEnabled { + return errors.New("health check system already enabled") + } + if err := godcgm.HealthSet(w.group, godcgm.DCGM_HEALTH_WATCH_ALL); err != nil { + return fmt.Errorf("failed to configure health watches: %w", err) + } + // "On the first call, stateful information about all of the enabled watches within a group + // is created but no error results are provided. On subsequent calls, any error information + // will be returned." + if _, err := godcgm.HealthCheck(w.group); err != nil { + return fmt.Errorf("failed to initialize health watches state: %w", err) + } + w.healthCheckEnabled = true + return nil +} + +func (w *DCGMWrapper) GetHealth() (Health, error) { + health := Health{} + if !w.healthCheckEnabled { + return health, errors.New("health check system is not enabled") + } + response, err := godcgm.HealthCheck(w.group) + if err != nil { + return health, fmt.Errorf("failed to fetch health status: %w", err) + } + health.OverallHealth = int(response.OverallHealth) + health.Incidents = make([]HealthIncident, 0, len(response.Incidents)) + for _, incident := range response.Incidents { + health.Incidents = append(health.Incidents, HealthIncident{ + System: int(incident.System), + Health: int(incident.Health), + ErrorMessage: incident.Error.Message, + ErrorCode: int(incident.Error.Code), + EntityGroupID: int(incident.EntityInfo.EntityGroupId), + EntityID: int(incident.EntityInfo.EntityId), + }) + } + return health, nil +} diff --git a/runner/internal/shim/dcgm/wrapper_test.go b/runner/internal/shim/dcgm/wrapper_test.go index 93c5b12ef6..10b16bafa6 100644 --- a/runner/internal/shim/dcgm/wrapper_test.go +++ b/runner/internal/shim/dcgm/wrapper_test.go @@ -1,3 +1,5 @@ +//go:build linux + package dcgm import ( @@ -49,7 +51,7 @@ func TestDCGMWrapperGetHealth(t *testing.T) { // Utils. Must be called after NewDCGMWrapper(), as it indirectly calls dlopen("libdcgm.so.4") -func getDCGMWrapper(t *testing.T) *DCGMWrapper { +func getDCGMWrapper(t *testing.T) DCGMWrapperInterface { dcgmw, err := NewDCGMWrapper("") if err != nil && strings.Contains(err.Error(), "libdcgm.so") { t.Skip("Skipping test that requires ligdcm.so") @@ -73,7 +75,7 @@ func getGpuID(t *testing.T) uint { return gpuIDs[0] } -func injectError(t *testing.T, gpuID uint, fieldID godcgm.Short, fieldType uint, value interface{}) { +func injectError(t *testing.T, gpuID uint, fieldID godcgm.Short, fieldType uint, value any) { t.Helper() err := godcgm.InjectFieldValue(gpuID, fieldID, fieldType, 0, time.Now().UnixMicro(), value) require.NoError(t, err)