Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-latest, macos-latest]
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down
2 changes: 1 addition & 1 deletion runner/cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error)
}

var dcgmExporter *dcgm.DCGMExporter
var dcgmWrapper *dcgm.DCGMWrapper
var dcgmWrapper dcgm.DCGMWrapperInterface

if common.GetGpuVendor() == common.GpuVendorNvidia {
dcgmExporterPath, err := dcgm.GetDCGMExporterExecPath(ctx)
Expand Down
4 changes: 2 additions & 2 deletions runner/internal/shim/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type ShimServer struct {
runner TaskRunner

dcgmExporter *dcgm.DCGMExporter
dcgmWrapper *dcgm.DCGMWrapper
dcgmWrapper dcgm.DCGMWrapperInterface

version string
}

func NewShimServer(
ctx context.Context, address string, version string,
runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper *dcgm.DCGMWrapper,
runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper dcgm.DCGMWrapperInterface,
) *ShimServer {
r := api.NewRouter()
s := &ShimServer{
Expand Down
96 changes: 4 additions & 92 deletions runner/internal/shim/dcgm/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package dcgm

import (
"errors"
"fmt"
"sync"

godcgm "github.com/NVIDIA/go-dcgm/pkg/dcgm"
)

type HealthStatus string

const (
Expand All @@ -30,88 +22,8 @@ type Health struct {
Incidents []HealthIncident `json:"incidents"`
}

// DCGMWrapper is a wrapper around go-dcgm (which, in turn, is a wrapper around libdcgm.so)
type DCGMWrapper struct {
group godcgm.GroupHandle
healthCheckEnabled bool

mu *sync.Mutex
}

// NewDCGMWrapper initializes and starts DCGM in the specific mode:
// - If address is empty, then libdcgm starts embedded hostengine within the current process.
// This is the main mode.
// - If address is not empty, then libdcgm connects to already running nv-hostengine service via TCP.
// This mode is useful for debugging, e.g., one can start nv-hostengine via systemd and inject
// errors via dcgmi:
// - systemctl start nvidia-dcgm.service
// - dcgmi test --inject --gpuid 0 -f 202 -v 99999
//
// Note: embedded hostengine is started in AUTO operation mode, which means that
// the library handles periodic tasks by itself executing them in additional threads.
func NewDCGMWrapper(address string) (*DCGMWrapper, error) {
var err error
if address == "" {
_, err = godcgm.Init(godcgm.Embedded)
} else {
// "address is a unix socket filename (1) or a TCP/IP address (0)"
_, err = godcgm.Init(godcgm.Standalone, address, "0")
}
if err != nil {
return nil, fmt.Errorf("failed to initialize or start DCGM: %w", err)
}
return &DCGMWrapper{
group: godcgm.GroupAllGPUs(),
mu: new(sync.Mutex),
}, nil
}

func (w *DCGMWrapper) Shutdown() error {
if err := godcgm.Shutdown(); err != nil {
return fmt.Errorf("failed to shut down DCGM: %w", err)
}
return nil
}

func (w *DCGMWrapper) EnableHealthChecks() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.healthCheckEnabled {
return errors.New("health check system already enabled")
}
if err := godcgm.HealthSet(w.group, godcgm.DCGM_HEALTH_WATCH_ALL); err != nil {
return fmt.Errorf("failed to configure health watches: %w", err)
}
// "On the first call, stateful information about all of the enabled watches within a group
// is created but no error results are provided. On subsequent calls, any error information
// will be returned."
if _, err := godcgm.HealthCheck(w.group); err != nil {
return fmt.Errorf("failed to initialize health watches state: %w", err)
}
w.healthCheckEnabled = true
return nil
}

func (w *DCGMWrapper) GetHealth() (Health, error) {
health := Health{}
if !w.healthCheckEnabled {
return health, errors.New("health check system is not enabled")
}
response, err := godcgm.HealthCheck(w.group)
if err != nil {
return health, fmt.Errorf("failed to fetch health status: %w", err)
}
health.OverallHealth = int(response.OverallHealth)
health.Incidents = make([]HealthIncident, 0, len(response.Incidents))
for _, incident := range response.Incidents {
health.Incidents = append(health.Incidents, HealthIncident{
System: int(incident.System),
Health: int(incident.Health),
ErrorMessage: incident.Error.Message,
ErrorCode: int(incident.Error.Code),
EntityGroupID: int(incident.EntityInfo.EntityGroupId),
EntityID: int(incident.EntityInfo.EntityId),
})
}
return health, nil
type DCGMWrapperInterface interface {
Shutdown() error
EnableHealthChecks() error
GetHealth() (Health, error)
}
9 changes: 9 additions & 0 deletions runner/internal/shim/dcgm/wrapper_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build darwin

package dcgm

import "errors"

func NewDCGMWrapper(address string) (DCGMWrapperInterface, error) {
return nil, errors.New("macOS is not supported")
}
97 changes: 97 additions & 0 deletions runner/internal/shim/dcgm/wrapper_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//go:build linux

package dcgm

import (
"errors"
"fmt"
"sync"

godcgm "github.com/NVIDIA/go-dcgm/pkg/dcgm"
)

// DCGMWrapper is a wrapper around go-dcgm (which, in turn, is a wrapper around libdcgm.so)
type DCGMWrapper struct {
group godcgm.GroupHandle
healthCheckEnabled bool

mu *sync.Mutex
}

// NewDCGMWrapper initializes and starts DCGM in the specific mode:
// - If address is empty, then libdcgm starts embedded hostengine within the current process.
// This is the main mode.
// - If address is not empty, then libdcgm connects to already running nv-hostengine service via TCP.
// This mode is useful for debugging, e.g., one can start nv-hostengine via systemd and inject
// errors via dcgmi:
// - systemctl start nvidia-dcgm.service
// - dcgmi test --inject --gpuid 0 -f 202 -v 99999
//
// Note: embedded hostengine is started in AUTO operation mode, which means that
// the library handles periodic tasks by itself executing them in additional threads.
func NewDCGMWrapper(address string) (*DCGMWrapper, error) {
var err error
if address == "" {
_, err = godcgm.Init(godcgm.Embedded)
} else {
// "address is a unix socket filename (1) or a TCP/IP address (0)"
_, err = godcgm.Init(godcgm.Standalone, address, "0")
}
if err != nil {
return nil, fmt.Errorf("failed to initialize or start DCGM: %w", err)
}
return &DCGMWrapper{
group: godcgm.GroupAllGPUs(),
mu: new(sync.Mutex),
}, nil
}

func (w *DCGMWrapper) Shutdown() error {
if err := godcgm.Shutdown(); err != nil {
return fmt.Errorf("failed to shut down DCGM: %w", err)
}
return nil
}

func (w *DCGMWrapper) EnableHealthChecks() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.healthCheckEnabled {
return errors.New("health check system already enabled")
}
if err := godcgm.HealthSet(w.group, godcgm.DCGM_HEALTH_WATCH_ALL); err != nil {
return fmt.Errorf("failed to configure health watches: %w", err)
}
// "On the first call, stateful information about all of the enabled watches within a group
// is created but no error results are provided. On subsequent calls, any error information
// will be returned."
if _, err := godcgm.HealthCheck(w.group); err != nil {
return fmt.Errorf("failed to initialize health watches state: %w", err)
}
w.healthCheckEnabled = true
return nil
}

func (w *DCGMWrapper) GetHealth() (Health, error) {
health := Health{}
if !w.healthCheckEnabled {
return health, errors.New("health check system is not enabled")
}
response, err := godcgm.HealthCheck(w.group)
if err != nil {
return health, fmt.Errorf("failed to fetch health status: %w", err)
}
health.OverallHealth = int(response.OverallHealth)
health.Incidents = make([]HealthIncident, 0, len(response.Incidents))
for _, incident := range response.Incidents {
health.Incidents = append(health.Incidents, HealthIncident{
System: int(incident.System),
Health: int(incident.Health),
ErrorMessage: incident.Error.Message,
ErrorCode: int(incident.Error.Code),
EntityGroupID: int(incident.EntityInfo.EntityGroupId),
EntityID: int(incident.EntityInfo.EntityId),
})
}
return health, nil
}
6 changes: 4 additions & 2 deletions runner/internal/shim/dcgm/wrapper_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build linux

package dcgm

import (
Expand Down Expand Up @@ -49,7 +51,7 @@ func TestDCGMWrapperGetHealth(t *testing.T) {

// Utils. Must be called after NewDCGMWrapper(), as it indirectly calls dlopen("libdcgm.so.4")

func getDCGMWrapper(t *testing.T) *DCGMWrapper {
func getDCGMWrapper(t *testing.T) DCGMWrapperInterface {
dcgmw, err := NewDCGMWrapper("")
if err != nil && strings.Contains(err.Error(), "libdcgm.so") {
t.Skip("Skipping test that requires ligdcm.so")
Expand All @@ -73,7 +75,7 @@ func getGpuID(t *testing.T) uint {
return gpuIDs[0]
}

func injectError(t *testing.T, gpuID uint, fieldID godcgm.Short, fieldType uint, value interface{}) {
func injectError(t *testing.T, gpuID uint, fieldID godcgm.Short, fieldType uint, value any) {
t.Helper()
err := godcgm.InjectFieldValue(gpuID, fieldID, fieldType, 0, time.Now().UnixMicro(), value)
require.NoError(t, err)
Expand Down
Loading