diff --git a/.env.example b/.env.example index a86896f178..c2b3f1d05e 100644 --- a/.env.example +++ b/.env.example @@ -173,6 +173,9 @@ OTEL_METRIC_EXPORT_INTERVAL=60000 OTEL_BATCH_EXPORT_TIMEOUT=10000 OTEL_METRIC_EXPORT_TIMEOUT=30000 +# prometheus metrics enabled +PROMETHEUS_ENABLED=true + # FORCE_VERIFYING_SIGNATURE, for security, you should set this to true, pls be sure you know what you are doing # if want to install plugin without verifying signature, set this to false FORCE_VERIFYING_SIGNATURE=true diff --git a/go.mod b/go.mod index f434cd31f9..28d4829651 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/hashicorp/go-version v1.7.0 github.com/langgenius/dify-cloud-kit v0.1.1 github.com/panjf2000/ants/v2 v2.10.0 + github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/extra/redisotel/v9 v9.17.3 github.com/redis/go-redis/v9 v9.17.3 github.com/spf13/cobra v1.8.1 @@ -25,6 +26,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 go.opentelemetry.io/otel/sdk v1.39.0 go.opentelemetry.io/otel/sdk/metric v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 golang.org/x/tools v0.38.0 gorm.io/driver/mysql v1.5.7 gorm.io/gorm v1.30.0 @@ -70,6 +72,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect github.com/aws/smithy-go v1.22.2 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -109,7 +112,7 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-localereader v0.0.1 // indirect @@ -118,11 +121,15 @@ require ( github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect github.com/muesli/cancelreader v0.2.2 // indirect github.com/muesli/termenv v0.15.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/paulmach/orb v0.11.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/quic-go/qpack v0.6.0 // indirect github.com/quic-go/quic-go v0.57.1 // indirect github.com/redis/go-redis/extra/rediscmd/v9 v9.17.3 // indirect @@ -146,8 +153,8 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/mod v0.29.0 // indirect golang.org/x/oauth2 v0.32.0 // indirect golang.org/x/time v0.12.0 // indirect diff --git a/go.sum b/go.sum index de92252893..d14bf1e1ae 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -250,8 +252,8 @@ github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -297,6 +299,8 @@ github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELU github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8= github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/panjf2000/gnet/v2 v2.5.5 h1:H+LqGgCHs2mGJq/4n6YELhMjZ027bNgd5Qb8Wj5nbrM= @@ -319,6 +323,14 @@ github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= github.com/quic-go/quic-go v0.57.1 h1:25KAAR9QR8KZrCZRThWMKVAwGoiHIrNbT72ULHTuI10= @@ -446,6 +458,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/arch v0.23.0 h1:lKF64A2jF6Zd8L0knGltUnegD62JMFBiCPBmQpToHhg= golang.org/x/arch v0.23.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/internal/core/control_panel/metrics_notifier.go b/internal/core/control_panel/metrics_notifier.go new file mode 100644 index 0000000000..747fbb6a33 --- /dev/null +++ b/internal/core/control_panel/metrics_notifier.go @@ -0,0 +1,107 @@ +package controlpanel + +import ( + "github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime" + "github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime" + "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/metrics" +) + +type MetricsNotifier struct{} + +func NewMetricsNotifier() *MetricsNotifier { + return &MetricsNotifier{} +} + +// identifiableRuntime is an interface for types that can provide their identity +type identifiableRuntime interface { + Identity() (plugin_entities.PluginUniqueIdentifier, error) +} + +func (m *MetricsNotifier) OnLocalRuntimeStarting(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) { + pluginID := pluginUniqueIdentifier.PluginID() + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "local", + ).Set(0.5) +} + +func (m *MetricsNotifier) OnLocalRuntimeReady(runtime *local_runtime.LocalPluginRuntime) { + pluginID := pluginIDFromRuntime(runtime) + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "local", + ).Set(1) + metrics.ActivePluginsTotal.WithLabelValues("local").Inc() +} + +func (m *MetricsNotifier) OnLocalRuntimeStartFailed(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier, err error) { + pluginID := pluginUniqueIdentifier.PluginID() + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "local", + ).Set(0) + metrics.PluginInstallationsTotal.WithLabelValues( + pluginID, + "failed", + ).Inc() +} + +func (m *MetricsNotifier) OnLocalRuntimeStopped(runtime *local_runtime.LocalPluginRuntime) { + pluginID := pluginIDFromRuntime(runtime) + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "local", + ).Set(0) + metrics.ActivePluginsTotal.WithLabelValues("local").Dec() +} + +func (m *MetricsNotifier) OnLocalRuntimeStop(runtime *local_runtime.LocalPluginRuntime) { + pluginID := pluginIDFromRuntime(runtime) + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "local", + ).Set(0) +} + +func (m *MetricsNotifier) OnLocalRuntimeScaleUp(runtime *local_runtime.LocalPluginRuntime, i int32) { +} + +func (m *MetricsNotifier) OnLocalRuntimeScaleDown(runtime *local_runtime.LocalPluginRuntime, i int32) { +} + +func (m *MetricsNotifier) OnLocalRuntimeInstanceLog( + runtime *local_runtime.LocalPluginRuntime, + instance *local_runtime.PluginInstance, + event plugin_entities.PluginLogEvent, +) { +} + +func (m *MetricsNotifier) OnDebuggingRuntimeConnected(runtime *debugging_runtime.RemotePluginRuntime) { + pluginID := pluginIDFromRuntime(runtime) + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "remote", + ).Set(1) + metrics.ActivePluginsTotal.WithLabelValues("remote").Inc() +} + +func (m *MetricsNotifier) OnDebuggingRuntimeDisconnected(runtime *debugging_runtime.RemotePluginRuntime) { + pluginID := pluginIDFromRuntime(runtime) + metrics.PluginRuntimeStatus.WithLabelValues( + pluginID, + "remote", + ).Set(0) + metrics.ActivePluginsTotal.WithLabelValues("remote").Dec() +} + +// pluginIDFromRuntime extracts the plugin ID from any runtime that implements identifiableRuntime +func pluginIDFromRuntime(runtime identifiableRuntime) string { + if runtime == nil { + return "unknown" + } + if identity, err := runtime.Identity(); err == nil { + return identity.PluginID() + } + return "unknown" +} diff --git a/internal/core/control_panel/metrics_notifier_test.go b/internal/core/control_panel/metrics_notifier_test.go new file mode 100644 index 0000000000..36a3a52335 --- /dev/null +++ b/internal/core/control_panel/metrics_notifier_test.go @@ -0,0 +1,127 @@ +package controlpanel + +import ( + "errors" + "testing" + + "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" + "github.com/stretchr/testify/assert" +) + +func TestPluginIDFromIdentifier(t *testing.T) { + tests := []struct { + name string + identifier string + expected string + }{ + { + name: "standard plugin identifier", + identifier: "langgenius/openai:1.0.0@abc123def456789abc123def456789abcd", + expected: "langgenius/openai", + }, + { + name: "plugin without author", + identifier: "openai:1.0.0@abc123def456789abc123def456789abcd", + expected: "openai", + }, + { + name: "complex plugin identifier", + identifier: "author/my-plugin:2.1.3-beta@1234567890abcdef1234567890abcdef", + expected: "author/my-plugin", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + identifier, err := plugin_entities.NewPluginUniqueIdentifier(tt.identifier) + assert.NoError(t, err) + + result := identifier.PluginID() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestNewMetricsNotifier(t *testing.T) { + notifier := NewMetricsNotifier() + assert.NotNil(t, notifier) + + assert.IsType(t, &MetricsNotifier{}, notifier, "NewMetricsNotifier should return *MetricsNotifier type") +} + +func TestMetricsNotifier_OnLocalRuntimeStarting(t *testing.T) { + notifier := NewMetricsNotifier() + + identifier, err := plugin_entities.NewPluginUniqueIdentifier("langgenius/openai:1.0.0@abc123def456789abc123def456789abcd") + assert.NoError(t, err) + + // This should not panic + assert.NotPanics(t, func() { + notifier.OnLocalRuntimeStarting(identifier) + }) +} + +func TestMetricsNotifier_OnLocalRuntimeStartFailed(t *testing.T) { + notifier := NewMetricsNotifier() + + identifier, err := plugin_entities.NewPluginUniqueIdentifier("langgenius/openai:1.0.0@abc123def456789abc123def456789abcd") + assert.NoError(t, err) + + // This should not panic + assert.NotPanics(t, func() { + notifier.OnLocalRuntimeStartFailed(identifier, assert.AnError) + }) +} + +func TestMetricsNotifier_ScaleEvents(t *testing.T) { + notifier := NewMetricsNotifier() + + // Create a mock runtime (we can't easily create a real one in tests) + // but we can test that the methods don't panic + assert.NotPanics(t, func() { + // These methods have empty implementations currently + // but we test they exist and don't panic + type mockRuntime struct{} + notifier.OnLocalRuntimeScaleUp(nil, 1) + notifier.OnLocalRuntimeScaleDown(nil, 1) + }) +} + +func TestPluginIDFromRuntime(t *testing.T) { + t.Run("nil runtime", func(t *testing.T) { + result := pluginIDFromRuntime(nil) + assert.Equal(t, "unknown", result) + }) + + t.Run("mock runtime with successful identity", func(t *testing.T) { + identifier, _ := plugin_entities.NewPluginUniqueIdentifier("test-plugin:1.0.0@abc123def456789abc123def456789abcd") + + mockRuntime := &mockIdentifiableRuntime{ + identity: identifier, + err: nil, + } + + result := pluginIDFromRuntime(mockRuntime) + assert.Equal(t, "test-plugin", result) + }) + + t.Run("mock runtime with error", func(t *testing.T) { + mockRuntime := &mockIdentifiableRuntime{ + identity: "", + err: errors.New("identity error"), + } + + result := pluginIDFromRuntime(mockRuntime) + assert.Equal(t, "unknown", result) + }) +} + +// mockIdentifiableRuntime is a mock implementation of identifiableRuntime for testing +type mockIdentifiableRuntime struct { + identity plugin_entities.PluginUniqueIdentifier + err error +} + +func (m *mockIdentifiableRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) { + return m.identity, m.err +} diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index c6bde37ffa..6c637b6f85 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -87,6 +87,7 @@ func InitGlobalManager(oss oss.OSS, config *app.Config) *PluginManager { // mount control panel notifiers manager.controlPanel.AddNotifier(&controlpanel.StandardLogger{}) manager.controlPanel.AddNotifier(&install_service.InstallListener{}) + manager.controlPanel.AddNotifier(controlpanel.NewMetricsNotifier()) return manager } diff --git a/internal/server/http_server.go b/internal/server/http_server.go index f28bbfcc1b..b8d2871c5a 100644 --- a/internal/server/http_server.go +++ b/internal/server/http_server.go @@ -12,6 +12,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/service" "github.com/langgenius/dify-plugin-daemon/internal/types/app" "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" + "github.com/prometheus/client_golang/prometheus/promhttp" sentrygin "github.com/getsentry/sentry-go/gin" ) @@ -36,6 +37,9 @@ engine := gin.New() engine.NoRoute(func(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"code": "not_found", "message": "route not found"}) }) + if config.PrometheusEnabled { + engine.Use(PrometheusMiddleware()) + } engine.GET("/health/check", controllers.HealthCheck(config)) endpointGroup := engine.Group("/e") @@ -43,6 +47,11 @@ engine := gin.New() pluginGroup := engine.Group("/plugin/:tenant_id") pprofGroup := engine.Group("/debug/pprof") + if config.PrometheusEnabled { + metricsGroup := engine.Group("/metrics") + metricsGroup.GET("/", gin.WrapH(promhttp.Handler())) + } + if config.AdminApiEnabled { if len(config.AdminApiKey) < 10 { log.Panic("length of admin api key must be greater than 10") diff --git a/internal/server/middleware.go b/internal/server/middleware.go index 5471575545..7faaeb999b 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -3,6 +3,8 @@ package server import ( "errors" "io" + "strconv" + "time" "github.com/gin-gonic/gin" "github.com/langgenius/dify-plugin-daemon/internal/db" @@ -13,6 +15,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/utils/cache" "github.com/langgenius/dify-plugin-daemon/pkg/utils/cache/helper" "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/metrics" ) func CheckingKey(key string) gin.HandlerFunc { @@ -187,3 +190,19 @@ func (app *App) AdminAPIKey(key string) gin.HandlerFunc { ctx.Next() } } + +func PrometheusMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.FullPath() + method := c.Request.Method + + c.Next() + + status := strconv.Itoa(c.Writer.Status()) + duration := time.Since(start).Seconds() + + metrics.HTTPRequestsTotal.WithLabelValues(method, path, status).Inc() + metrics.HTTPRequestDuration.WithLabelValues(method, path).Observe(duration) + } +} diff --git a/internal/server/middleware_test.go b/internal/server/middleware_test.go new file mode 100644 index 0000000000..309c98fdd7 --- /dev/null +++ b/internal/server/middleware_test.go @@ -0,0 +1,56 @@ +package server + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" +) + +func TestPrometheusMiddleware(t *testing.T) { + gin.SetMode(gin.TestMode) + + router := gin.New() + router.Use(PrometheusMiddleware()) + + router.GET("/test", func(c *gin.Context) { + c.String(http.StatusOK, "test") + }) + + router.POST("/api/test", func(c *gin.Context) { + c.String(http.StatusCreated, "created") + }) + + router.GET("/error", func(c *gin.Context) { + c.String(http.StatusInternalServerError, "error") + }) + + t.Run("GET request", func(t *testing.T) { + req, _ := http.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "test", w.Body.String()) + }) + + t.Run("POST request", func(t *testing.T) { + req, _ := http.NewRequest("POST", "/api/test", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusCreated, w.Code) + assert.Equal(t, "created", w.Body.String()) + }) + + t.Run("Error request", func(t *testing.T) { + req, _ := http.NewRequest("GET", "/error", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusInternalServerError, w.Code) + assert.Equal(t, "error", w.Body.String()) + }) +} diff --git a/internal/service/base_sse.go b/internal/service/base_sse.go index 8556fed87c..bfe99c55c3 100644 --- a/internal/service/base_sse.go +++ b/internal/service/base_sse.go @@ -12,6 +12,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/metrics" "github.com/langgenius/dify-plugin-daemon/pkg/utils/parser" "github.com/langgenius/dify-plugin-daemon/pkg/utils/routine" "github.com/langgenius/dify-plugin-daemon/pkg/utils/stream" @@ -23,7 +24,9 @@ func baseSSEService[R any]( generator func() (*stream.Stream[R], error), ctx *gin.Context, max_timeout_seconds int, + onCompletion func(status string, duration float64), ) { + startTime := time.Now() writer := ctx.Writer writer.WriteHeader(200) writer.Header().Set("Content-Type", "text/event-stream") @@ -46,6 +49,10 @@ func baseSSEService[R any]( if err != nil { writeData(exception.InternalServerError(err).ToResponse()) + duration := time.Since(startTime).Seconds() + if onCompletion != nil { + onCompletion("error", duration) + } close(done) return } @@ -54,15 +61,22 @@ func baseSSEService[R any]( routinepkg.RoutineLabelKeyModule: "service", routinepkg.RoutineLabelKeyMethod: "baseSSEService", }, func() { + status := "success" for pluginDaemonResponse.Next() { chunk, err := pluginDaemonResponse.Read() if err != nil { writeData(exception.InvokePluginError(err).ToResponse()) + status = "error" break } writeData(entities.NewSuccessResponse(chunk)) } + duration := time.Since(startTime).Seconds() + if onCompletion != nil { + onCompletion(status, duration) + } + if atomic.CompareAndSwapInt32(doneClosed, 0, 1) { close(done) } @@ -78,11 +92,19 @@ func baseSSEService[R any]( select { case <-writer.CloseNotify(): pluginDaemonResponse.Close() + duration := time.Since(startTime).Seconds() + if onCompletion != nil { + onCompletion("client_disconnect", duration) + } return case <-done: return case <-timer.C: writeData(exception.InternalServerError(errors.New("killed by timeout")).ToResponse()) + duration := time.Since(startTime).Seconds() + if onCompletion != nil { + onCompletion("timeout", duration) + } if atomic.CompareAndSwapInt32(doneClosed, 0, 1) { close(done) } @@ -98,6 +120,8 @@ func baseSSEWithSession[T any, R any]( ctx *gin.Context, max_timeout_seconds int, ) { + startTime := time.Now() + session, err := createSession( request, access_type, @@ -106,6 +130,8 @@ func baseSSEWithSession[T any, R any]( ctx.Request.Context(), ) if err != nil { + duration := time.Since(startTime).Seconds() + recordPluginInvocationMetrics(request, session, access_type, access_action, "error", duration) ctx.JSON(500, exception.InternalServerError(err).ToResponse()) return } @@ -115,9 +141,81 @@ func baseSSEWithSession[T any, R any]( baseSSEService( func() (*stream.Stream[R], error) { + pluginID, runtimeType := getPluginMetricLabels(session) + + metrics.PluginInvocationsActive.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + ).Inc() + return generator(session) }, ctx, max_timeout_seconds, + func(status string, duration float64) { + pluginID, runtimeType := getPluginMetricLabels(session) + + metrics.PluginInvocationsTotal.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + string(access_action), + status, + ).Inc() + metrics.PluginInvocationDuration.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + string(access_action), + ).Observe(duration) + + metrics.PluginInvocationsActive.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + ).Dec() + }, ) } + +func getPluginMetricLabels(session *session_manager.Session) (pluginID, runtimeType string) { + pluginID = "unknown" + runtimeType = "unknown" + + if session != nil && session.Runtime() != nil { + pluginRuntime := session.Runtime() + if identity, err := pluginRuntime.Identity(); err == nil { + pluginID = identity.PluginID() + } + runtimeType = string(pluginRuntime.Type()) + } + + return +} + +func recordPluginInvocationMetrics[T any]( + request *plugin_entities.InvokePluginRequest[T], + session *session_manager.Session, + access_type access_types.PluginAccessType, + access_action access_types.PluginAccessAction, + status string, + duration float64, +) { + pluginID, runtimeType := getPluginMetricLabels(session) + + metrics.PluginInvocationsTotal.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + string(access_action), + status, + ).Inc() + + metrics.PluginInvocationDuration.WithLabelValues( + pluginID, + string(access_type), + runtimeType, + string(access_action), + ).Observe(duration) +} diff --git a/internal/service/base_sse_test.go b/internal/service/base_sse_test.go new file mode 100644 index 0000000000..cf2fbfc009 --- /dev/null +++ b/internal/service/base_sse_test.go @@ -0,0 +1,23 @@ +package service + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetPluginMetricLabels(t *testing.T) { + t.Run("nil session", func(t *testing.T) { + pluginID, runtimeType := getPluginMetricLabels(nil) + assert.Equal(t, "unknown", pluginID) + assert.Equal(t, "unknown", runtimeType) + }) + + t.Run("session with nil runtime", func(t *testing.T) { + // This test would require creating a mock session + // For now, we just verify the function handles nil gracefully + pluginID, runtimeType := getPluginMetricLabels(nil) + assert.Equal(t, "unknown", pluginID) + assert.Equal(t, "unknown", runtimeType) + }) +} diff --git a/internal/service/install_plugin.go b/internal/service/install_plugin.go index b9fad13694..e91f608162 100644 --- a/internal/service/install_plugin.go +++ b/internal/service/install_plugin.go @@ -186,7 +186,7 @@ func ReinstallPluginFromIdentifier( }) return retStream, nil - }, ctx, 1800) + }, ctx, 1800, nil) } /* diff --git a/internal/tasks/install_plugin.go b/internal/tasks/install_plugin.go index 0776fa6800..4388cda8ad 100644 --- a/internal/tasks/install_plugin.go +++ b/internal/tasks/install_plugin.go @@ -11,6 +11,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/entities/installation_entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/metrics" ) type PluginInstallJob struct { @@ -37,12 +38,23 @@ func ProcessInstallJob( taskIDs []string, job PluginInstallJob, ) { + startTime := time.Now() + pluginID := job.Identifier.PluginID() + status := "success" + + defer func() { + duration := time.Since(startTime).Seconds() + metrics.PluginInstallationsTotal.WithLabelValues(pluginID, status).Inc() + metrics.PluginInstallationDuration.WithLabelValues(pluginID).Observe(duration) + }() + startTasks(taskIDs) defer endTasks(taskIDs) // if the plugin does not need runtime install, just save the installation to the database if !job.NeedsRuntimeInstall { if err := SaveInstallationForTenantsToDB(tenants, job, runtimeType, source); err != nil { + status = "failed" SetTaskStatusForOnePlugin(taskIDs, job.Identifier, models.InstallTaskStatusFailed, err.Error()) return } @@ -56,6 +68,7 @@ func ProcessInstallJob( // start installation process installationStream, err := manager.Install(ctx, job.Identifier) if err != nil { + status = "failed" SetTaskStatusForOnePlugin(taskIDs, job.Identifier, models.InstallTaskStatusFailed, fmt.Sprintf("failed to start installation: %v", err)) return } @@ -66,9 +79,11 @@ func ProcessInstallJob( case installation_entities.PluginInstallEventInfo: SetTaskMessageForOnePlugin(taskIDs, job.Identifier, resp.Data) case installation_entities.PluginInstallEventError: + status = "failed" SetTaskStatusForOnePlugin(taskIDs, job.Identifier, models.InstallTaskStatusFailed, resp.Data) case installation_entities.PluginInstallEventDone: if err := SaveInstallationForTenantsToDB(tenants, job, runtimeType, source); err != nil { + status = "failed" SetTaskStatusForOnePlugin(taskIDs, job.Identifier, models.InstallTaskStatusFailed, err.Error()) return } @@ -84,6 +99,7 @@ func ProcessInstallJob( } }) if err != nil { + status = "failed" SetTaskStatusForOnePlugin(taskIDs, job.Identifier, models.InstallTaskStatusFailed, err.Error()) } } diff --git a/internal/types/app/config.go b/internal/types/app/config.go index 0563887222..b6f3db4e0f 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -218,6 +218,8 @@ type Config struct { OtelBatchExportTimeoutMS int `envconfig:"OTEL_BATCH_EXPORT_TIMEOUT" default:"10000"` OtelMetricExportTimeoutMS int `envconfig:"OTEL_METRIC_EXPORT_TIMEOUT" default:"30000"` + PrometheusEnabled bool `envconfig:"PROMETHEUS_ENABLED" default:"true"` + SentryEnabled bool `envconfig:"SENTRY_ENABLED"` SentryDSN string `envconfig:"SENTRY_DSN"` SentryAttachStacktrace bool `envconfig:"SENTRY_ATTACH_STACKTRACE"` diff --git a/pkg/utils/metrics/metrics.go b/pkg/utils/metrics/metrics.go new file mode 100644 index 0000000000..e7a8add63a --- /dev/null +++ b/pkg/utils/metrics/metrics.go @@ -0,0 +1,115 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + HTTPRequestsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "plugin_http_requests_total", + Help: "Total number of HTTP requests", + }, + []string{"method", "endpoint", "status"}, + ) + + HTTPRequestDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "plugin_http_request_duration_seconds", + Help: "HTTP request latency in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"method", "endpoint"}, + ) + + PluginInvocationsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "plugin_invocations_total", + Help: "Total number of plugin invocations", + }, + []string{"plugin_id", "plugin_type", "runtime_type", "operation", "status"}, + ) + + PluginInvocationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "plugin_invocation_duration_seconds", + Help: "Plugin invocation latency in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"plugin_id", "plugin_type", "runtime_type", "operation"}, + ) + + PluginInvocationsActive = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "plugin_invocations_active", + Help: "Number of active plugin invocations", + }, + []string{"plugin_id", "plugin_type", "runtime_type"}, + ) + + PluginInstallationsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "plugin_installations_total", + Help: "Total number of plugin installations", + }, + []string{"plugin_id", "status"}, + ) + + PluginInstallationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "plugin_installation_duration_seconds", + Help: "Plugin installation latency in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"plugin_id"}, + ) + + PluginRuntimeStatus = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "plugin_runtime_status", + Help: "Current status of plugin runtime (1=active, 0.5=launching, 0=stopped)", + }, + []string{"plugin_id", "runtime_type"}, + ) + + PluginRestartsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "plugin_restarts_total", + Help: "Total number of plugin restarts", + }, + []string{"plugin_id", "runtime_type"}, + ) + + ActivePluginsTotal = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "active_plugins_total", + Help: "Total number of active plugins", + }, + []string{"runtime_type"}, + ) + + ActiveSessionsTotal = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "active_sessions_total", + Help: "Total number of active sessions", + }, + ) + + StorageOperationsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "storage_operations_total", + Help: "Total number of storage operations", + }, + []string{"operation", "storage_type", "status"}, + ) + + StorageOperationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "storage_operation_duration_seconds", + Help: "Storage operation latency in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"operation", "storage_type"}, + ) +) diff --git a/pkg/utils/parser/identity.go b/pkg/utils/parser/identity.go index 8918c76a50..a434501d12 100644 --- a/pkg/utils/parser/identity.go +++ b/pkg/utils/parser/identity.go @@ -1,10 +1,8 @@ package parser -import "fmt" - func MarshalPluginID(author string, name string, version string) string { if author == "" { - return fmt.Sprintf("%s:%s", name, version) + return name + ":" + version } - return fmt.Sprintf("%s/%s:%s", author, name, version) + return author + "/" + name + ":" + version }