Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/debugging_runtime/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (r *RemotePluginRuntime) Stopped() bool {
return !r.alive
}

func (r *RemotePluginRuntime) Stop() {
func (r *RemotePluginRuntime) Stop(graceful bool) {
r.alive = false
if r.conn == nil {
return
Expand Down
6 changes: 3 additions & 3 deletions internal/core/plugin_manager/debugging_runtime/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestAcceptConnection(t *testing.T) {
}

gotConnection = true
runtime.Stop()
runtime.Stop(false)
}
}()

Expand Down Expand Up @@ -282,7 +282,7 @@ func TestNoHandleShakeIn10Seconds(t *testing.T) {
return
}

runtime.Stop()
runtime.Stop(false)
}
}()

Expand Down Expand Up @@ -346,7 +346,7 @@ func TestIncorrectHandshake(t *testing.T) {
return
}

runtime.Stop()
runtime.Stop(false)
}
}()

Expand Down
4 changes: 2 additions & 2 deletions internal/core/plugin_manager/install_to_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *PluginManager) InstallToLocal(
Event: PluginInstallEventInfo,
Data: "Timeout",
})
runtime.Stop()
runtime.Stop(false)
return
case err := <-errChan:
if err != nil {
Expand All @@ -66,7 +66,7 @@ func (p *PluginManager) InstallToLocal(
Event: PluginInstallEventError,
Data: err.Error(),
})
runtime.Stop()
runtime.Stop(false)
return
}
case <-launchedChan:
Expand Down
17 changes: 13 additions & 4 deletions internal/core/plugin_manager/local_runtime/io.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package local_runtime

import (
"sync/atomic"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] {
func (r *LocalPluginRuntime) Listen(sessionId string) *entities.Broadcast[plugin_entities.SessionMessage] {
listener := entities.NewBroadcast[plugin_entities.SessionMessage]()

listener.OnClose(func() {
removeStdioHandlerListener(r.ioIdentity, session_id)
removeStdioHandlerListener(r.ioIdentity, sessionId)
// decrease the active sessions
atomic.AddInt32(&r.activeSessions, -1)
})
setupStdioEventListener(r.ioIdentity, session_id, func(b []byte) {

// increase the active sessions
atomic.AddInt32(&r.activeSessions, 1)

setupStdioEventListener(r.ioIdentity, sessionId, func(b []byte) {
// unmarshal the session message
data, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](b)
if err != nil {
Expand All @@ -26,6 +35,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi
return listener
}

func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
func (r *LocalPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) {
writeToStdioHandler(r.ioIdentity, append(data, '\n'))
}
47 changes: 39 additions & 8 deletions internal/core/plugin_manager/local_runtime/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
Expand Down Expand Up @@ -218,13 +219,43 @@ func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
}

// Stop stops the plugin
func (r *LocalPluginRuntime) Stop() {
// inherit from PluginRuntime
r.PluginRuntime.Stop()

// get stdio
stdio := getStdioHandler(r.ioIdentity)
if stdio != nil {
stdio.Stop()
func (r *LocalPluginRuntime) Stop(graceful bool) {
// inherit from PluginRuntime, set status to stopped
r.PluginRuntime.Stop(graceful)

if graceful {
routine.Submit(map[string]string{
"module": "plugin_manager",
"type": "local",
"function": "Stop",
}, func() {
// stop the plugin by closing stdio
// normally, Runtime.run will blocks a goroutine by listening on the stdio
// plugin were considered as running until it's stdio is closed, just by closing the stdio, we can reach the goal
// but to be more robust, we want a graceful stop.
// After calling Stop, new requests will be routed to new runtime
defer func() {
stdio := getStdioHandler(r.ioIdentity)
if stdio != nil {
stdio.Stop()
}
}()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
timeout := time.After(360 * time.Second)

for {
select {
case <-ticker.C:
if r.activeSessions == 0 {
return
}
case <-timeout:
return
}
}
})
}

}
2 changes: 2 additions & 0 deletions internal/core/plugin_manager/local_runtime/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type LocalPluginRuntime struct {
waitStoppedChan []chan bool

isNotFirstStart bool

activeSessions int32
}

type LocalPluginRuntimeConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/runtime_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (p *PluginManager) fullDuplexLifecycle(
errChan chan error,
) {
// stop plugin when the plugin reaches the end of its lifetime
defer r.Stop()
defer r.Stop(false)

// cleanup plugin runtime state and working directory
defer r.Cleanup()
Expand Down
7 changes: 5 additions & 2 deletions internal/core/plugin_manager/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (

// UninstallFromLocal uninstalls a plugin from local storage
// once deleted, local runtime will automatically shutdown and exit after several time
func (p *PluginManager) UninstallFromLocal(identity plugin_entities.PluginUniqueIdentifier) error {
func (p *PluginManager) UninstallFromLocal(
identity plugin_entities.PluginUniqueIdentifier,
graceful bool,
) error {
if err := p.installedBucket.Delete(identity); err != nil {
return err
}
Expand All @@ -16,6 +19,6 @@ func (p *PluginManager) UninstallFromLocal(identity plugin_entities.PluginUnique
// no runtime to shutdown, already uninstalled
return nil
}
runtime.Stop()
runtime.Stop(graceful)
return nil
}
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *PluginManager) removeUninstalledLocalPlugins() {
}

if !exists {
runtime.Stop()
runtime.Stop(false)
}

return true
Expand Down
7 changes: 5 additions & 2 deletions internal/service/install_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func UpgradePlugin(
) {
err = manager.UninstallFromLocal(
plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
true,
)
if err != nil {
return err
Expand Down Expand Up @@ -615,9 +616,11 @@ func UninstallPlugin(
if deleteResponse.Installation.RuntimeType == string(
plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
) {
err = manager.UninstallFromLocal(pluginUniqueIdentifier)
err = manager.UninstallFromLocal(pluginUniqueIdentifier, false)
if err != nil {
return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
return exception.InternalServerError(
fmt.Errorf("failed to uninstall plugin: %s", err.Error()),
).ToResponse()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/entities/plugin_entities/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type (

PluginClusterLifetime interface {
// stop the plugin
Stop()
Stop(graceful bool)
// add a function to be called when the plugin stops
OnStop(func())
// trigger the stop event
Expand Down Expand Up @@ -114,7 +114,7 @@ func (r *PluginRuntime) Stopped() bool {
return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
}

func (r *PluginRuntime) Stop() {
func (r *PluginRuntime) Stop(graceful bool) {
r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
}

Expand Down