Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/gallery/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func InstallModel(ctx context.Context, systemState *system.SystemState, nameOver
}
}
uri := downloader.URI(file.URI)
if err := uri.DownloadFileWithContext(ctx, filePath, file.SHA256, i, len(config.Files), downloadStatus); err != nil {
if err := uri.DownloadFileWithContext(downloader.ContextWithModelID(ctx, config.Name), filePath, file.SHA256, i, len(config.Files), downloadStatus); err != nil {
return nil, err
}
}
Expand Down
146 changes: 146 additions & 0 deletions core/http/routes/ui_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,47 @@ func getDirectorySize(path string) (int64, error) {
return totalSize, nil
}

// parseRateString converts a human-readable bandwidth string (e.g. "2mb",
// "500kb", "10mb") to bytes per second. Returns ≤ 0 for unlimited.
func parseRateString(s string) (int64, error) {
s = strings.TrimSpace(strings.ToLower(s))
if s == "" || s == "0" || s == "unlimited" || s == "-1" {
return 0, nil
}
var multiplier int64 = 1
switch {
case strings.HasSuffix(s, "gb"):
multiplier = 1 << 30
s = strings.TrimSuffix(s, "gb")
case strings.HasSuffix(s, "g"):
multiplier = 1 << 30
s = strings.TrimSuffix(s, "g")
case strings.HasSuffix(s, "mb"):
multiplier = 1 << 20
s = strings.TrimSuffix(s, "mb")
case strings.HasSuffix(s, "m"):
multiplier = 1 << 20
s = strings.TrimSuffix(s, "m")
case strings.HasSuffix(s, "kb"):
multiplier = 1 << 10
s = strings.TrimSuffix(s, "kb")
case strings.HasSuffix(s, "k"):
multiplier = 1 << 10
s = strings.TrimSuffix(s, "k")
case strings.HasSuffix(s, "b"):
multiplier = 1
s = strings.TrimSuffix(s, "b")
}
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse %q as a number", s)
}
if val <= 0 {
return 0, nil
}
return val * multiplier, nil
}

// RegisterUIAPIRoutes registers JSON API routes for the web UI
func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, applicationInstance *application.Application, adminMiddleware echo.MiddlewareFunc) {

Expand Down Expand Up @@ -362,6 +403,80 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
})
}, adminMiddleware)

// Pause operation endpoint (admin only)
app.POST("/api/operations/:jobID/pause", func(c echo.Context) error {
jobID := c.Param("jobID")
xlog.Debug("API request to pause operation", "jobID", jobID)

err := galleryService.PauseOperation(jobID)
if err != nil {
xlog.Error("Failed to pause operation", "error", err, "jobID", jobID)
return c.JSON(http.StatusBadRequest, map[string]any{
"error": err.Error(),
})
}

return c.JSON(200, map[string]any{
"success": true,
"message": "Operation paused",
})
}, adminMiddleware)

// Resume operation endpoint (admin only)
app.POST("/api/operations/:jobID/resume", func(c echo.Context) error {
jobID := c.Param("jobID")
xlog.Debug("API request to resume operation", "jobID", jobID)

err := galleryService.ResumeOperation(jobID)
if err != nil {
xlog.Error("Failed to resume operation", "error", err, "jobID", jobID)
return c.JSON(http.StatusBadRequest, map[string]any{
"error": err.Error(),
})
}

return c.JSON(200, map[string]any{
"success": true,
"message": "Operation resumed",
})
}, adminMiddleware)

// Pause all operations (admin only)
app.POST("/api/operations/pause-all", func(c echo.Context) error {
xlog.Debug("API request to pause all operations")

err := galleryService.PauseAllOperations()
if err != nil {
xlog.Error("Failed to pause all operations", "error", err)
return c.JSON(http.StatusBadRequest, map[string]any{
"error": err.Error(),
})
}

return c.JSON(200, map[string]any{
"success": true,
"message": "All operations paused",
})
}, adminMiddleware)

// Resume all operations (admin only)
app.POST("/api/operations/resume-all", func(c echo.Context) error {
xlog.Debug("API request to resume all operations")

err := galleryService.ResumeAllOperations()
if err != nil {
xlog.Error("Failed to resume all operations", "error", err)
return c.JSON(http.StatusBadRequest, map[string]any{
"error": err.Error(),
})
}

return c.JSON(200, map[string]any{
"success": true,
"message": "All operations resumed",
})
}, adminMiddleware)

// Dismiss a failed operation (acknowledge the error and remove it from the list)
app.POST("/api/operations/:jobID/dismiss", func(c echo.Context) error {
jobID := c.Param("jobID")
Expand All @@ -376,6 +491,37 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
})
}, adminMiddleware)

// Throttle (rate-limit) an active download (admin only)
// Query param: ?rate=2mb or ?rate=500kb. Use 0 or -1 to remove the limit.
app.POST("/api/operations/:jobID/throttle", func(c echo.Context) error {
jobID := c.Param("jobID")
rateStr := c.QueryParam("rate")
if rateStr == "" {
return c.JSON(http.StatusBadRequest, map[string]any{
"error": "query parameter 'rate' is required (e.g. rate=2mb, rate=500kb)",
})
}
bytesPerSec, err := parseRateString(rateStr)
if err != nil {
return c.JSON(http.StatusBadRequest, map[string]any{
"error": fmt.Sprintf("invalid rate %q: %v", rateStr, err),
})
}

xlog.Debug("API request to throttle operation", "jobID", jobID, "rate", bytesPerSec)
if err := galleryService.SetOperationRateLimit(jobID, bytesPerSec); err != nil {
xlog.Error("Failed to throttle operation", "error", err, "jobID", jobID)
return c.JSON(http.StatusBadRequest, map[string]any{
"error": err.Error(),
})
}

return c.JSON(200, map[string]any{
"success": true,
"message": fmt.Sprintf("Operation throttled to %d bytes/sec", bytesPerSec),
})
}, adminMiddleware)

// Model Gallery APIs (admin only)
app.GET("/api/models", func(c echo.Context) error {
term := c.QueryParam("term")
Expand Down
21 changes: 21 additions & 0 deletions core/services/galleryop/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/downloader"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/LocalAI/pkg/utils"
Expand Down Expand Up @@ -83,6 +84,26 @@ func (g *GalleryService) modelHandler(op *ManagementOp[gallery.GalleryModel, gal
})
return err
}
// Check if the download was paused — the .partial is preserved for
// later resume, so this is not a terminal failure.
if errors.Is(err, downloader.ErrUserPaused) {
g.UpdateStatus(op.ID, &OpStatus{
Paused: true,
Message: "paused",
GalleryElementName: op.GalleryElementName,
Cancellable: true,
})
// Store the operation metadata so ResumeOperation can re-queue it.
g.storePausedOp(op.ID, &PausedModelOp{
Galleries: op.Galleries,
BackendGalleries: op.BackendGalleries,
Req: op.Req,
GalleryElementName: op.GalleryElementName,
})
// Return nil so Start() does not call updateError — this is not a
// failure, it's a deliberate pause.
return nil
}
return err
}

Expand Down
12 changes: 12 additions & 0 deletions core/services/galleryop/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type OpStatus struct {
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"` // Cancelled is true if the operation was cancelled
Cancellable bool `json:"cancellable"` // Cancellable is true if the operation can be cancelled
Paused bool `json:"paused"` // Paused is true if the operation was paused (resumable)

// Nodes is the per-node breakdown for a fanned-out backend install.
// Populated by DistributedBackendManager (per-node terminal status)
Expand Down Expand Up @@ -87,6 +88,7 @@ type opStatusWire struct {
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"`
Cancellable bool `json:"cancellable"`
Paused bool `json:"paused"`
Nodes []NodeProgress `json:"nodes,omitempty"`
}

Expand All @@ -102,6 +104,7 @@ func (o OpStatus) MarshalJSON() ([]byte, error) {
GalleryElementName: o.GalleryElementName,
Cancelled: o.Cancelled,
Cancellable: o.Cancellable,
Paused: o.Paused,
Nodes: o.Nodes,
}
if o.Error != nil {
Expand All @@ -125,6 +128,7 @@ func (o *OpStatus) UnmarshalJSON(data []byte) error {
o.GalleryElementName = w.GalleryElementName
o.Cancelled = w.Cancelled
o.Cancellable = w.Cancellable
o.Paused = w.Paused
o.Nodes = w.Nodes
if w.ErrorMessage != "" {
o.Error = errors.New(w.ErrorMessage)
Expand Down Expand Up @@ -161,6 +165,14 @@ type GalleryCancelEvent struct {
JobID string `json:"id"`
}

// GalleryPauseEvent is the NATS payload for a gallery pause. Mirroring the
// cancel pattern, the pause func may live on a different frontend replica;
// the broadcast subscriber applies the pause locally. A paused operation can
// be resumed later — the .partial file is preserved.
type GalleryPauseEvent struct {
JobID string `json:"id"`
}

// NodeStatus values shared between NodeProgress (per-node tick) and the
// NodeOpStatus surfaced by DistributedBackendManager's fan-out. Defined
// as exported constants so producers (the manager, the progress bridge)
Expand Down
Loading