Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
golang.org/x/tools v0.38.0
gorm.io/driver/mysql v1.5.7
gorm.io/gorm v1.30.0
Expand Down Expand Up @@ -146,7 +147,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/oauth2 v0.32.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions internal/core/control_panel/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controlpanel

import (
"sync"
"sync/atomic"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime"
Expand Down Expand Up @@ -69,11 +70,17 @@ type ControlPanel struct {
plugin_entities.PluginUniqueIdentifier,
*debugging_runtime.RemotePluginRuntime,
]

// initial plugin set (locked at startup)
initialPlugins *initialPluginSet

localReadinessSnapshot atomic.Pointer[LocalReadinessSnapshot]
}

// LocalPluginFailsRecord tracks launch failures for a single local plugin.
type LocalPluginFailsRecord struct {
	// RetryCount is the number of failed launch attempts recorded so far.
	RetryCount int32
	// LastTriedAt is the time of the most recent recorded attempt.
	LastTriedAt time.Time
	// LastError holds the message of the most recent launch error; it is empty
	// when no error has been recorded.
	LastError string
}

// create a new control panel as the engine of the local plugin daemon
Expand All @@ -97,5 +104,10 @@ func NewControlPanel(

// local plugin installation lock
localPluginInstallationLock: lock.NewGranularityLock(),

// initial plugin set
initialPlugins: &initialPluginSet{
ids: make(map[string]bool),
},
}
}
208 changes: 208 additions & 0 deletions internal/core/control_panel/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package controlpanel

import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

// LocalReadinessSnapshot is a point-in-time summary of local plugin launch
// state. It is rebuilt by updateLocalReadinessSnapshot and published through
// ControlPanel.LocalReadiness.
type LocalReadinessSnapshot struct {
	// Ready is the readiness verdict. It is derived solely from the initial
	// plugin set, so plugins installed later at runtime are not meant to flip
	// a ready instance back to not-ready.
	Ready bool

	// Initial plugin state: the set of plugins locked on the first snapshot.

	// InitialPluginsReady carries the same value as Ready.
	InitialPluginsReady bool
	// InitialExpected is the number of initial plugins that are still expected.
	InitialExpected int
	// InitialRunning is the number of initial plugins with a live runtime.
	InitialRunning int
	// InitialMissing lists initial plugins that are not running yet and still
	// have retries left.
	InitialMissing []string
	// InitialFailed lists initial plugins that reached the retry limit.
	InitialFailed []string

	// Runtime-added plugin state: informational only, never affects Ready.

	// RuntimePluginsLoading is the number of runtime-added plugins still
	// waiting to start.
	RuntimePluginsLoading int
	// RuntimeMissing lists the ids counted by RuntimePluginsLoading.
	RuntimeMissing []string

	// Totals across initial and runtime-added plugins.

	// Expected is the number of installed plugins that are not ignored.
	Expected int
	// Running is the number of expected plugins with a live runtime.
	Running int
	// Missing lists expected plugins that are not running and still retryable.
	Missing []string
	// Failed lists expected plugins that reached the retry limit.
	Failed []string
	// UpdatedAt is when this snapshot was computed.
	UpdatedAt time.Time
	// Platform is the configured platform, as a string.
	Platform string
	// Installed is the number of installed plugins, ignored ones included.
	Installed int
	// Ignored is the number of installed plugins on the watch ignore list.
	Ignored int
	// MaxRetries is the configured retry limit used to classify failures.
	MaxRetries int32
}

// initialPluginSet holds the ids of the plugins that make up the initial set
// used for readiness. All fields below lock are guarded by it.
type initialPluginSet struct {
	lock sync.RWMutex
	// ids is keyed by the plugin identifier string; present entries are true.
	ids map[string]bool
	// ready reports whether the set has already been locked.
	ready bool
}

// initialPluginsStatus is the result of evaluating the initial plugin set
// against the plugins that are currently expected.
type initialPluginsStatus struct {
	// ready is true when no initial plugin is still waiting to start.
	ready bool
	// expected counts initial plugins that are currently expected.
	expected int
	// running counts initial plugins with a live runtime.
	running int
	// missing lists initial plugins not running yet but still retryable.
	missing []string
	// failed lists initial plugins that reached the retry limit.
	failed []string
}

// LocalReadiness returns a copy of the most recently published readiness
// snapshot. The boolean is false when no snapshot has been stored yet, in
// which case the returned snapshot is the zero value.
func (c *ControlPanel) LocalReadiness() (LocalReadinessSnapshot, bool) {
	if latest := c.localReadinessSnapshot.Load(); latest != nil {
		return *latest, true
	}
	return LocalReadinessSnapshot{}, false
}

// updateLocalReadinessSnapshot recomputes the readiness snapshot from the
// given list of installed plugins and atomically publishes it.
//
// Plugins on the watch ignore list are counted as ignored and excluded from
// every other figure. Each remaining ("expected") plugin is classified exactly
// once as running, failed (retry limit reached) or missing (still retryable),
// so the total and runtime-added figures in one snapshot always agree with
// each other.
func (c *ControlPanel) updateLocalReadinessSnapshot(
	installed []plugin_entities.PluginUniqueIdentifier,
) {
	now := time.Now()

	// Drop ignored plugins; everything else is expected to be running.
	expected := make([]plugin_entities.PluginUniqueIdentifier, 0, len(installed))
	ignored := 0
	for _, id := range installed {
		if _, ok := c.localPluginWatchIgnoreList.Load(id); ok {
			ignored++
			continue
		}
		expected = append(expected, id)
	}

	// Evaluate the initial set first: the first evaluation may lock the set,
	// and the copy fetched afterwards must reflect that.
	initialStatus := c.isInitialPluginsReady(expected)
	initialSet := c.getInitialPluginSet()

	maxRetries := c.config.PluginLocalMaxRetryCount

	// Non-nil empty slices are used on purpose so the lists serialise as []
	// rather than null.
	missing := make([]string, 0)
	failed := make([]string, 0)
	runtimeMissing := make([]string, 0)
	running := 0

	// Single classification pass. Runtime-added plugins that are still
	// loading are exactly the missing plugins outside the initial set.
	for _, id := range expected {
		if c.localPluginRuntimes.Exists(id) {
			running++
			continue
		}

		idStr := id.String()
		if retry, ok := c.localPluginFailsRecord.Load(id); ok && retry.RetryCount >= maxRetries {
			failed = append(failed, idStr)
			continue
		}

		missing = append(missing, idStr)
		if !initialSet[idStr] {
			runtimeMissing = append(runtimeMissing, idStr)
		}
	}

	// Readiness depends only on the initial plugins; runtime-added plugins
	// are reported for monitoring but never influence Ready.
	c.localReadinessSnapshot.Store(&LocalReadinessSnapshot{
		Ready:                 initialStatus.ready,
		InitialPluginsReady:   initialStatus.ready,
		InitialExpected:       initialStatus.expected,
		InitialRunning:        initialStatus.running,
		InitialMissing:        initialStatus.missing,
		InitialFailed:         initialStatus.failed,
		RuntimePluginsLoading: len(runtimeMissing),
		RuntimeMissing:        runtimeMissing,
		Expected:              len(expected),
		Installed:             len(installed),
		Ignored:               ignored,
		Running:               running,
		Missing:               missing,
		Failed:                failed,
		UpdatedAt:             now,
		Platform:              string(c.config.Platform),
		MaxRetries:            maxRetries,
	})
}

// isInitialPluginsReady evaluates the initial plugin set against current, the
// plugins that are presently expected to run.
//
// The first call locks the initial set to current, even when current is
// empty. Locking an empty set matters: a daemon that starts with no plugins
// must not later adopt runtime-installed plugins as "initial", otherwise an
// already-ready instance could turn not-ready when a plugin is added.
//
// The returned status is ready when no initial plugin is still waiting to
// start. Initial plugins that reached the retry limit are reported in failed
// but do not block readiness. Initial plugins absent from current are not
// counted at all.
func (c *ControlPanel) isInitialPluginsReady(
	current []plugin_entities.PluginUniqueIdentifier,
) initialPluginsStatus {
	// lockInitialPlugins is a no-op once the set has been locked, so it is
	// safe to call on every evaluation.
	c.lockInitialPlugins(current)
	initialSet := c.getInitialPluginSet()

	// Non-nil empty slices keep the serialised form as [] instead of null.
	status := initialPluginsStatus{
		missing: make([]string, 0),
		failed:  make([]string, 0),
	}

	for _, id := range current {
		idStr := id.String()
		if !initialSet[idStr] {
			// Added at runtime; irrelevant for readiness.
			continue
		}

		status.expected++
		if c.localPluginRuntimes.Exists(id) {
			status.running++
			continue
		}

		if retry, ok := c.localPluginFailsRecord.Load(id); ok && retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
			status.failed = append(status.failed, idStr)
			continue
		}
		status.missing = append(status.missing, idStr)
	}

	status.ready = len(status.missing) == 0
	return status
}

// lockInitialPlugins records plugins as the initial plugin set and marks the
// set as locked. Only the first call has any effect; later calls return
// without modifying the set.
func (c *ControlPanel) lockInitialPlugins(
	plugins []plugin_entities.PluginUniqueIdentifier,
) {
	c.initialPlugins.lock.Lock()
	defer c.initialPlugins.lock.Unlock()

	if !c.initialPlugins.ready {
		for i := range plugins {
			c.initialPlugins.ids[plugins[i].String()] = true
		}
		c.initialPlugins.ready = true
	}
}

// getInitialPluginSet returns a copy of the initial plugin id set, taken
// under the read lock. The result is always non-nil and is independent of the
// shared map, so callers may use it without further locking.
func (c *ControlPanel) getInitialPluginSet() map[string]bool {
	c.initialPlugins.lock.RLock()
	defer c.initialPlugins.lock.RUnlock()

	snapshot := make(map[string]bool, len(c.initialPlugins.ids))
	for id, present := range c.initialPlugins.ids {
		snapshot[id] = present
	}
	return snapshot
}
20 changes: 12 additions & 8 deletions internal/core/control_panel/server_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func (c *ControlPanel) startLocalMonitor() {
log.Info("start to handle new plugins", "path", c.config.PluginInstalledPath)
log.Info("launch plugins with max concurrency", "concurrency", c.config.PluginLocalLaunchingConcurrent)
log.Info("plugin max retry count", "max_retry_count", c.config.PluginLocalMaxRetryCount)

c.handleNewLocalPlugins()
// sync every 30 seconds
Expand Down Expand Up @@ -83,10 +84,11 @@ func (c *ControlPanel) handleNewLocalPlugins() {
retry = LocalPluginFailsRecord{
RetryCount: 0,
LastTriedAt: time.Now(),
LastError: "",
}
}

if retry.RetryCount >= MAX_RETRY_COUNT {
if retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
continue
}

Expand Down Expand Up @@ -114,6 +116,7 @@ func (c *ControlPanel) handleNewLocalPlugins() {
c.localPluginFailsRecord.Store(uniquePluginIdentifier, LocalPluginFailsRecord{
RetryCount: retry.RetryCount + 1,
LastTriedAt: time.Now(),
LastError: err.Error(),
})
} else {
// reset the failure record
Expand All @@ -124,17 +127,18 @@ func (c *ControlPanel) handleNewLocalPlugins() {

// wait for all plugins to be launched
wg.Wait()

// update readiness snapshot
c.updateLocalReadinessSnapshot(plugins)
}

var (
MAX_RETRY_COUNT = int32(15)

RETRY_WAIT_INTERVAL_MAP = map[int32]time.Duration{
0: 0 * time.Second,
3: 30 * time.Second,
8: 60 * time.Second,
MAX_RETRY_COUNT: 240 * time.Second,
// stop
0: 0 * time.Second,
1: 15 * time.Second,
2: 30 * time.Second,
3: 60 * time.Second,
4: 120 * time.Second,
}
)

Expand Down
35 changes: 35 additions & 0 deletions internal/core/plugin_manager/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package plugin_manager

import (
controlpanel "github.com/langgenius/dify-plugin-daemon/internal/core/control_panel"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

// ReadinessReport is the result of a plugin manager readiness check.
type ReadinessReport struct {
	// Ready is the overall verdict.
	Ready bool
	// Reason is a short machine-readable code explaining the verdict.
	Reason string
	// Plugins is the local readiness snapshot behind the verdict. It is nil
	// when no snapshot was consulted or none is available yet.
	Plugins *controlpanel.LocalReadinessSnapshot
}

// Readiness reports whether the plugin manager is ready to serve.
//
// Reason codes:
//   - "manager_not_initialized": the manager or its config is nil (not ready)
//   - "non_local_platform": the platform is not local; always ready
//   - "plugin_monitor_not_ready": no readiness snapshot has been published yet
//   - "plugins_ready": the snapshot reports ready
//   - "plugins_failed": not ready and at least one plugin is listed as failed
//   - "plugins_starting": not ready and no plugin is listed as failed
//
// It is safe to call on a nil receiver.
func (p *PluginManager) Readiness() ReadinessReport {
	switch {
	case p == nil || p.config == nil:
		return ReadinessReport{Ready: false, Reason: "manager_not_initialized"}
	case p.config.Platform != app.PLATFORM_LOCAL:
		return ReadinessReport{Ready: true, Reason: "non_local_platform"}
	}

	snapshot, ok := p.controlPanel.LocalReadiness()
	if !ok {
		return ReadinessReport{Ready: false, Reason: "plugin_monitor_not_ready"}
	}

	report := ReadinessReport{Ready: snapshot.Ready, Plugins: &snapshot}
	switch {
	case snapshot.Ready:
		report.Reason = "plugins_ready"
	case len(snapshot.Failed) > 0:
		report.Reason = "plugins_failed"
	default:
		report.Reason = "plugins_starting"
	}
	return report
}
29 changes: 29 additions & 0 deletions internal/server/controllers/ready_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package controllers

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
)

// ReadyCheck returns a handler that exposes the plugin manager's readiness.
// It answers 200 with the reason when ready, and 503 with the reason plus the
// plugin snapshot as "detail" when not ready.
func ReadyCheck() gin.HandlerFunc {
	return func(c *gin.Context) {
		report := plugin_manager.Manager().Readiness()

		if !report.Ready {
			// Include the snapshot so operators can see what is blocking.
			c.JSON(http.StatusServiceUnavailable, gin.H{
				"status": "unready",
				"ready":  false,
				"reason": report.Reason,
				"detail": report.Plugins,
			})
			return
		}

		c.JSON(http.StatusOK, gin.H{
			"status": "ok",
			"ready":  true,
			"reason": report.Reason,
		})
	}
}
3 changes: 2 additions & 1 deletion internal/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ engine := gin.New()
engine.Use(log.LoggerMiddleware())
} else {
engine.Use(log.LoggerMiddlewareWithConfig(log.LoggerConfig{
SkipPaths: []string{"/health/check"},
SkipPaths: []string{"/health/check", "/ready/check"},
}))
}
engine.Use(controllers.CollectActiveRequests())
engine.NoRoute(func(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"code": "not_found", "message": "route not found"})
})
engine.GET("/health/check", controllers.HealthCheck(config))
engine.GET("/ready/check", controllers.ReadyCheck())

endpointGroup := engine.Group("/e")
serverlessTransactionGroup := engine.Group("/backwards-invocation")
Expand Down
Loading