From 9d4871848dab6b629b6d71256db765445bacdc55 Mon Sep 17 00:00:00 2001 From: zicorn Date: Thu, 8 May 2025 20:51:03 +0800 Subject: [PATCH 1/5] =?UTF-8?q?[add]=20redis=E6=96=B0=E5=A2=9Ecluster?= =?UTF-8?q?=E4=B8=8Esentinel=20-=20cluster=E5=B7=B2=E7=BB=8F=E8=BF=87?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=EF=BC=8Csentinel=E6=9A=82=E6=9C=AA=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/plugin_manager/manager.go | 12 +++++++++++- internal/types/app/config.go | 5 +++++ pkg/utils/cache/redis.go | 19 +++++++++++++++++++ pkg/utils/cache/redis_auto_type.go | 6 +++--- 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index e0858eef5..269631df2 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -119,7 +119,17 @@ func (p *PluginManager) Launch(configuration *app.Config) { ); err != nil { log.Panic("init redis sentinel client failed: %s", err.Error()) } - } else { + } else if configuration.RedisUseClusters { + // use redis cluster mode + if err := cache.InitRedisClusterClient( + configuration.RedisClusters, + configuration.RedisClustersPassword, + configuration.RedisUseSsl, + ); err != nil { + log.Panic("init redis cluster client failed: %s", err.Error()) + } + log.Info("redis cluster client initialized") + }else { if err := cache.InitRedisClient( fmt.Sprintf("%s:%d", configuration.RedisHost, configuration.RedisPort), configuration.RedisUser, diff --git a/internal/types/app/config.go b/internal/types/app/config.go index 1808c1e68..15840091e 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -121,6 +121,11 @@ type Config struct { RedisSentinelPassword string `envconfig:"REDIS_SENTINEL_PASSWORD"` RedisSentinelSocketTimeout float64 `envconfig:"REDIS_SENTINEL_SOCKET_TIMEOUT"` + // redis clusters + RedisUseClusters bool `envconfig:"REDIS_USE_CLUSTERS"` + RedisClusters []string `envconfig:"REDIS_CLUSTERS"` + RedisClustersPassword string `envconfig:"REDIS_CLUSTERS_PASSWORD"` + // database DBType string `envconfig:"DB_TYPE" default:"postgresql"` DBUsername string `envconfig:"DB_USERNAME" validate:"required"` diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index d0130d6b0..f85cb994e 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -73,6 +73,25 @@ func InitRedisSentinelClient(sentinels []string, masterName, username, password, return nil } +// InitRedisClusterClient 初始化集群 Redis 客户端 +func InitRedisClusterClient(addrs []string, password string, useSsl bool) error { + opts := &redis.ClusterOptions{ + Addrs: addrs, + Password: password, + } + + if useSsl { + opts.TLSConfig = &tls.Config{} + } + + client := redis.NewClusterClient(opts) + + if _, err := client.Ping(context.Background()).Result(); err != nil { + return err + } + return nil +} + // Close the redis client func Close() error { if client == nil { diff --git a/pkg/utils/cache/redis_auto_type.go b/pkg/utils/cache/redis_auto_type.go index f151607a4..4b776d816 100644 --- a/pkg/utils/cache/redis_auto_type.go +++ b/pkg/utils/cache/redis_auto_type.go @@ -10,7 +10,7 @@ import ( // Set the value with key func AutoSet[T any](key string, value T, context ...redis.Cmdable) error { - if client == nil { + if redisClient == nil { return ErrDBNotInit } @@ -32,7 +32,7 @@ func AutoGet[T any](key string, context ...redis.Cmdable) (*T, error) { // Get the value with key, fallback to getter if not found, and set the value to cache func AutoGetWithGetter[T any](key string, getter func() (*T, error), context ...redis.Cmdable) (*T, error) { - if client == nil { + if redisClient == nil { return nil, ErrDBNotInit } @@ -66,7 +66,7 @@ func AutoGetWithGetter[T any](key string, getter func() (*T, error), context ... // Delete the value with key func AutoDelete[T any](key string, context ...redis.Cmdable) (int64, error) { - if client == nil { + if redisClient == nil { return 0, ErrDBNotInit } From b53f431713dcc1be157bc5e7dce452c939b6b886 Mon Sep 17 00:00:00 2001 From: zicorn Date: Sat, 10 May 2025 17:54:34 +0800 Subject: [PATCH 2/5] [update] convert chinese comment to english --- internal/core/plugin_manager/manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index 269631df2..793fc71ec 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -128,7 +128,7 @@ func (p *PluginManager) Launch(configuration *app.Config) { ); err != nil { log.Panic("init redis cluster client failed: %s", err.Error()) } - log.Info("redis cluster client initialized") + log.Info("redis cluster client initialized") }else { if err := cache.InitRedisClient( fmt.Sprintf("%s:%d", configuration.RedisHost, configuration.RedisPort), @@ -139,6 +139,7 @@ func (p *PluginManager) Launch(configuration *app.Config) { ); err != nil { log.Panic("init redis client failed: %s", err.Error()) } + log.Info("redis standalone client initialized") } invocation, err := calldify.NewDifyInvocationDaemon( From 9ca77d8e96597e5bfa9aa617d485bc8d87861972 Mon Sep 17 00:00:00 2001 From: zicorn Date: Thu, 22 May 2025 18:30:52 +0800 Subject: [PATCH 3/5] [update] fix conflict --- .env.example | 6 ++++++ pkg/utils/cache/redis.go | 11 +++++++++-- pkg/utils/cache/redis_auto_type.go | 6 +++--- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.env.example b/.env.example index d248ef563..94166eb60 100644 --- a/.env.example +++ b/.env.example @@ -91,6 +91,12 @@ REDIS_SENTINEL_USERNAME= REDIS_SENTINEL_PASSWORD= REDIS_SENTINEL_SOCKET_TIMEOUT=0.1 +# redis cluster +# Cluster Format: `:,` +REDIS_USE_CLUSTERS=false +REDIS_CLUSTERS= +REDIS_CLUSTERS_PASSWORD= + DB_TYPE=postgresql DB_USERNAME=postgres DB_PASSWORD=difyai123456 diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index f85cb994e..1c2cb8703 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -84,7 +84,7 @@ func InitRedisClusterClient(addrs []string, password string, useSsl bool) error opts.TLSConfig = &tls.Config{} } - client := redis.NewClusterClient(opts) + client = redis.NewClusterClient(opts) if _, err := client.Ping(context.Background()).Result(); err != nil { return err @@ -556,8 +556,15 @@ func Publish(channel string, message any, context ...redis.Cmdable) error { } func Subscribe[T any](channel string) (<-chan T, func()) { - pubsub := client.Subscribe(ctx, channel) ch := make(chan T) + + if client == nil { + log.Error("redis client not initialized") + close(ch) + return ch, func() {} + } + + pubsub := client.Subscribe(ctx, channel) connectionEstablished := make(chan bool) go func() { diff --git a/pkg/utils/cache/redis_auto_type.go b/pkg/utils/cache/redis_auto_type.go index 4b776d816..f151607a4 100644 --- a/pkg/utils/cache/redis_auto_type.go +++ b/pkg/utils/cache/redis_auto_type.go @@ -10,7 +10,7 @@ import ( // Set the value with key func AutoSet[T any](key string, value T, context ...redis.Cmdable) error { - if redisClient == nil { + if client == nil { return ErrDBNotInit } @@ -32,7 +32,7 @@ func AutoGet[T any](key string, context ...redis.Cmdable) (*T, error) { // Get the value with key, fallback to getter if not found, and set the value to cache func AutoGetWithGetter[T any](key string, getter func() (*T, error), context ...redis.Cmdable) (*T, error) { - if redisClient == nil { + if client == nil { return nil, ErrDBNotInit } @@ -66,7 +66,7 @@ func AutoGetWithGetter[T any](key string, getter func() (*T, error), context ... // Delete the value with key func AutoDelete[T any](key string, context ...redis.Cmdable) (int64, error) { - if redisClient == nil { + if client == nil { return 0, ErrDBNotInit } From 0d2966b0c15a3a5818ef904c42757c8649dc79ef Mon Sep 17 00:00:00 2001 From: zicorn Date: Tue, 16 Dec 2025 17:45:08 +0800 Subject: [PATCH 4/5] [update] fix conflict --- pkg/utils/cache/redis.go | 15 +++++++++++++-- pkg/utils/cache/redis_test.go | 4 ++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index 1c2cb8703..87f8ec9d2 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -527,11 +527,22 @@ func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, err return getCmdable(context...).Expire(ctx, serialKey(key), time).Result() } -func Transaction(fn func(redis.Pipeliner) error) error { +func Transaction(fn func(redis.Pipeliner) error, keys ...string) error { if client == nil { return ErrDBNotInit } + // 如果没有提供 keys,则返回错误 + if len(keys) == 0 { + return errors.New("redis: Watch requires at least one key") + } + + // 将 keys 进行序列化处理 + watchKeys := make([]string, len(keys)) + for i, key := range keys { + watchKeys[i] = serialKey(key) + } + return client.Watch(ctx, func(tx *redis.Tx) error { _, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error { return fn(p) @@ -540,7 +551,7 @@ func Transaction(fn func(redis.Pipeliner) error) error { return nil } return err - }) + }, watchKeys...) } func Publish(channel string, message any, context ...redis.Cmdable) error { diff --git a/pkg/utils/cache/redis_test.go b/pkg/utils/cache/redis_test.go index fae22d56b..3448f06e7 100644 --- a/pkg/utils/cache/redis_test.go +++ b/pkg/utils/cache/redis_test.go @@ -57,7 +57,7 @@ func TestRedisTransaction(t *testing.T) { } return errors.New("test transaction error") - }) + }, strings.Join([]string{TEST_PREFIX, "key"}, ":")) if err == nil { t.Errorf("transaction should return error") @@ -93,7 +93,7 @@ func TestRedisTransaction(t *testing.T) { } return nil - }) + }, strings.Join([]string{TEST_PREFIX, "key"}, ":")) if err != nil { t.Errorf("transaction should not return error") From a39d243ee4c4fc432b6f5c5cbb6ee2c63344ff62 Mon Sep 17 00:00:00 2001 From: zicorn Date: Wed, 17 Dec 2025 16:26:13 +0800 Subject: [PATCH 5/5] [update] fix key watch --- pkg/utils/cache/redis.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index 87f8ec9d2..3333489a9 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -532,12 +532,16 @@ func Transaction(fn func(redis.Pipeliner) error, keys ...string) error { return ErrDBNotInit } - // 如果没有提供 keys,则返回错误 + // Fix: If no keys provided, use plain Pipeline instead of Watch transaction if len(keys) == 0 { - return errors.New("redis: Watch requires at least one key") + _, err := client.TxPipelined(ctx, fn) + if err == redis.Nil { + return nil + } + return err } - // 将 keys 进行序列化处理 + // Serialize watch keys watchKeys := make([]string, len(keys)) for i, key := range keys { watchKeys[i] = serialKey(key)