Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ REDIS_SENTINEL_USERNAME=
REDIS_SENTINEL_PASSWORD=
REDIS_SENTINEL_SOCKET_TIMEOUT=0.1

# redis cluster
# Cluster Format: `<ip1>:<port1>,<ip2:<port2>`
REDIS_USE_CLUSTERS=false
REDIS_CLUSTERS=
REDIS_CLUSTERS_PASSWORD=

DB_TYPE=postgresql
DB_USERNAME=postgres
DB_PASSWORD=difyai123456
Expand Down
13 changes: 12 additions & 1 deletion internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,17 @@ func (p *PluginManager) Launch(configuration *app.Config) {
); err != nil {
log.Panic("init redis sentinel client failed: %s", err.Error())
}
} else {
} else if configuration.RedisUseClusters {
// use redis cluster mode
if err := cache.InitRedisClusterClient(
configuration.RedisClusters,
configuration.RedisClustersPassword,
configuration.RedisUseSsl,
); err != nil {
log.Panic("init redis cluster client failed: %s", err.Error())
}
log.Info("redis cluster client initialized")
}else {
if err := cache.InitRedisClient(
fmt.Sprintf("%s:%d", configuration.RedisHost, configuration.RedisPort),
configuration.RedisUser,
Expand All @@ -129,6 +139,7 @@ func (p *PluginManager) Launch(configuration *app.Config) {
); err != nil {
log.Panic("init redis client failed: %s", err.Error())
}
log.Info("redis standalone client initialized")
}

invocation, err := calldify.NewDifyInvocationDaemon(
Expand Down
5 changes: 5 additions & 0 deletions internal/types/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ type Config struct {
RedisSentinelPassword string `envconfig:"REDIS_SENTINEL_PASSWORD"`
RedisSentinelSocketTimeout float64 `envconfig:"REDIS_SENTINEL_SOCKET_TIMEOUT"`

// redis clusters
RedisUseClusters bool `envconfig:"REDIS_USE_CLUSTERS"`
RedisClusters []string `envconfig:"REDIS_CLUSTERS"`
RedisClustersPassword string `envconfig:"REDIS_CLUSTERS_PASSWORD"`

// database
DBType string `envconfig:"DB_TYPE" default:"postgresql"`
DBUsername string `envconfig:"DB_USERNAME" validate:"required"`
Expand Down
47 changes: 44 additions & 3 deletions pkg/utils/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ func InitRedisSentinelClient(sentinels []string, masterName, username, password,
return nil
}

// InitRedisClusterClient 初始化集群 Redis 客户端
func InitRedisClusterClient(addrs []string, password string, useSsl bool) error {
opts := &redis.ClusterOptions{
Addrs: addrs,
Password: password,
}

if useSsl {
opts.TLSConfig = &tls.Config{}
}

client = redis.NewClusterClient(opts)

if _, err := client.Ping(context.Background()).Result(); err != nil {
return err
}
return nil
}

// Close the redis client
func Close() error {
if client == nil {
Expand Down Expand Up @@ -508,11 +527,26 @@ func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, err
return getCmdable(context...).Expire(ctx, serialKey(key), time).Result()
}

func Transaction(fn func(redis.Pipeliner) error) error {
func Transaction(fn func(redis.Pipeliner) error, keys ...string) error {
if client == nil {
return ErrDBNotInit
}

// Fix: If no keys provided, use plain Pipeline instead of Watch transaction
if len(keys) == 0 {
_, err := client.TxPipelined(ctx, fn)
if err == redis.Nil {
return nil
}
return err
}

// Serialize watch keys
watchKeys := make([]string, len(keys))
for i, key := range keys {
watchKeys[i] = serialKey(key)
}

return client.Watch(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
return fn(p)
Expand All @@ -521,7 +555,7 @@ func Transaction(fn func(redis.Pipeliner) error) error {
return nil
}
return err
})
}, watchKeys...)
}

func Publish(channel string, message any, context ...redis.Cmdable) error {
Expand All @@ -537,8 +571,15 @@ func Publish(channel string, message any, context ...redis.Cmdable) error {
}

func Subscribe[T any](channel string) (<-chan T, func()) {
pubsub := client.Subscribe(ctx, channel)
ch := make(chan T)

if client == nil {
log.Error("redis client not initialized")
close(ch)
return ch, func() {}
}

pubsub := client.Subscribe(ctx, channel)
connectionEstablished := make(chan bool)

go func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestRedisTransaction(t *testing.T) {
}

return errors.New("test transaction error")
})
}, strings.Join([]string{TEST_PREFIX, "key"}, ":"))

if err == nil {
t.Errorf("transaction should return error")
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestRedisTransaction(t *testing.T) {
}

return nil
})
}, strings.Join([]string{TEST_PREFIX, "key"}, ":"))

if err != nil {
t.Errorf("transaction should not return error")
Expand Down