From 16864851f3d3841a758806f85a82af69bc8b96f5 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Thu, 20 Mar 2025 14:14:01 +0800 Subject: [PATCH] feat: graceful-shutdown --- .../plugin_manager/debugging_runtime/run.go | 2 +- .../debugging_runtime/server_test.go | 6 +-- .../core/plugin_manager/install_to_local.go | 4 +- .../core/plugin_manager/local_runtime/io.go | 17 +++++-- .../core/plugin_manager/local_runtime/run.go | 47 +++++++++++++++---- .../core/plugin_manager/local_runtime/type.go | 2 + .../core/plugin_manager/runtime_lifetime.go | 2 +- internal/core/plugin_manager/uninstall.go | 7 ++- internal/core/plugin_manager/watcher.go | 2 +- internal/service/install_plugin.go | 7 ++- pkg/entities/plugin_entities/runtime.go | 4 +- 11 files changed, 74 insertions(+), 26 deletions(-) diff --git a/internal/core/plugin_manager/debugging_runtime/run.go b/internal/core/plugin_manager/debugging_runtime/run.go index 352a92938..881dffdc7 100644 --- a/internal/core/plugin_manager/debugging_runtime/run.go +++ b/internal/core/plugin_manager/debugging_runtime/run.go @@ -17,7 +17,7 @@ func (r *RemotePluginRuntime) Stopped() bool { return !r.alive } -func (r *RemotePluginRuntime) Stop() { +func (r *RemotePluginRuntime) Stop(graceful bool) { r.alive = false if r.conn == nil { return diff --git a/internal/core/plugin_manager/debugging_runtime/server_test.go b/internal/core/plugin_manager/debugging_runtime/server_test.go index c12ef556f..9111cb9db 100644 --- a/internal/core/plugin_manager/debugging_runtime/server_test.go +++ b/internal/core/plugin_manager/debugging_runtime/server_test.go @@ -135,7 +135,7 @@ func TestAcceptConnection(t *testing.T) { } gotConnection = true - runtime.Stop() + runtime.Stop(false) } }() @@ -282,7 +282,7 @@ func TestNoHandleShakeIn10Seconds(t *testing.T) { return } - runtime.Stop() + runtime.Stop(false) } }() @@ -346,7 +346,7 @@ func TestIncorrectHandshake(t *testing.T) { return } - runtime.Stop() + runtime.Stop(false) } }() diff --git a/internal/core/plugin_manager/install_to_local.go b/internal/core/plugin_manager/install_to_local.go index 5b1908e09..3a425a7b1 100644 --- a/internal/core/plugin_manager/install_to_local.go +++ b/internal/core/plugin_manager/install_to_local.go @@ -57,7 +57,7 @@ func (p *PluginManager) InstallToLocal( Event: PluginInstallEventInfo, Data: "Timeout", }) - runtime.Stop() + runtime.Stop(false) return case err := <-errChan: if err != nil { @@ -66,7 +66,7 @@ func (p *PluginManager) InstallToLocal( Event: PluginInstallEventError, Data: err.Error(), }) - runtime.Stop() + runtime.Stop(false) return } case <-launchedChan: diff --git a/internal/core/plugin_manager/local_runtime/io.go b/internal/core/plugin_manager/local_runtime/io.go index 384e1509b..cbf98ba73 100644 --- a/internal/core/plugin_manager/local_runtime/io.go +++ b/internal/core/plugin_manager/local_runtime/io.go @@ -1,6 +1,8 @@ package local_runtime import ( + "sync/atomic" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/utils/log" "github.com/langgenius/dify-plugin-daemon/internal/utils/parser" @@ -8,12 +10,19 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" ) -func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] { +func (r *LocalPluginRuntime) Listen(sessionId string) *entities.Broadcast[plugin_entities.SessionMessage] { listener := entities.NewBroadcast[plugin_entities.SessionMessage]() + listener.OnClose(func() { - removeStdioHandlerListener(r.ioIdentity, session_id) + removeStdioHandlerListener(r.ioIdentity, sessionId) + // decrease the active sessions + atomic.AddInt32(&r.activeSessions, -1) }) - setupStdioEventListener(r.ioIdentity, session_id, func(b []byte) { + + // increase the active sessions + atomic.AddInt32(&r.activeSessions, 1) + + setupStdioEventListener(r.ioIdentity, sessionId, func(b []byte) { // unmarshal the session message data, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](b) if err != nil { @@ -26,6 +35,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi return listener } -func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) { +func (r *LocalPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) { writeToStdioHandler(r.ioIdentity, append(data, '\n')) } diff --git a/internal/core/plugin_manager/local_runtime/run.go b/internal/core/plugin_manager/local_runtime/run.go index 56bc3148e..c33e62ead 100644 --- a/internal/core/plugin_manager/local_runtime/run.go +++ b/internal/core/plugin_manager/local_runtime/run.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "sync" + "time" "github.com/langgenius/dify-plugin-daemon/internal/utils/log" "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" @@ -218,13 +219,43 @@ func (r *LocalPluginRuntime) WaitStopped() <-chan bool { } // Stop stops the plugin -func (r *LocalPluginRuntime) Stop() { - // inherit from PluginRuntime - r.PluginRuntime.Stop() - - // get stdio - stdio := getStdioHandler(r.ioIdentity) - if stdio != nil { - stdio.Stop() +func (r *LocalPluginRuntime) Stop(graceful bool) { + // inherit from PluginRuntime, set status to stopped + r.PluginRuntime.Stop(graceful) + + if graceful { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "type": "local", + "function": "Stop", + }, func() { + // stop the plugin by closing stdio + // normally, Runtime.run will blocks a goroutine by listening on the stdio + // plugin were considered as running until it's stdio is closed, just by closing the stdio, we can reach the goal + // but to be more robust, we want a graceful stop. + // After calling Stop, new requests will be routed to new runtime + defer func() { + stdio := getStdioHandler(r.ioIdentity) + if stdio != nil { + stdio.Stop() + } + }() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + timeout := time.After(360 * time.Second) + + for { + select { + case <-ticker.C: + if r.activeSessions == 0 { + return + } + case <-timeout: + return + } + } + }) } + } diff --git a/internal/core/plugin_manager/local_runtime/type.go b/internal/core/plugin_manager/local_runtime/type.go index 51acb6f1c..956634e98 100644 --- a/internal/core/plugin_manager/local_runtime/type.go +++ b/internal/core/plugin_manager/local_runtime/type.go @@ -38,6 +38,8 @@ type LocalPluginRuntime struct { waitStoppedChan []chan bool isNotFirstStart bool + + activeSessions int32 } type LocalPluginRuntimeConfig struct { diff --git a/internal/core/plugin_manager/runtime_lifetime.go b/internal/core/plugin_manager/runtime_lifetime.go index 6858dcebc..444262ee2 100644 --- a/internal/core/plugin_manager/runtime_lifetime.go +++ b/internal/core/plugin_manager/runtime_lifetime.go @@ -19,7 +19,7 @@ func (p *PluginManager) fullDuplexLifecycle( errChan chan error, ) { // stop plugin when the plugin reaches the end of its lifetime - defer r.Stop() + defer r.Stop(false) // cleanup plugin runtime state and working directory defer r.Cleanup() diff --git a/internal/core/plugin_manager/uninstall.go b/internal/core/plugin_manager/uninstall.go index 729275acb..d826f5d59 100644 --- a/internal/core/plugin_manager/uninstall.go +++ b/internal/core/plugin_manager/uninstall.go @@ -6,7 +6,10 @@ import ( // UninstallFromLocal uninstalls a plugin from local storage // once deleted, local runtime will automatically shutdown and exit after several time -func (p *PluginManager) UninstallFromLocal(identity plugin_entities.PluginUniqueIdentifier) error { +func (p *PluginManager) UninstallFromLocal( + identity plugin_entities.PluginUniqueIdentifier, + graceful bool, +) error { if err := p.installedBucket.Delete(identity); err != nil { return err } @@ -16,6 +19,6 @@ func (p *PluginManager) UninstallFromLocal(identity plugin_entities.PluginUnique // no runtime to shutdown, already uninstalled return nil } - runtime.Stop() + runtime.Stop(graceful) return nil } diff --git a/internal/core/plugin_manager/watcher.go b/internal/core/plugin_manager/watcher.go index 5e1900c2e..b3ae86e1a 100644 --- a/internal/core/plugin_manager/watcher.go +++ b/internal/core/plugin_manager/watcher.go @@ -114,7 +114,7 @@ func (p *PluginManager) removeUninstalledLocalPlugins() { } if !exists { - runtime.Stop() + runtime.Stop(false) } return true diff --git a/internal/service/install_plugin.go b/internal/service/install_plugin.go index d54711a96..7a45159d6 100644 --- a/internal/service/install_plugin.go +++ b/internal/service/install_plugin.go @@ -418,6 +418,7 @@ func UpgradePlugin( ) { err = manager.UninstallFromLocal( plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier), + true, ) if err != nil { return err @@ -615,9 +616,11 @@ func UninstallPlugin( if deleteResponse.Installation.RuntimeType == string( plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL, ) { - err = manager.UninstallFromLocal(pluginUniqueIdentifier) + err = manager.UninstallFromLocal(pluginUniqueIdentifier, false) if err != nil { - return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse() + return exception.InternalServerError( + fmt.Errorf("failed to uninstall plugin: %s", err.Error()), + ).ToResponse() } } } diff --git a/pkg/entities/plugin_entities/runtime.go b/pkg/entities/plugin_entities/runtime.go index a898eee40..264b27b23 100644 --- a/pkg/entities/plugin_entities/runtime.go +++ b/pkg/entities/plugin_entities/runtime.go @@ -83,7 +83,7 @@ type ( PluginClusterLifetime interface { // stop the plugin - Stop() + Stop(graceful bool) // add a function to be called when the plugin stops OnStop(func()) // trigger the stop event @@ -114,7 +114,7 @@ func (r *PluginRuntime) Stopped() bool { return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED } -func (r *PluginRuntime) Stop() { +func (r *PluginRuntime) Stop(graceful bool) { r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED }