diff --git a/go.mod b/go.mod index f434cd31f..feff53f3d 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 go.opentelemetry.io/otel/sdk v1.39.0 go.opentelemetry.io/otel/sdk/metric v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 golang.org/x/tools v0.38.0 gorm.io/driver/mysql v1.5.7 gorm.io/gorm v1.30.0 @@ -146,7 +147,6 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/mod v0.29.0 // indirect golang.org/x/oauth2 v0.32.0 // indirect diff --git a/internal/core/control_panel/daemon.go b/internal/core/control_panel/daemon.go index 73a29ecfe..c51ac6cbc 100644 --- a/internal/core/control_panel/daemon.go +++ b/internal/core/control_panel/daemon.go @@ -2,6 +2,7 @@ package controlpanel import ( "sync" + "sync/atomic" "time" "github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime" @@ -69,11 +70,17 @@ type ControlPanel struct { plugin_entities.PluginUniqueIdentifier, *debugging_runtime.RemotePluginRuntime, ] + + // initial plugin set (locked at startup) + initialPlugins *initialPluginSet + + localReadinessSnapshot atomic.Pointer[LocalReadinessSnapshot] } type LocalPluginFailsRecord struct { RetryCount int32 LastTriedAt time.Time + LastError string } // create a new control panel as the engine of the local plugin daemon @@ -97,5 +104,10 @@ func NewControlPanel( // local plugin installation lock localPluginInstallationLock: lock.NewGranularityLock(), + + // initial plugin set + initialPlugins: &initialPluginSet{ + ids: make(map[string]bool), + }, } } diff --git a/internal/core/control_panel/readiness.go b/internal/core/control_panel/readiness.go new file mode 100644 index 
000000000..4202eb752
--- /dev/null
+++ b/internal/core/control_panel/readiness.go
@@ -0,0 +1,239 @@
+package controlpanel
+
+import (
+	"sync"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
+)
+
+// LocalReadinessSnapshot is a point-in-time view of local plugin startup
+// state. A new value is built and stored on every update; a stored snapshot
+// is not modified afterwards.
+type LocalReadinessSnapshot struct {
+	// Ready equals InitialPluginsReady: readiness depends only on the plugins
+	// in the locked initial set, so plugins installed later at runtime never
+	// turn the daemon unready.
+	Ready bool
+
+	// Initial plugin state (the initial set is locked once and never changes).
+	InitialPluginsReady bool
+	InitialExpected     int
+	InitialRunning      int
+	InitialMissing      []string
+	InitialFailed       []string
+
+	// Runtime-added plugin state (monitoring only, not part of readiness).
+	RuntimePluginsLoading int
+	RuntimeMissing        []string
+
+	// Totals across initial and runtime-added plugins.
+	Expected   int
+	Running    int
+	Missing    []string
+	Failed     []string
+	UpdatedAt  time.Time
+	Platform   string
+	Installed  int
+	Ignored    int
+	MaxRetries int32
+}
+
+// initialPluginSet holds the plugin ids captured by the first readiness
+// update. Once ready is true the ids are no longer modified.
+type initialPluginSet struct {
+	lock  sync.RWMutex
+	ids   map[string]bool // plugin id -> true
+	ready bool            // whether the set has been locked
+}
+
+// initialPluginsStatus summarizes the current state of the initial plugins.
+type initialPluginsStatus struct {
+	ready    bool
+	expected int
+	running  int
+	missing  []string
+	failed   []string
+}
+
+// localPluginState classifies a single expected plugin.
+type localPluginState int
+
+const (
+	localPluginRunning localPluginState = iota // a runtime exists
+	localPluginFailed                          // no runtime, retries exhausted
+	localPluginMissing                         // no runtime, may still be retried
+)
+
+// LocalReadiness returns a copy of the most recently stored snapshot. The
+// boolean is false when no snapshot has been stored yet.
+func (c *ControlPanel) LocalReadiness() (LocalReadinessSnapshot, bool) {
+	ptr := c.localReadinessSnapshot.Load()
+	if ptr == nil {
+		return LocalReadinessSnapshot{}, false
+	}
+	return *ptr, true
+}
+
+// localPluginStateOf reports whether id is running, has used up its retry
+// budget, or is still waiting to be launched.
+func (c *ControlPanel) localPluginStateOf(
+	id plugin_entities.PluginUniqueIdentifier,
+) localPluginState {
+	if c.localPluginRuntimes.Exists(id) {
+		return localPluginRunning
+	}
+	retry, ok := c.localPluginFailsRecord.Load(id)
+	if ok && retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
+		return localPluginFailed
+	}
+	return localPluginMissing
+}
+
+// updateLocalReadinessSnapshot rebuilds the readiness snapshot from the list
+// of installed plugins and stores it for LocalReadiness.
+func (c *ControlPanel) updateLocalReadinessSnapshot(
+	installed []plugin_entities.PluginUniqueIdentifier,
+) {
+	now := time.Now()
+
+	// plugins on the watch ignore list are not expected to be running
+	expected := make([]plugin_entities.PluginUniqueIdentifier, 0, len(installed))
+	ignored := 0
+	for _, id := range installed {
+		if _, ok := c.localPluginWatchIgnoreList.Load(id); ok {
+			ignored++
+			continue
+		}
+		expected = append(expected, id)
+	}
+
+	// total plugin state
+	missing := make([]string, 0)
+	failed := make([]string, 0)
+	running := 0
+	for _, id := range expected {
+		switch c.localPluginStateOf(id) {
+		case localPluginRunning:
+			running++
+		case localPluginFailed:
+			failed = append(failed, id.String())
+		default:
+			missing = append(missing, id.String())
+		}
+	}
+
+	// initial plugin state; this also locks the initial set on the first call
+	initialStatus := c.isInitialPluginsReady(expected)
+
+	// plugins outside the initial set that are still waiting to start
+	initialSet := c.getInitialPluginSet()
+	runtimeMissing := make([]string, 0)
+	for _, id := range expected {
+		idStr := id.String()
+		if initialSet[idStr] {
+			continue
+		}
+		if c.localPluginStateOf(id) == localPluginMissing {
+			runtimeMissing = append(runtimeMissing, idStr)
+		}
+	}
+
+	// readiness depends only on the initial plugins
+	snapshot := &LocalReadinessSnapshot{
+		Ready:                 initialStatus.ready,
+		InitialPluginsReady:   initialStatus.ready,
+		InitialExpected:       initialStatus.expected,
+		InitialRunning:        initialStatus.running,
+		InitialMissing:        initialStatus.missing,
+		InitialFailed:         initialStatus.failed,
+		RuntimePluginsLoading: len(runtimeMissing),
+		RuntimeMissing:        runtimeMissing,
+		Expected:              len(expected),
+		Installed:             len(installed),
+		Ignored:               ignored,
+		Running:               running,
+		Missing:               missing,
+		Failed:                failed,
+		UpdatedAt:             now,
+		Platform:              string(c.config.Platform),
+		MaxRetries:            c.config.PluginLocalMaxRetryCount,
+	}
+	c.localReadinessSnapshot.Store(snapshot)
+}
+
+// isInitialPluginsReady locks the initial plugin set if that has not happened
+// yet and reports the state of the initial plugins found in current. The
+// result is ready when no initial plugin is still waiting to start; plugins
+// that exhausted their retries are listed as failed but do not block readiness.
+func (c *ControlPanel) isInitialPluginsReady(
+	current []plugin_entities.PluginUniqueIdentifier,
+) initialPluginsStatus {
+	// Lock on the first call even when current is empty: lockInitialPlugins
+	// is a no-op once the set is locked. Deciding by "set is empty" instead
+	// would let the first plugin installed at runtime on a daemon that
+	// started without plugins be recorded as initial and affect readiness.
+	c.lockInitialPlugins(current)
+	initialSet := c.getInitialPluginSet()
+
+	missingList := make([]string, 0)
+	failedList := make([]string, 0)
+	running := 0
+	expected := 0
+
+	for _, id := range current {
+		idStr := id.String()
+		if !initialSet[idStr] {
+			continue
+		}
+
+		expected++
+		switch c.localPluginStateOf(id) {
+		case localPluginRunning:
+			running++
+		case localPluginFailed:
+			failedList = append(failedList, idStr)
+		default:
+			missingList = append(missingList, idStr)
+		}
+	}
+
+	return initialPluginsStatus{
+		ready:    len(missingList) == 0,
+		expected: expected,
+		running:  running,
+		missing:  missingList,
+		failed:   failedList,
+	}
+}
+
+// lockInitialPlugins records plugins as the initial set. Only the first call
+// has an effect; later calls return without changing the set.
+func (c *ControlPanel) lockInitialPlugins(
+	plugins []plugin_entities.PluginUniqueIdentifier,
+) {
+	c.initialPlugins.lock.Lock()
+	defer c.initialPlugins.lock.Unlock()
+
+	if c.initialPlugins.ready {
+		return
+	}
+
+	for _, id := range plugins {
+		c.initialPlugins.ids[id.String()] = true
+	}
+	c.initialPlugins.ready = true
+}
+
+// getInitialPluginSet returns a copy of the initial plugin set so callers can
+// read it without holding the lock.
+func (c *ControlPanel) getInitialPluginSet() map[string]bool {
+	c.initialPlugins.lock.RLock()
+	defer c.initialPlugins.lock.RUnlock()
+
+	result := make(map[string]bool, len(c.initialPlugins.ids))
+	for k, v := range c.initialPlugins.ids {
+		result[k] = v
+	}
+	return result
+}
diff --git a/internal/core/control_panel/server_local.go b/internal/core/control_panel/server_local.go index b6957f46c..a6bca3817 100644 --- a/internal/core/control_panel/server_local.go +++ b/internal/core/control_panel/server_local.go @@ -15,6 +15,7 @@ import ( func (c *ControlPanel) startLocalMonitor() { log.Info("start to handle new plugins", "path", c.config.PluginInstalledPath) log.Info("launch plugins with max concurrency", "concurrency", c.config.PluginLocalLaunchingConcurrent) + log.Info("plugin max retry count", 
"max_retry_count", c.config.PluginLocalMaxRetryCount) c.handleNewLocalPlugins() // sync every 30 seconds @@ -83,10 +84,11 @@ func (c *ControlPanel) handleNewLocalPlugins() { retry = LocalPluginFailsRecord{ RetryCount: 0, LastTriedAt: time.Now(), + LastError: "", } } - if retry.RetryCount >= MAX_RETRY_COUNT { + if retry.RetryCount >= c.config.PluginLocalMaxRetryCount { continue } @@ -114,6 +116,7 @@ func (c *ControlPanel) handleNewLocalPlugins() { c.localPluginFailsRecord.Store(uniquePluginIdentifier, LocalPluginFailsRecord{ RetryCount: retry.RetryCount + 1, LastTriedAt: time.Now(), + LastError: err.Error(), }) } else { // reset the failure record @@ -124,17 +127,18 @@ func (c *ControlPanel) handleNewLocalPlugins() { // wait for all plugins to be launched wg.Wait() + + // update readiness snapshot + c.updateLocalReadinessSnapshot(plugins) } var ( - MAX_RETRY_COUNT = int32(15) - RETRY_WAIT_INTERVAL_MAP = map[int32]time.Duration{ - 0: 0 * time.Second, - 3: 30 * time.Second, - 8: 60 * time.Second, - MAX_RETRY_COUNT: 240 * time.Second, - // stop + 0: 0 * time.Second, + 1: 15 * time.Second, + 2: 30 * time.Second, + 3: 60 * time.Second, + 4: 120 * time.Second, } )
diff --git a/internal/core/plugin_manager/readiness.go b/internal/core/plugin_manager/readiness.go
new file mode 100644
index 000000000..5ae03cb99
--- /dev/null
+++ b/internal/core/plugin_manager/readiness.go
@@ -0,0 +1,43 @@
+package plugin_manager
+
+import (
+	controlpanel "github.com/langgenius/dify-plugin-daemon/internal/core/control_panel"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+)
+
+// ReadinessReport is the outcome of a readiness probe: the verdict, a short
+// machine-readable reason, and the local plugin snapshot when one was used.
+type ReadinessReport struct {
+	Ready   bool
+	Reason  string
+	Plugins *controlpanel.LocalReadinessSnapshot
+}
+
+// Readiness reports whether the manager can serve traffic. A nil manager or
+// missing config is never ready, non-local platforms are always ready, and
+// on the local platform the verdict comes from the control panel snapshot.
+func (p *PluginManager) Readiness() ReadinessReport {
+	switch {
+	case p == nil || p.config == nil:
+		return ReadinessReport{Ready: false, Reason: "manager_not_initialized"}
+	case p.config.Platform != app.PLATFORM_LOCAL:
+		return ReadinessReport{Ready: true, Reason: "non_local_platform"}
+	}
+
+	snapshot, ok := p.controlPanel.LocalReadiness()
+	if !ok {
+		// the control panel reported that it has no snapshot to return
+		return ReadinessReport{Ready: false, Reason: "plugin_monitor_not_ready"}
+	}
+
+	report := ReadinessReport{Ready: snapshot.Ready, Plugins: &snapshot}
+	switch {
+	case snapshot.Ready:
+		report.Reason = "plugins_ready"
+	case len(snapshot.Failed) > 0:
+		report.Reason = "plugins_failed"
+	default:
+		report.Reason = "plugins_starting"
+	}
+	return report
+}
diff --git a/internal/server/controllers/ready_check.go b/internal/server/controllers/ready_check.go
new file mode 100644
index 000000000..fc2a12c8e
--- /dev/null
+++ b/internal/server/controllers/ready_check.go
@@ -0,0 +1,32 @@
+package controllers
+
+import (
+	"net/http"
+
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+)
+
+// ReadyCheck returns a handler that answers 200 when the plugin manager
+// reports ready and 503 with the snapshot as "detail" otherwise.
+func ReadyCheck() gin.HandlerFunc {
+	return func(c *gin.Context) {
+		report := plugin_manager.Manager().Readiness()
+
+		if !report.Ready {
+			c.JSON(http.StatusServiceUnavailable, gin.H{
+				"status": "unready",
+				"ready":  false,
+				"reason": report.Reason,
+				"detail": report.Plugins,
+			})
+			return
+		}
+
+		c.JSON(http.StatusOK, gin.H{
+			"status": "ok",
+			"ready":  true,
+			"reason": report.Reason,
+		})
+	}
+}
diff --git a/internal/server/http_server.go b/internal/server/http_server.go index f28bbfcc1..484130e1a 100644 --- a/internal/server/http_server.go +++ b/internal/server/http_server.go @@ -29,7 +29,7 @@ engine := gin.New() engine.Use(log.LoggerMiddleware()) } else { engine.Use(log.LoggerMiddlewareWithConfig(log.LoggerConfig{ - SkipPaths: []string{"/health/check"}, + SkipPaths: []string{"/health/check", "/ready/check"}, })) } engine.Use(controllers.CollectActiveRequests()) @@ -37,6 +37,7 @@ engine := gin.New() c.JSON(http.StatusNotFound, gin.H{"code": "not_found", "message": "route not found"}) }) engine.GET("/health/check", controllers.HealthCheck(config)) + engine.GET("/ready/check", controllers.ReadyCheck()) endpointGroup := 
engine.Group("/e") serverlessTransactionGroup := engine.Group("/backwards-invocation") diff --git a/internal/types/app/config.go b/internal/types/app/config.go index 056388722..19fac8fc2 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -107,6 +107,9 @@ type Config struct { // local launching max concurrent PluginLocalLaunchingConcurrent int `envconfig:"PLUGIN_LOCAL_LAUNCHING_CONCURRENT" validate:"required"` + // plugin local max retry count + PluginLocalMaxRetryCount int32 `envconfig:"PLUGIN_LOCAL_MAX_RETRY_COUNT" default:"15"` + // platform like local or aws lambda Platform PlatformType `envconfig:"PLATFORM" validate:"required"` diff --git a/internal/types/app/default.go b/internal/types/app/default.go index 3adb08a59..c4346abd8 100644 --- a/internal/types/app/default.go +++ b/internal/types/app/default.go @@ -33,6 +33,7 @@ func (config *Config) SetDefault() { setDefaultString(&config.PluginMediaCachePath, "assets") setDefaultString(&config.PersistenceStoragePath, "persistence") setDefaultInt(&config.PluginLocalLaunchingConcurrent, 2) + setDefaultInt(&config.PluginLocalMaxRetryCount, 15) setDefaultInt(&config.PersistenceStorageMaxSize, 100*1024*1024) setDefaultString(&config.PluginPackageCachePath, "plugin_packages") setDefaultString(&config.PythonInterpreterPath, "/usr/bin/python3")