Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions cmd/slim/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"bytes"
"encoding/json"
"flag"
"io"
"log/slog"
"os"

"github.com/langgenius/dify-plugin-daemon/pkg/slim"
)

func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(io.Discard, nil)))
id := flag.String("id", "", "plugin unique identifier")
action := flag.String("action", "", "plugin access action")
args := flag.String("args", "", "plugin invocation parameters (JSON); if omitted, read from stdin")
configFile := flag.String("config", "", "path to JSON config file (replaces env vars)")
flag.Parse()

if *id == "" || *action == "" {
fatal(slim.NewError(slim.ErrInvalidInput, "usage: slim -id <id> -action <action> [-args '<json>'] [-config <path>]"))
}

argsJSON := *args
if argsJSON == "" {
b, err := io.ReadAll(os.Stdin)
if err != nil {
fatal(slim.NewError(slim.ErrInvalidInput, "failed to read stdin: "+err.Error()))
}
if len(bytes.TrimSpace(b)) == 0 {
fatal(slim.NewError(slim.ErrInvalidInput, "no -args flag and no JSON on stdin"))
}
argsJSON = string(b)
}

ctx, err := slim.NewInvokeContext(*id, *action, argsJSON)
if err != nil {
fatal(err)
}

var cfg *slim.SlimConfig
if *configFile != "" {
cfg, err = slim.LoadConfigFromFile(*configFile)
} else {
cfg, err = slim.LoadConfig()
}
if err != nil {
fatal(err)
}

out := slim.NewOutputWriter(os.Stdout)

switch cfg.Mode {
case slim.ModeLocal:
err = slim.RunLocal(ctx, &cfg.Local, out)
case slim.ModeRemote:
err = slim.RunRemote(ctx, &cfg.Remote, out)
default:
err = slim.NewError(slim.ErrUnknownMode, cfg.Mode)
}

if err != nil {
fatal(err)
}
}

func fatal(err error) {
exitCode := slim.ExitPluginError
var errorToMarshal *slim.SlimError

if se, ok := err.(*slim.SlimError); ok {
errorToMarshal = se
exitCode = se.ExitCode()
} else {
// Wrap non-SlimError types to ensure they are marshalled to JSON correctly.
errorToMarshal = slim.NewError(slim.ErrPluginExec, err.Error())
}

b, marshalErr := json.Marshal(errorToMarshal)
if marshalErr != nil {
// This should be practically impossible since SlimError is designed for JSON.
// As a last resort, print a hardcoded error message.
os.Stderr.Write([]byte(`{"code":"INTERNAL_ERROR","message":"failed to marshal error to JSON"}`))
os.Stderr.Write([]byte("\n"))
os.Exit(slim.ExitPluginError)
}

os.Stderr.Write(b)
os.Stderr.Write([]byte("\n"))
os.Exit(exitCode)
}
42 changes: 42 additions & 0 deletions internal/core/local_runtime/constructor_slim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package local_runtime

import (
"sync"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
)

// NewLocalPluginRuntime creates a LocalPluginRuntime with a known working path.
// Unlike ConstructPluginRuntime, it does not compute a checksum-based path,
// which avoids walking the full directory tree (including .venv / .uv-cache).
func NewLocalPluginRuntime(
appConfig *app.Config,
pluginDecoder decoder.PluginDecoder,
manifest plugin_entities.PluginDeclaration,
workingPath string,
) *LocalPluginRuntime {
return &LocalPluginRuntime{
PluginRuntime: plugin_entities.PluginRuntime{
Config: manifest,
State: plugin_entities.PluginRuntimeState{
Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
Verified: manifest.Verified,
WorkingPath: workingPath,
},
},
BasicChecksum: basic_runtime.BasicChecksum{
Decoder: pluginDecoder,
},
scheduleStatus: ScheduleStatusStopped,
defaultPythonInterpreterPath: appConfig.PythonInterpreterPath,
uvPath: appConfig.UvPath,
appConfig: appConfig,
instances: []*PluginInstance{},
instanceLocker: &sync.RWMutex{},
notifiers: []PluginRuntimeNotifier{},
notifierLock: &sync.Mutex{},
}
}
7 changes: 4 additions & 3 deletions internal/server/constants/constants.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package constants

const (
X_PLUGIN_ID = "X-Plugin-ID"
X_API_KEY = "X-Api-Key"
X_ADMIN_API_KEY = "X-Admin-Api-Key"
X_PLUGIN_ID = "X-Plugin-ID"
X_API_KEY = "X-Api-Key"
X_ADMIN_API_KEY = "X-Admin-Api-Key"
PluginUniqueIdentifier = "X-Plugin-Unique-Identifier"

CONTEXT_KEY_PLUGIN_INSTALLATION = "plugin_installation"
CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER = "plugin_unique_identifier"
Expand Down
18 changes: 17 additions & 1 deletion internal/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// server starts a http server and returns a function to stop it
func (app *App) server(config *app.Config) func() {
engine := gin.New()
engine := gin.New()
engine.Use(log.RecoveryMiddleware())
engine.Use(log.TraceMiddleware())
// OpenTelemetry middleware (extracts upstream trace context and starts server spans)
Expand All @@ -42,6 +42,7 @@ engine := gin.New()
serverlessTransactionGroup := engine.Group("/backwards-invocation")
pluginGroup := engine.Group("/plugin/:tenant_id")
pprofGroup := engine.Group("/debug/pprof")
invokeGroup := engine.Group("/v2/invoke")

if config.AdminApiEnabled {
if len(config.AdminApiKey) < 10 {
Expand Down Expand Up @@ -72,6 +73,7 @@ engine := gin.New()
app.serverlessTransactionGroup(serverlessTransactionGroup, config)
app.pluginGroup(pluginGroup, config)
app.pprofGroup(pprofGroup, config)
app.invokeGroup(invokeGroup, config)

srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", config.ServerHost, config.ServerPort),
Expand Down Expand Up @@ -212,3 +214,17 @@ func (app *App) pprofGroup(group *gin.RouterGroup, config *app.Config) {
group.GET("/threadcreate", controllers.PprofThreadcreate)
}
}

func (app *App) invokeGroup(group *gin.RouterGroup, config *app.Config) {
group.Use(CheckingKey(config.ServerKey))
dispatchGroup := group.Group("/dispatch")
dispatchGroup.Use(controllers.CollectActiveDispatchRequests())
dispatchGroup.Use(app.FetchPluginDirect())
dispatchGroup.Use(app.RedirectPluginInvoke())
dispatchGroup.Use(app.InitClusterID())

dispatchGroup.POST("/agent_strategy/invoke",
controllers.InvokeAgentStrategy(config))

app.setupGeneratedRoutes(dispatchGroup, config)
}
24 changes: 23 additions & 1 deletion internal/server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,32 @@ func (app *App) InitClusterID() gin.HandlerFunc {
func (app *App) AdminAPIKey(key string) gin.HandlerFunc {
return func(ctx *gin.Context) {
if ctx.GetHeader(constants.X_ADMIN_API_KEY) != key {
ctx.AbortWithStatusJSON(401, gin.H{"message": "unauthorized"})
ctx.AbortWithStatusJSON(
401,
exception.UnauthorizedError().ToResponse())
return
}
ctx.Next()
}
}

func (app *App) FetchPluginDirect() gin.HandlerFunc {
return func(ctx *gin.Context) {
identifier := ctx.Request.Header.Get(constants.PluginUniqueIdentifier)
if identifier == "" {
ctx.AbortWithStatusJSON(
400,
exception.BadRequestError(errors.New("X-Plugin-Unique-Identifier header is required")).ToResponse())
return
}
pluginID, err := plugin_entities.NewPluginUniqueIdentifier(identifier)
if err != nil {
ctx.AbortWithStatusJSON(400,
exception.UniqueIdentifierError(err).ToResponse(),
)
return
}
ctx.Set(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER, pluginID)
ctx.Next()
}
}
159 changes: 159 additions & 0 deletions pkg/slim/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package slim

import (
"encoding/json"
"os"

"github.com/google/uuid"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

type RequestMeta struct {
TenantID string `json:"tenant_id"`
UserID string `json:"user_id"`
Data json.RawMessage `json:"data"`
}

type LocalConfig struct {
Folder string `json:"folder"`
PythonPath string `json:"python_path"`
UvPath string `json:"uv_path"`
PythonEnvInitTimeout int `json:"python_env_init_timeout"`
MaxExecutionTimeout int `json:"max_execution_timeout"`
PipMirrorURL string `json:"pip_mirror_url"`
PipExtraArgs string `json:"pip_extra_args"`
MarketplaceURL string `json:"marketplace_url"`
}

type RemoteConfig struct {
DaemonAddr string `json:"daemon_addr"`
DaemonKey string `json:"daemon_key"`
}

type InvokeContext struct {
PluginID string
Action string
Request RequestMeta
}

type SlimConfig struct {
Mode string `json:"mode"`
Local LocalConfig `json:"local"`
Remote RemoteConfig `json:"remote"`
}

func NewInvokeContext(id, action, argsJSON string) (*InvokeContext, error) {
var req RequestMeta
if err := json.Unmarshal([]byte(argsJSON), &req); err != nil {
return nil, NewError(ErrInvalidArgsJSON, err.Error())
}
if req.TenantID == "" {
req.TenantID = uuid.Nil.String()
}
return &InvokeContext{
PluginID: id,
Action: action,
Request: req,
}, nil
}

func LoadConfigFromFile(path string) (*SlimConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, NewError(ErrConfigLoad, err.Error())
}

var cfg SlimConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, NewError(ErrConfigLoad, err.Error())
}

if err := fillDefaults(&cfg); err != nil {
return nil, err
}
return &cfg, nil
}

func LoadConfig() (*SlimConfig, error) {
cfg := &SlimConfig{
Mode: env("SLIM_MODE", ModeRemote),
}

switch cfg.Mode {
case ModeLocal:
cfg.Local = LocalConfig{
Folder: env("SLIM_FOLDER", ""),
PythonPath: env("SLIM_PYTHON_PATH", ""),
UvPath: env("SLIM_UV_PATH", ""),
PythonEnvInitTimeout: envInt("SLIM_PYTHON_ENV_INIT_TIMEOUT", 0),
MaxExecutionTimeout: envInt("SLIM_MAX_EXECUTION_TIMEOUT", 0),
PipMirrorURL: env("SLIM_PIP_MIRROR_URL", ""),
PipExtraArgs: env("SLIM_PIP_EXTRA_ARGS", ""),
MarketplaceURL: env("SLIM_MARKETPLACE_URL", ""),
}
case ModeRemote:
cfg.Remote = RemoteConfig{
DaemonAddr: env("SLIM_DAEMON_ADDR", ""),
DaemonKey: env("SLIM_DAEMON_KEY", ""),
}
}

if err := fillDefaults(cfg); err != nil {
return nil, err
}
return cfg, nil
}

func (lc *LocalConfig) toAppConfig() *app.Config {
return &app.Config{
PluginWorkingPath: lc.Folder,
PluginInstalledPath: lc.Folder,
PluginPackageCachePath: lc.Folder,
PythonInterpreterPath: lc.PythonPath,
UvPath: lc.UvPath,
PythonEnvInitTimeout: lc.PythonEnvInitTimeout,
PluginMaxExecutionTimeout: lc.MaxExecutionTimeout,
PipMirrorUrl: lc.PipMirrorURL,
PipExtraArgs: lc.PipExtraArgs,
PipPreferBinary: true,
PipVerbose: true,
PluginRuntimeBufferSize: 1024,
PluginRuntimeMaxBufferSize: 5242880,
Platform: app.PLATFORM_LOCAL,
}
}

func fillDefaults(cfg *SlimConfig) error {
if cfg.Mode == "" {
cfg.Mode = ModeRemote
}

if cfg.Mode == ModeLocal {
if cfg.Local.Folder == "" {
return NewError(ErrConfigInvalid, "local.folder is required")
}
if cfg.Local.PythonPath == "" {
cfg.Local.PythonPath = "python3"
}
if cfg.Local.PythonEnvInitTimeout == 0 {
cfg.Local.PythonEnvInitTimeout = 120
}
if cfg.Local.MaxExecutionTimeout == 0 {
cfg.Local.MaxExecutionTimeout = 600
}
if cfg.Local.MarketplaceURL == "" {
cfg.Local.MarketplaceURL = "https://marketplace.dify.ai"
}
}

if cfg.Mode == ModeRemote {
if cfg.Remote.DaemonAddr == "" {
return NewError(ErrConfigInvalid, "remote.daemon_addr is required")
}
if cfg.Remote.DaemonKey == "" {
return NewError(ErrConfigInvalid, "remote.daemon_key is required")
}
}

return nil
}
Loading
Loading