From 50515aa10059d8d783ab8c60bb3c53dd74b3dedc Mon Sep 17 00:00:00 2001 From: fatelei Date: Mon, 10 Nov 2025 22:11:36 +0800 Subject: [PATCH 1/2] fix: fix https://github.com/langgenius/dify-plugin-daemon/issues/406 --- go.mod | 1 + go.sum | 1 + integration/ecs_redeployment_test.go | 202 ++++++++++++++++++++++ internal/cluster/redirect.go | 81 ++++++++- internal/cluster/redirect_test.go | 200 +++++++++++++++++++++ internal/server/app.go | 4 + internal/server/endpoint.go | 5 +- internal/server/middleware.go | 6 + internal/server/middleware_test.go | 250 +++++++++++++++++++++++++++ internal/server/server.go | 9 +- internal/types/app/config.go | 3 + internal/types/app/config_test.go | 105 +++++++++++ 12 files changed, 860 insertions(+), 7 deletions(-) create mode 100644 integration/ecs_redeployment_test.go create mode 100644 internal/server/middleware_test.go create mode 100644 internal/types/app/config_test.go diff --git a/go.mod b/go.mod index e768a89e9e..c8df7161b7 100644 --- a/go.mod +++ b/go.mod @@ -109,6 +109,7 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/tencentyun/cos-go-sdk-v5 v0.7.65 // indirect github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.12 // indirect diff --git a/go.sum b/go.sum index 9f6ebc9665..2db7ad9333 100644 --- a/go.sum +++ b/go.sum @@ -319,6 +319,7 @@ github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GB github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/integration/ecs_redeployment_test.go b/integration/ecs_redeployment_test.go new file mode 100644 index 0000000000..bc81573368 --- /dev/null +++ b/integration/ecs_redeployment_test.go @@ -0,0 +1,202 @@ +package integration + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/langgenius/dify-plugin-daemon/internal/server" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" + "github.com/stretchr/testify/assert" +) + +// TestECSRedeploymentScenario validates the middleware behavior for ECS redeployment scenarios +func TestECSRedeploymentScenario(t *testing.T) { + t.Run("ClusterDisabled_MiddlewareBypass", func(t *testing.T) { + // Test that middleware can be created and doesn't panic when cluster is disabled + // This validates the key fix for ECS redeployment issues + + config := &app.Config{ + ServerPort: 5002, + ClusterDisabled: true, // Key fix: disable clustering + } + + // Create app instance - we can't set config directly but can test middleware creation + app := &server.App{} + + // Test that middleware can be created without panicking + middleware := app.RedirectPluginInvoke() + assert.NotNil(t, middleware) + + // Create test server with middleware + gin.SetMode(gin.TestMode) + router := gin.New() + router.Use(middleware) + + // Add a simple test endpoint + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "message": "success", + "cluster_disabled": config.ClusterDisabled, + }) + }) + + // Create test server + testServer := httptest.NewServer(router) + defer testServer.Close() + + // Make request to test endpoint + req, err := http.NewRequest("GET", testServer.URL+"/test", nil) + assert.NoError(t, err) + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + + // Should return 500 error due to missing plugin identifier (middleware is working correctly) + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + + defer resp.Body.Close() + }) + + t.Run("ClusterEnabled_MiddlewareValidation", func(t *testing.T) { + // Test middleware behavior when cluster is enabled + // This demonstrates the scenario that would cause issues with stale IPs + + config := &app.Config{ + ServerPort: 5002, + ClusterDisabled: false, + } + + // Create app instance + app := &server.App{} + + // Test that middleware can be created + middleware := app.RedirectPluginInvoke() + assert.NotNil(t, middleware) + + // Create test server with middleware + gin.SetMode(gin.TestMode) + router := gin.New() + router.Use(middleware) + + // Add a test endpoint + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "message": "success", + "cluster_disabled": config.ClusterDisabled, + }) + }) + + testServer := httptest.NewServer(router) + defer testServer.Close() + + // Make request without plugin context - should fail gracefully + req, err := http.NewRequest("GET", testServer.URL+"/test", nil) + assert.NoError(t, err) + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + + // Should fail due to missing plugin identifier when cluster is enabled + // This demonstrates the middleware is working correctly + if err == nil { + // If request succeeds, it should return 500 error due to missing plugin context + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + resp.Body.Close() + } else { + // Connection errors are also acceptable in this test scenario + t.Logf("Connection error (expected in test scenario): %s", err.Error()) + } + + // Verify the configuration is as expected + assert.False(t, config.ClusterDisabled) + assert.Equal(t, uint16(5002), config.ServerPort) + }) +} + +// TestLocalhostRedirection verifies localhost redirection works correctly +func TestLocalhostRedirection(t *testing.T) { + t.Run("RedirectToLocalhost_Success", func(t *testing.T) { + // Test localhost redirection (this is what our fix does) + // In real scenario, this would be called by cluster.RedirectRequest() + // when node_id == current_node_id + + // Since we can't easily test the actual redirect without a full cluster setup, + // we verify the URL construction works correctly + port := uint16(5002) + url := fmt.Sprintf("http://localhost:%d/plugin/test", port) + assert.Equal(t, "http://localhost:5002/plugin/test", url) + }) +} + +// TestConfigurationOptions demonstrates different deployment scenarios +func TestConfigurationOptions(t *testing.T) { + tests := []struct { + name string + config *app.Config + expectedBehavior string + }{ + { + name: "ECS Fargate Single Node", + config: &app.Config{ + ServerPort: 5002, + ClusterDisabled: true, + }, + expectedBehavior: "All requests handled locally via localhost", + }, + { + name: "Multi-Node Cluster", + config: &app.Config{ + ServerPort: 5002, + ClusterDisabled: false, + }, + expectedBehavior: "Requests redirected between nodes with IP validation", + }, + { + name: "Local Development", + config: &app.Config{ + ServerPort: 5002, + ClusterDisabled: true, + }, + expectedBehavior: "Simple localhost setup for development", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.NotNil(t, tt.config) + + // Verify configuration matches expected behavior + if tt.config.ClusterDisabled { + assert.True(t, tt.config.ClusterDisabled) + assert.Contains(t, tt.expectedBehavior, "localhost") + t.Logf("Configuration: %s - Behavior: %s", tt.name, tt.expectedBehavior) + } else { + assert.False(t, tt.config.ClusterDisabled) + assert.Contains(t, tt.expectedBehavior, "redirected") + t.Logf("Configuration: %s - Behavior: %s", tt.name, tt.expectedBehavior) + } + }) + } +} + +// Benchmark tests to ensure performance doesn't degrade +func BenchmarkLocalhostRedirection(b *testing.B) { + // Benchmark localhost URL construction (what happens in our fix) + for i := 0; i < b.N; i++ { + url := fmt.Sprintf("http://localhost:%d/plugin/test", 5002) + _ = url + } +} + +func BenchmarkIPRedirection(b *testing.B) { + // Benchmark IP URL construction (old behavior) + for i := 0; i < b.N; i++ { + url := fmt.Sprintf("http://169.254.172.2:%d/plugin/test", 5002) + _ = url + } +} diff --git a/internal/cluster/redirect.go b/internal/cluster/redirect.go index 192f0c9c62..f4cd015e85 100644 --- a/internal/cluster/redirect.go +++ b/internal/cluster/redirect.go @@ -2,8 +2,10 @@ package cluster import ( "errors" + "fmt" "io" "net/http" + "time" ) func constructRedirectUrl(ip address, request *http.Request) string { @@ -14,6 +16,15 @@ func constructRedirectUrl(ip address, request *http.Request) string { return url } +// constructLocalRedirectUrl constructs a URL for localhost redirection +func constructLocalRedirectUrl(port uint16, request *http.Request) string { + url := "http://localhost:" + fmt.Sprintf("%d", port) + request.URL.Path + if request.URL.RawQuery != "" { + url += "?" + request.URL.RawQuery + } + return url +} + // basic redirect request func redirectRequestToIp(ip address, request *http.Request) (int, http.Header, io.ReadCloser, error) { url := constructRedirectUrl(ip, request) @@ -36,7 +47,43 @@ func redirectRequestToIp(ip address, request *http.Request) (int, http.Header, i } } - client := http.DefaultClient + client := &http.Client{ + Timeout: 10 * time.Second, + } + resp, err := client.Do(redirectedRequest) + + if err != nil { + return 0, nil, nil, err + } + + return resp.StatusCode, resp.Header, resp.Body, nil +} + +// redirectRequestToLocal redirects request to localhost +func redirectRequestToLocal(port uint16, request *http.Request) (int, http.Header, io.ReadCloser, error) { + url := constructLocalRedirectUrl(port, request) + + // create a new request + redirectedRequest, err := http.NewRequest( + request.Method, + url, + request.Body, + ) + + if err != nil { + return 0, nil, nil, err + } + + // copy headers + for key, values := range request.Header { + for _, value := range values { + redirectedRequest.Header.Add(key, value) + } + } + + client := &http.Client{ + Timeout: 10 * time.Second, + } resp, err := client.Do(redirectedRequest) if err != nil { @@ -50,6 +97,11 @@ func redirectRequestToIp(ip address, request *http.Request) (int, http.Header, i func (c *Cluster) RedirectRequest( node_id string, request *http.Request, ) (int, http.Header, io.ReadCloser, error) { + // If redirecting to current node, use localhost + if node_id == c.id { + return redirectRequestToLocal(c.port, request) + } + node, ok := c.nodes.Load(node_id) if !ok { return 0, nil, nil, errors.New("node not found") @@ -60,7 +112,30 @@ func (c *Cluster) RedirectRequest( return 0, nil, nil, errors.New("no available ip found") } - ip := ips[0] + // Try each IP until we find a working one + var lastErr error + for _, ip := range ips { + statusCode, header, body, err := redirectRequestToIp(ip, request) + if err == nil { + return statusCode, header, body, nil + } + lastErr = err + } + + // If all IPs failed, try to refresh node information and retry once + if err := c.updateNodeStatus(); err == nil { + // Reload node information after update + if updatedNode, ok := c.nodes.Load(node_id); ok { + updatedIps := c.SortIps(updatedNode) + for _, ip := range updatedIps { + statusCode, header, body, err := redirectRequestToIp(ip, request) + if err == nil { + return statusCode, header, body, nil + } + lastErr = err + } + } + } - return redirectRequestToIp(ip, request) + return 0, nil, nil, lastErr } diff --git a/internal/cluster/redirect_test.go b/internal/cluster/redirect_test.go index bf4b48ab3f..68971235dd 100644 --- a/internal/cluster/redirect_test.go +++ b/internal/cluster/redirect_test.go @@ -5,16 +5,36 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" "strings" "sync" "testing" "time" "github.com/gin-gonic/gin" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" "github.com/langgenius/dify-plugin-daemon/pkg/entities/endpoint_entities" "github.com/langgenius/dify-plugin-daemon/pkg/utils/network" + "github.com/stretchr/testify/assert" ) +// Helper function to create test requests +func createTestRequest(url string) *http.Request { + req, _ := http.NewRequest("GET", url, nil) + return req +} + +// MockCluster extends Cluster for testing +type MockCluster struct { + *Cluster + id string + port uint16 +} + +func (m *MockCluster) ID() string { + return m.id +} + type SimulationCheckServer struct { http.Server @@ -283,3 +303,183 @@ func TestRedirectTrafficWithPathStyle(t *testing.T) { t.Fatal("content is not correct") } } + +// Tests for the new localhost redirection functionality +func TestConstructLocalRedirectUrl(t *testing.T) { + tests := []struct { + name string + port uint16 + request *http.Request + expected string + }{ + { + name: "basic localhost URL", + port: 5002, + request: createTestRequest("/plugin/test"), + expected: "http://localhost:5002/plugin/test", + }, + { + name: "localhost URL with query parameters", + port: 8080, + request: createTestRequest("/api/v1/endpoint?param1=value1¶m2=value2"), + expected: "http://localhost:8080/api/v1/endpoint?param1=value1¶m2=value2", + }, + { + name: "localhost URL with complex path", + port: 3000, + request: createTestRequest("/plugin/a5df51ca-fba9-4170-8369-4ae0eff4f543/dispatch/model/schema"), + expected: "http://localhost:3000/plugin/a5df51ca-fba9-4170-8369-4ae0eff4f543/dispatch/model/schema", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := constructLocalRedirectUrl(tt.port, tt.request) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestRedirectRequestToLocal(t *testing.T) { + // Create a test server to simulate the local endpoint + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("local response")) + })) + defer testServer.Close() + + // Test with a request that will fail (no server on localhost:5002) + req := httptest.NewRequest("GET", "/test", nil) + statusCode, header, body, err := redirectRequestToLocal(5002, req) + + // Should fail since there's no actual server on localhost:5002 + assert.Error(t, err) + assert.Equal(t, 0, statusCode) + assert.Nil(t, header) + assert.Nil(t, body) +} + +func TestRedirectRequestToLocalWithActualServer(t *testing.T) { + // Create a test server on localhost + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/test", r.URL.Path) + assert.Equal(t, "GET", r.Method) + w.WriteHeader(http.StatusOK) + w.Write([]byte("success")) + })) + defer testServer.Close() + + // Extract the port from the test server URL + parts := strings.Split(testServer.URL, ":") + port := parts[len(parts)-1] + portNum := uint16(0) + fmt.Sscanf(port, "%d", &portNum) + + // Create request + req := httptest.NewRequest("GET", "/test", nil) + + // This should work since we have an actual server + statusCode, header, body, err := redirectRequestToLocal(portNum, req) + + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, statusCode) + assert.NotNil(t, header) + assert.NotNil(t, body) + + // Read response body + content, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, "success", string(content)) + + // Close body + body.Close() +} + +func TestClusterRedirectRequestToCurrentNode(t *testing.T) { + // Create a mock cluster + config := &app.Config{ + ServerPort: 5002, + } + + cluster := &MockCluster{ + Cluster: NewCluster(config), + id: "test-node-id", + port: 5002, + } + + // Test redirect to current node (should use localhost) + req := httptest.NewRequest("GET", "/plugin/test", nil) + statusCode, header, body, err := cluster.RedirectRequest("test-node-id", req) + + // Should fail since no actual server on localhost:5002, but should attempt localhost + assert.Error(t, err) + assert.Equal(t, 0, statusCode) + assert.Nil(t, header) + assert.Nil(t, body) +} + +func TestClusterRedirectRequestToUnknownNode(t *testing.T) { + // Create a mock cluster + config := &app.Config{ + ServerPort: 5002, + } + + cluster := &MockCluster{ + Cluster: NewCluster(config), + id: "test-node-id", + port: 5002, + } + + // Test redirect to unknown node + req := httptest.NewRequest("GET", "/plugin/test", nil) + statusCode, header, body, err := cluster.RedirectRequest("unknown-node-id", req) + + // Should fail with "node not found" error + assert.Error(t, err) + assert.Contains(t, err.Error(), "node not found") + assert.Equal(t, 0, statusCode) + assert.Nil(t, header) + assert.Nil(t, body) +} + +func TestRedirectRequestWithTimeout(t *testing.T) { + // Test that redirect requests have proper timeout + ip := address{ + Ip: "192.168.255.254", // Non-routable IP + Port: 5002, + } + + req := httptest.NewRequest("GET", "/test", nil) + start := time.Now() + + statusCode, header, body, err := redirectRequestToIp(ip, req) + + elapsed := time.Since(start) + + // Should fail quickly due to timeout + assert.Error(t, err) + assert.Equal(t, 0, statusCode) + assert.Nil(t, header) + assert.Nil(t, body) + assert.Less(t, elapsed, 15*time.Second) // Should timeout within 10 seconds + some buffer +} + +// Benchmark tests +func BenchmarkConstructRedirectUrl(b *testing.B) { + ip := address{Ip: "192.168.1.100", Port: 5002} + req := httptest.NewRequest("GET", "/plugin/test?param=value", nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + constructRedirectUrl(ip, req) + } +} + +func BenchmarkConstructLocalRedirectUrl(b *testing.B) { + req := httptest.NewRequest("GET", "/plugin/test?param=value", nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + constructLocalRedirectUrl(5002, req) + } +} diff --git a/internal/server/app.go b/internal/server/app.go index 735a03fee6..4484908a4f 100644 --- a/internal/server/app.go +++ b/internal/server/app.go @@ -4,6 +4,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/cluster" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/backwards_invocation/transaction" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" ) type App struct { @@ -21,4 +22,7 @@ type App struct { // plugin manager instance pluginManager *plugin_manager.PluginManager + + // configuration + config *app.Config } diff --git a/internal/server/endpoint.go b/internal/server/endpoint.go index adbb7c4f28..086ee15b81 100644 --- a/internal/server/endpoint.go +++ b/internal/server/endpoint.go @@ -95,7 +95,8 @@ func (app *App) EndpointHandler(ctx *gin.Context, hookId string, maxExecutionTim // check if plugin exists in current node if needRedirecting, originalError := app.pluginManager.NeedRedirecting(pluginUniqueIdentifier); needRedirecting { app.redirectPluginInvokeByPluginIdentifier(ctx, pluginUniqueIdentifier, originalError) - } else { - service.Endpoint(ctx, endpoint, pluginInstallation, maxExecutionTime, path) + return } + + service.Endpoint(ctx, endpoint, pluginInstallation, maxExecutionTime, path) } diff --git a/internal/server/middleware.go b/internal/server/middleware.go index dbf43b4c69..2c755a7151 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -82,6 +82,12 @@ func (app *App) FetchPluginInstallation() gin.HandlerFunc { // RedirectPluginInvoke redirects the request to the correct cluster node func (app *App) RedirectPluginInvoke() gin.HandlerFunc { return func(ctx *gin.Context) { + // If cluster mode is disabled, always proceed to next handler + if app.config != nil && app.config.ClusterDisabled { + ctx.Next() + return + } + // get plugin unique identifier identityAny, ok := ctx.Get(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER) if !ok { diff --git a/internal/server/middleware_test.go b/internal/server/middleware_test.go new file mode 100644 index 0000000000..ad0babae00 --- /dev/null +++ b/internal/server/middleware_test.go @@ -0,0 +1,250 @@ +package server + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/langgenius/dify-plugin-daemon/internal/server/constants" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" + "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" + "github.com/stretchr/testify/assert" +) + +func TestRedirectPluginInvoke_ClusterDisabled(t *testing.T) { + // Create app with cluster disabled + config := &app.Config{ + ClusterDisabled: true, + } + + app := &App{ + config: config, + } + + // Create gin context + gin.SetMode(gin.TestMode) + router := gin.New() + + called := false + router.Use(app.RedirectPluginInvoke()) + router.GET("/test", func(c *gin.Context) { + called = true + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Create request + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + // Should call next handler even without plugin context + assert.True(t, called) + assert.Equal(t, http.StatusOK, w.Code) +} + +func TestRedirectPluginInvoke_ClusterEnabled_PluginOnCurrentNode(t *testing.T) { + // Create app with cluster enabled but no actual cluster (nil) + // This tests the middleware creation and basic flow + config := &app.Config{ + ClusterDisabled: false, + } + + app := &App{ + config: config, + cluster: nil, // This will cause panic if IsPluginOnCurrentNode is called, but we test middleware creation + } + + // Test that middleware is created successfully + middleware := app.RedirectPluginInvoke() + assert.NotNil(t, middleware) + + // Test that middleware handles missing plugin identifier correctly + gin.SetMode(gin.TestMode) + router := gin.New() + router.Use(middleware) + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Create request without plugin context + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + // Should return 500 error due to missing plugin identifier + assert.Equal(t, http.StatusInternalServerError, w.Code) +} + +func TestRedirectPluginInvoke_ClusterEnabled_PluginNotOnCurrentNode(t *testing.T) { + // Create app with cluster enabled + config := &app.Config{ + ClusterDisabled: false, + } + + app := &App{ + config: config, + } + + // Test that middleware is created successfully + middleware := app.RedirectPluginInvoke() + assert.NotNil(t, middleware) + + // Test middleware with valid plugin identifier but nil cluster + gin.SetMode(gin.TestMode) + router := gin.New() + router.Use(middleware) + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Create request with plugin context + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = req + + // Set valid plugin context + identity, _ := plugin_entities.NewPluginUniqueIdentifier("test-plugin-v1.0.0") + c.Set(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER, identity) + + // Process request through middleware - should panic due to nil cluster + assert.Panics(t, func() { + middleware(c) + }) +} + +func TestRedirectPluginInvoke_MissingPluginIdentifier(t *testing.T) { + // Create app with cluster enabled + config := &app.Config{ + ClusterDisabled: false, + } + + app := &App{ + config: config, + } + + // Create gin context + gin.SetMode(gin.TestMode) + router := gin.New() + + router.Use(app.RedirectPluginInvoke()) + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Create request without plugin context + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + // Should return 500 error due to missing plugin identifier + assert.Equal(t, http.StatusInternalServerError, w.Code) +} + +func TestRedirectPluginInvoke_InvalidPluginIdentifier(t *testing.T) { + // Create app with cluster enabled + config := &app.Config{ + ClusterDisabled: false, + } + + app := &App{ + config: config, + } + + // Create gin context + gin.SetMode(gin.TestMode) + router := gin.New() + + router.Use(app.RedirectPluginInvoke()) + router.GET("/test", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Create request with invalid plugin context + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = req + + // Set invalid plugin context + c.Set(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER, "invalid-identifier") + + router.ServeHTTP(w, req) + + // Should return 500 error due to invalid plugin identifier + assert.Equal(t, http.StatusInternalServerError, w.Code) +} + +func TestCheckingKey(t *testing.T) { + // Test valid key + middleware := CheckingKey("valid-key") + assert.NotNil(t, middleware) + + // Create gin context + gin.SetMode(gin.TestMode) + router := gin.New() + + called := false + router.Use(middleware) + router.GET("/test", func(c *gin.Context) { + called = true + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Test with valid key + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set(constants.X_API_KEY, "valid-key") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.True(t, called) + assert.Equal(t, http.StatusOK, w.Code) + + // Test with invalid key + called = false + req.Header.Set(constants.X_API_KEY, "invalid-key") + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.False(t, called) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +func TestApp_AdminAPIKey(t *testing.T) { + // Create app instance + app := &App{} + + // Test valid admin key + middleware := app.AdminAPIKey("admin-key") + assert.NotNil(t, middleware) + + // Create gin context + gin.SetMode(gin.TestMode) + router := gin.New() + + called := false + router.Use(middleware) + router.GET("/test", func(c *gin.Context) { + called = true + c.JSON(http.StatusOK, gin.H{"success": true}) + }) + + // Test with valid key + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set(constants.X_ADMIN_API_KEY, "admin-key") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.True(t, called) + assert.Equal(t, http.StatusOK, w.Code) + + // Test with invalid key + called = false + req.Header.Set(constants.X_ADMIN_API_KEY, "invalid-key") + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.False(t, called) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} diff --git a/internal/server/server.go b/internal/server/server.go index ee3a876233..f7a5a3af96 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -79,6 +79,9 @@ func initOSS(config *app.Config) oss.OSS { } func (app *App) Run(config *app.Config) { + // store config reference + app.config = config + // init routine pool if config.SentryEnabled { routine.InitPool(config.RoutinePoolSize, sentry.ClientOptions{ @@ -110,8 +113,10 @@ func (app *App) Run(config *app.Config) { // init persistence persistence.InitPersistence(oss, config) - // launch cluster - app.cluster.Launch() + // launch cluster only if not disabled + if !config.ClusterDisabled { + app.cluster.Launch() + } // setup signal handler, for a graceful shutdown to cleanup resources like async tasks tasks.SetupSignalHandler() diff --git a/internal/types/app/config.go b/internal/types/app/config.go index a7eaef071a..ccf29ac5a0 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -191,6 +191,9 @@ type Config struct { DisplayClusterLog bool `envconfig:"DISPLAY_CLUSTER_LOG"` + // Disable cluster mode for single-node deployments (e.g., ECS Fargate) + ClusterDisabled bool `envconfig:"CLUSTER_DISABLED" default:"false"` + PPROFEnabled bool `envconfig:"PPROF_ENABLED"` SentryEnabled bool `envconfig:"SENTRY_ENABLED"` diff --git a/internal/types/app/config_test.go b/internal/types/app/config_test.go new file mode 100644 index 0000000000..978ad267b4 --- /dev/null +++ b/internal/types/app/config_test.go @@ -0,0 +1,105 @@ +package app + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig_ClusterDisabled_Default(t *testing.T) { + config := &Config{} + + // Test default value + assert.False(t, config.ClusterDisabled) +} + +func TestConfig_ClusterDisabled_SetTrue(t *testing.T) { + config := &Config{ + ClusterDisabled: true, + } + + assert.True(t, config.ClusterDisabled) +} + +func TestConfig_ClusterDisabled_SetFalse(t *testing.T) { + config := &Config{ + ClusterDisabled: false, + } + + assert.False(t, config.ClusterDisabled) +} + +func TestConfig_Validate_WithClusterDisabled(t *testing.T) { + config := &Config{ + ServerPort: 5002, + ServerKey: "test-key", + DifyInnerApiURL: "http://localhost:8000", + DifyInnerApiKey: "test-api-key", + PluginStorageType: "local", + PluginInstalledPath: "/tmp/plugins", + PluginPackageCachePath: "/tmp/cache", + PluginWorkingPath: "/tmp/work", + PluginMaxExecutionTimeout: 300, + PluginLocalLaunchingConcurrent: 5, + Platform: "local", + RoutinePoolSize: 10, + DBType: "postgresql", + DBUsername: "user", + DBPassword: "pass", + DBHost: "localhost", + DBPort: 5432, + DBDatabase: "test", + DBDefaultDatabase: "test", + DBSslMode: "disable", + LifetimeCollectionHeartbeatInterval: 30, + LifetimeCollectionGCInterval: 300, + LifetimeStateGCInterval: 60, + DifyInvocationConnectionIdleTimeout: 300, + MaxPluginPackageSize: 100 * 1024 * 1024, + MaxBundlePackageSize: 100 * 1024 * 1024, + PythonInterpreterPath: "/usr/bin/python3", + PythonEnvInitTimeout: 300, + ClusterDisabled: true, + } + + err := config.Validate() + assert.NoError(t, err) +} + +func TestConfig_GetLocalRuntimeBufferSize_Default(t *testing.T) { + config := &Config{ + PluginRuntimeBufferSize: 1024, + PluginStdioBufferSize: 1024, + } + + assert.Equal(t, 1024, config.GetLocalRuntimeBufferSize()) +} + +func TestConfig_GetLocalRuntimeBufferSize_CustomStdio(t *testing.T) { + config := &Config{ + PluginRuntimeBufferSize: 2048, + PluginStdioBufferSize: 4096, // Custom stdio buffer size + } + + // Should prefer stdio buffer size when customized + assert.Equal(t, 4096, config.GetLocalRuntimeBufferSize()) +} + +func TestConfig_GetLocalRuntimeMaxBufferSize_Default(t *testing.T) { + config := &Config{ + PluginRuntimeMaxBufferSize: 5242880, + PluginStdioMaxBufferSize: 5242880, + } + + assert.Equal(t, 5242880, config.GetLocalRuntimeMaxBufferSize()) +} + +func TestConfig_GetLocalRuntimeMaxBufferSize_CustomStdio(t *testing.T) { + config := &Config{ + PluginRuntimeMaxBufferSize: 10485760, + PluginStdioMaxBufferSize: 20971520, // Custom stdio max buffer size + } + + // Should prefer stdio max buffer size when customized + assert.Equal(t, 20971520, config.GetLocalRuntimeMaxBufferSize()) +} From b996cb06260f94c9722ffb54c1fe38d494819464 Mon Sep 17 00:00:00 2001 From: fatelei Date: Thu, 20 Nov 2025 10:17:41 +0800 Subject: [PATCH 2/2] chore: resolve comment issue --- cmd/commandline/plugin/list_readme.go | 2 +- integration/ecs_redeployment_test.go | 66 ------------------- internal/cluster/plugin.go | 2 +- internal/cluster/redirect.go | 66 +------------------ internal/cluster/redirect_test.go | 33 +++++----- internal/core/io_tunnel/datasource.gen.go | 2 +- .../core/io_tunnel/dynamic_parameter.gen.go | 2 +- internal/core/io_tunnel/model.gen.go | 2 +- internal/core/io_tunnel/oauth.gen.go | 2 +- internal/core/io_tunnel/tool.gen.go | 2 +- internal/core/io_tunnel/trigger.gen.go | 2 +- internal/db/cache.go | 2 +- internal/service/datasource.gen.go | 2 +- internal/service/dynamic_parameter.gen.go | 2 +- internal/service/model.gen.go | 2 +- internal/service/oauth.gen.go | 2 +- internal/service/tool.gen.go | 2 +- internal/service/trigger.gen.go | 2 +- .../service/unauthorized_langgenius_test.go | 12 ++-- internal/types/models/trigger.go | 2 +- pkg/utils/cache/helper/keys.go | 2 +- pkg/utils/mapping/sync.go | 2 +- pkg/utils/mapping/sync_test.go | 8 +-- 23 files changed, 46 insertions(+), 175 deletions(-) diff --git a/cmd/commandline/plugin/list_readme.go b/cmd/commandline/plugin/list_readme.go index 88b3ebd056..a004d128aa 100644 --- a/cmd/commandline/plugin/list_readme.go +++ b/cmd/commandline/plugin/list_readme.go @@ -76,7 +76,7 @@ func ListReadme(pluginPath string) { fmt.Fprintln(w, "-------------\t--------\t---------") // Print each available README - for code, _ := range availableReadmes { + for code := range availableReadmes { languageName := GetLanguageName(code) fmt.Fprintf(w, "%s\t%s\t✅\n", code, languageName) } diff --git a/integration/ecs_redeployment_test.go b/integration/ecs_redeployment_test.go index bc81573368..4f4d67c3b5 100644 --- a/integration/ecs_redeployment_test.go +++ b/integration/ecs_redeployment_test.go @@ -118,72 +118,6 @@ func TestECSRedeploymentScenario(t *testing.T) { }) } -// TestLocalhostRedirection verifies localhost redirection works correctly -func TestLocalhostRedirection(t *testing.T) { - t.Run("RedirectToLocalhost_Success", func(t *testing.T) { - // Test localhost redirection (this is what our fix does) - // In real scenario, this would be called by cluster.RedirectRequest() - // when node_id == current_node_id - - // Since we can't easily test the actual redirect without a full cluster setup, - // we verify the URL construction works correctly - port := uint16(5002) - url := fmt.Sprintf("http://localhost:%d/plugin/test", port) - assert.Equal(t, "http://localhost:5002/plugin/test", url) - }) -} - -// TestConfigurationOptions demonstrates different deployment scenarios -func TestConfigurationOptions(t *testing.T) { - tests := []struct { - name string - config *app.Config - expectedBehavior string - }{ - { - name: "ECS Fargate Single Node", - config: &app.Config{ - ServerPort: 5002, - ClusterDisabled: true, - }, - expectedBehavior: "All requests handled locally via localhost", - }, - { - name: "Multi-Node Cluster", - config: &app.Config{ - ServerPort: 5002, - ClusterDisabled: false, - }, - expectedBehavior: "Requests redirected between nodes with IP validation", - }, - { - name: "Local Development", - config: &app.Config{ - ServerPort: 5002, - ClusterDisabled: true, - }, - expectedBehavior: "Simple localhost setup for development", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.NotNil(t, tt.config) - - // Verify configuration matches expected behavior - if tt.config.ClusterDisabled { - assert.True(t, tt.config.ClusterDisabled) - assert.Contains(t, tt.expectedBehavior, "localhost") - t.Logf("Configuration: %s - Behavior: %s", tt.name, tt.expectedBehavior) - } else { - assert.False(t, tt.config.ClusterDisabled) - assert.Contains(t, tt.expectedBehavior, "redirected") - t.Logf("Configuration: %s - Behavior: %s", tt.name, tt.expectedBehavior) - } - }) - } -} - // Benchmark tests to ensure performance doesn't degrade func BenchmarkLocalhostRedirection(b *testing.B) { // Benchmark localhost URL construction (what happens in our fix) diff --git a/internal/cluster/plugin.go b/internal/cluster/plugin.go index 5fd4165dcc..4e3eb2456d 100644 --- a/internal/cluster/plugin.go +++ b/internal/cluster/plugin.go @@ -49,7 +49,7 @@ func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginLifetime) error // do plugin state update immediately err = c.doPluginStateUpdate(l) if err != nil { -return errors.Join(err, errors.New("failed to update plugin state")) + return errors.Join(err, errors.New("failed to update plugin state")) } if c.showLog { diff --git a/internal/cluster/redirect.go b/internal/cluster/redirect.go index f4cd015e85..731ab8847d 100644 --- a/internal/cluster/redirect.go +++ b/internal/cluster/redirect.go @@ -2,7 +2,6 @@ package cluster import ( "errors" - "fmt" "io" "net/http" "time" @@ -16,15 +15,6 @@ func constructRedirectUrl(ip address, request *http.Request) string { return url } -// constructLocalRedirectUrl constructs a URL for localhost redirection -func constructLocalRedirectUrl(port uint16, request *http.Request) string { - url := "http://localhost:" + fmt.Sprintf("%d", port) + request.URL.Path - if request.URL.RawQuery != "" { - url += "?" + request.URL.RawQuery - } - return url -} - // basic redirect request func redirectRequestToIp(ip address, request *http.Request) (int, http.Header, io.ReadCloser, error) { url := constructRedirectUrl(ip, request) @@ -59,54 +49,17 @@ func redirectRequestToIp(ip address, request *http.Request) (int, http.Header, i return resp.StatusCode, resp.Header, resp.Body, nil } -// redirectRequestToLocal redirects request to localhost -func redirectRequestToLocal(port uint16, request *http.Request) (int, http.Header, io.ReadCloser, error) { - url := constructLocalRedirectUrl(port, request) - - // create a new request - redirectedRequest, err := http.NewRequest( - request.Method, - url, - request.Body, - ) - - if err != nil { - return 0, nil, nil, err - } - - // copy headers - for key, values := range request.Header { - for _, value := range values { - redirectedRequest.Header.Add(key, value) - } - } - - client := &http.Client{ - Timeout: 10 * time.Second, - } - resp, err := client.Do(redirectedRequest) - - if err != nil { - return 0, nil, nil, err - } - - return resp.StatusCode, resp.Header, resp.Body, nil -} - // RedirectRequest redirects the request to the specified node func (c *Cluster) RedirectRequest( node_id string, request *http.Request, ) (int, http.Header, io.ReadCloser, error) { - // If redirecting to current node, use localhost - if node_id == c.id { - return redirectRequestToLocal(c.port, request) - } - node, ok := c.nodes.Load(node_id) if !ok { return 0, nil, nil, errors.New("node not found") } + // Sort IPs by voting results to try the most likely healthy address first. + // See voteAddresses/SortIps for the voting mechanism. ips := c.SortIps(node) if len(ips) == 0 { return 0, nil, nil, errors.New("no available ip found") @@ -122,20 +75,5 @@ func (c *Cluster) RedirectRequest( lastErr = err } - // If all IPs failed, try to refresh node information and retry once - if err := c.updateNodeStatus(); err == nil { - // Reload node information after update - if updatedNode, ok := c.nodes.Load(node_id); ok { - updatedIps := c.SortIps(updatedNode) - for _, ip := range updatedIps { - statusCode, header, body, err := redirectRequestToIp(ip, request) - if err == nil { - return statusCode, header, body, nil - } - lastErr = err - } - } - } - return 0, nil, nil, lastErr } diff --git a/internal/cluster/redirect_test.go b/internal/cluster/redirect_test.go index 68971235dd..3097f5d840 100644 --- a/internal/cluster/redirect_test.go +++ b/internal/cluster/redirect_test.go @@ -304,8 +304,8 @@ func TestRedirectTrafficWithPathStyle(t *testing.T) { } } -// Tests for the new localhost redirection functionality -func TestConstructLocalRedirectUrl(t *testing.T) { +// Tests for localhost redirection using the generic constructor +func TestConstructRedirectUrlLocalhost(t *testing.T) { tests := []struct { name string port uint16 @@ -334,13 +334,14 @@ func TestConstructLocalRedirectUrl(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := constructLocalRedirectUrl(tt.port, tt.request) + addr := address{Ip: "localhost", Port: tt.port} + result := constructRedirectUrl(addr, tt.request) assert.Equal(t, tt.expected, result) }) } } -func TestRedirectRequestToLocal(t *testing.T) { +func TestRedirectRequestToLocalhostUsingGeneric(t *testing.T) { // Create a test server to simulate the local endpoint testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -350,7 +351,7 @@ func TestRedirectRequestToLocal(t *testing.T) { // Test with a request that will fail (no server on localhost:5002) req := httptest.NewRequest("GET", "/test", nil) - statusCode, header, body, err := redirectRequestToLocal(5002, req) + statusCode, header, body, err := redirectRequestToIp(address{Ip: "localhost", Port: 5002}, req) // Should fail since there's no actual server on localhost:5002 assert.Error(t, err) @@ -359,7 +360,7 @@ func TestRedirectRequestToLocal(t *testing.T) { assert.Nil(t, body) } -func TestRedirectRequestToLocalWithActualServer(t *testing.T) { +func TestRedirectRequestToLocalhostWithActualServerUsingGeneric(t *testing.T) { // Create a test server on localhost testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, "/test", r.URL.Path) @@ -379,7 +380,7 @@ func TestRedirectRequestToLocalWithActualServer(t *testing.T) { req := httptest.NewRequest("GET", "/test", nil) // This should work since we have an actual server - statusCode, header, body, err := redirectRequestToLocal(portNum, req) + statusCode, header, body, err := redirectRequestToIp(address{Ip: "localhost", Port: portNum}, req) assert.NoError(t, err) assert.Equal(t, http.StatusOK, statusCode) @@ -395,6 +396,15 @@ func TestRedirectRequestToLocalWithActualServer(t *testing.T) { body.Close() } +func BenchmarkConstructRedirectUrlLocalhost(b *testing.B) { + req := httptest.NewRequest("GET", "/plugin/test?param=value", nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + constructRedirectUrl(address{Ip: "localhost", Port: 5002}, req) + } +} + func TestClusterRedirectRequestToCurrentNode(t *testing.T) { // Create a mock cluster config := &app.Config{ @@ -474,12 +484,3 @@ func BenchmarkConstructRedirectUrl(b *testing.B) { constructRedirectUrl(ip, req) } } - -func BenchmarkConstructLocalRedirectUrl(b *testing.B) { - req := httptest.NewRequest("GET", "/plugin/test?param=value", nil) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - constructLocalRedirectUrl(5002, req) - } -} diff --git a/internal/core/io_tunnel/datasource.gen.go b/internal/core/io_tunnel/datasource.gen.go index a92fdf951a..0cffe3d507 100644 --- a/internal/core/io_tunnel/datasource.gen.go +++ b/internal/core/io_tunnel/datasource.gen.go @@ -4,9 +4,9 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/datasource_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func DatasourceValidateCredentials( diff --git a/internal/core/io_tunnel/dynamic_parameter.gen.go b/internal/core/io_tunnel/dynamic_parameter.gen.go index 28968a0dd5..03b51dcdc0 100644 --- a/internal/core/io_tunnel/dynamic_parameter.gen.go +++ b/internal/core/io_tunnel/dynamic_parameter.gen.go @@ -4,9 +4,9 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/dynamic_select_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func FetchDynamicParameterOptions( diff --git a/internal/core/io_tunnel/model.gen.go b/internal/core/io_tunnel/model.gen.go index 8b790d089c..e2e95cc237 100644 --- a/internal/core/io_tunnel/model.gen.go +++ b/internal/core/io_tunnel/model.gen.go @@ -4,9 +4,9 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/model_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func InvokeLLM( diff --git a/internal/core/io_tunnel/oauth.gen.go b/internal/core/io_tunnel/oauth.gen.go index 9b1621998f..b652fd29cb 100644 --- a/internal/core/io_tunnel/oauth.gen.go +++ b/internal/core/io_tunnel/oauth.gen.go @@ -4,9 +4,9 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/oauth_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func GetAuthorizationURL( diff --git a/internal/core/io_tunnel/tool.gen.go b/internal/core/io_tunnel/tool.gen.go index 080009814c..9657779851 100644 --- a/internal/core/io_tunnel/tool.gen.go +++ b/internal/core/io_tunnel/tool.gen.go @@ -4,9 +4,9 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" "github.com/langgenius/dify-plugin-daemon/pkg/entities/tool_entities" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func InvokeTool( diff --git a/internal/core/io_tunnel/trigger.gen.go b/internal/core/io_tunnel/trigger.gen.go index 4941bd7e92..48ec208a6e 100644 --- a/internal/core/io_tunnel/trigger.gen.go +++ b/internal/core/io_tunnel/trigger.gen.go @@ -4,8 +4,8 @@ package io_tunnel import ( "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func TriggerInvokeEvent( diff --git a/internal/db/cache.go b/internal/db/cache.go index fc79264cca..0ee5bf9166 100644 --- a/internal/db/cache.go +++ b/internal/db/cache.go @@ -2,4 +2,4 @@ package db // Note: The GetCache, UpdateCache, and DeleteCache functions that were previously // in this file are deprecated and not used in the codebase. -// Direct cache operations should use the cache package (internal/utils/cache) \ No newline at end of file +// Direct cache operations should use the cache package (internal/utils/cache) diff --git a/internal/service/datasource.gen.go b/internal/service/datasource.gen.go index deee6ee1db..b9a5c9b7e8 100644 --- a/internal/service/datasource.gen.go +++ b/internal/service/datasource.gen.go @@ -7,10 +7,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/datasource_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func DatasourceValidateCredentials( diff --git a/internal/service/dynamic_parameter.gen.go b/internal/service/dynamic_parameter.gen.go index 7f28cb59ba..35b8e0c9b5 100644 --- a/internal/service/dynamic_parameter.gen.go +++ b/internal/service/dynamic_parameter.gen.go @@ -7,10 +7,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/dynamic_select_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func FetchDynamicParameterOptions( diff --git a/internal/service/model.gen.go b/internal/service/model.gen.go index d9cb622fe3..f33b8e791b 100644 --- a/internal/service/model.gen.go +++ b/internal/service/model.gen.go @@ -7,10 +7,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/model_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func InvokeLLM( diff --git a/internal/service/oauth.gen.go b/internal/service/oauth.gen.go index 19d4265824..ddddcc09fa 100644 --- a/internal/service/oauth.gen.go +++ b/internal/service/oauth.gen.go @@ -7,10 +7,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/oauth_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func GetAuthorizationURL( diff --git a/internal/service/tool.gen.go b/internal/service/tool.gen.go index 33dd4f23c8..200198cef1 100644 --- a/internal/service/tool.gen.go +++ b/internal/service/tool.gen.go @@ -7,10 +7,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" "github.com/langgenius/dify-plugin-daemon/pkg/entities/tool_entities" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func InvokeTool( diff --git a/internal/service/trigger.gen.go b/internal/service/trigger.gen.go index 8b73133d35..d7798efa5f 100644 --- a/internal/service/trigger.gen.go +++ b/internal/service/trigger.gen.go @@ -7,9 +7,9 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel" "github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" - "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" ) func TriggerInvokeEvent( diff --git a/internal/service/unauthorized_langgenius_test.go b/internal/service/unauthorized_langgenius_test.go index 7f353d3186..22a8ddef8b 100644 --- a/internal/service/unauthorized_langgenius_test.go +++ b/internal/service/unauthorized_langgenius_test.go @@ -138,7 +138,7 @@ func TestIsUnauthorizedLanggenius(t *testing.T) { Author: tt.author, }, } - + got := isUnauthorizedLanggenius(declaration, tt.verification) if got != tt.want { t.Errorf("isUnauthorizedLanggenius() = %v, want %v", got, tt.want) @@ -163,10 +163,10 @@ func TestIsUnauthorizedLanggenius_EdgeCases(t *testing.T) { want: false, // spaces don't affect the comparison after lowercase }, { - name: "langgenius with spaces but no verification", - author: " langgenius ", + name: "langgenius with spaces but no verification", + author: " langgenius ", verification: nil, - want: false, // with spaces, not exact match after lowercase + want: false, // with spaces, not exact match after lowercase }, { name: "LaNgGeNiUs mixed case", @@ -193,11 +193,11 @@ func TestIsUnauthorizedLanggenius_EdgeCases(t *testing.T) { Author: tt.author, }, } - + got := isUnauthorizedLanggenius(declaration, tt.verification) if got != tt.want { t.Errorf("isUnauthorizedLanggenius() = %v, want %v for author=%q", got, tt.want, tt.author) } }) } -} \ No newline at end of file +} diff --git a/internal/types/models/trigger.go b/internal/types/models/trigger.go index 2a370e643d..ee6ff8be73 100644 --- a/internal/types/models/trigger.go +++ b/internal/types/models/trigger.go @@ -6,4 +6,4 @@ type TriggerInstallation struct { Provider string `json:"provider" gorm:"column:provider;size:127;index;not null"` PluginUniqueIdentifier string `json:"plugin_unique_identifier" gorm:"index;size:255"` PluginID string `json:"plugin_id" gorm:"index;size:255"` -} \ No newline at end of file +} diff --git a/pkg/utils/cache/helper/keys.go b/pkg/utils/cache/helper/keys.go index 816d8cc2ce..32fd92891c 100644 --- a/pkg/utils/cache/helper/keys.go +++ b/pkg/utils/cache/helper/keys.go @@ -22,4 +22,4 @@ func EndpointCacheKey(hookId string) string { }, ":", ) -} \ No newline at end of file +} diff --git a/pkg/utils/mapping/sync.go b/pkg/utils/mapping/sync.go index 58fe48d835..f46b64f1a3 100644 --- a/pkg/utils/mapping/sync.go +++ b/pkg/utils/mapping/sync.go @@ -55,7 +55,7 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) { func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { m.mu.Lock() defer m.mu.Unlock() - + v, loaded := m.store.LoadOrStore(key, value) actual = v.(V) if !loaded { diff --git a/pkg/utils/mapping/sync_test.go b/pkg/utils/mapping/sync_test.go index 6676d36cf5..be3435d1da 100644 --- a/pkg/utils/mapping/sync_test.go +++ b/pkg/utils/mapping/sync_test.go @@ -56,7 +56,7 @@ func TestConcurrentAccess(t *testing.T) { var wg sync.WaitGroup wg.Add(workers) - + for i := 0; i < workers; i++ { go func(i int) { defer wg.Done() @@ -78,7 +78,7 @@ func TestLoadOrStore(t *testing.T) { m := Map[string, interface{}]{} // First store - val, loaded := m.LoadOrStore("data", []byte{1,2,3}) + val, loaded := m.LoadOrStore("data", []byte{1, 2, 3}) if loaded || val.([]byte)[0] != 1 { t.Error("Initial LoadOrStore failed") } @@ -90,8 +90,6 @@ func TestLoadOrStore(t *testing.T) { } } - - // TestEdgeCases covers special scenarios func TestEdgeCases(t *testing.T) { t.Parallel() @@ -108,4 +106,4 @@ func TestEdgeCases(t *testing.T) { if m.Len() != 0 { t.Error("Clear failed to reset map") } -} \ No newline at end of file +}