特点:
- 兼容方法重试和消息重试
- 支持泛型数据
- 支持不同重试策略
- 可自定义哪些错误重试
- 方法重试 支持泛型返回值
exec := func(ctx context.Context) (string, error) {
fmt.Println("Retry")
return "", NewRetryError("retry", errors.New("happy"))
}
retry.Do(
context.Background(),
exec,
WithStrategy(NewConstantBackoff(3, time.Second)), // 重试3次,每次间隔1s
WithShouldRetryFunc(func(err error) bool {
return true // 所有错误都可重试
}),
)- 消息重试 支持泛型消息类型
- example1: 自定义消息类型
type example struct {
Name string `json:"name"`
ID int `json:"id"`
}
var messages = []example{{Name: "a", ID: 1}, {Name: "b", ID: 2}, {Name: "c", ID: 3}}
h := retry.NewRetryHandler(
NewMessageHandler( // MessageHandler接口实现
func(ctx context.Context, msg example) error {
logrus.Info(msg)
time.Sleep(200 * time.Millisecond) // 模拟消息处理时间
return NewRetryError("", errors.New("something happened")) // 返回可重试错误
}, // Try: 尝试处理消息方法
func(ctx context.Context, msg example, err error) {
logrus.Errorf("error: %v, message: %v", err, msg)
}, // Final: 消息无法处理(如果超过重试次数或发生不可重试错误)时终极方法, 比如忽略/发送死信队列/存储数据库等
),
WithStrategy(NewConstantBackoff(3, 100*time.Millisecond)), //最多可重试3次,每次间隔100ms
)
// 处理消息
var ctx context.Context = context.Background()
for _, msg := range messages {
if msg.ID == 1 { // 消息重试3次
}
if msg.ID == 2 { // 重试2次后因context超时停止
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 300*time.Millisecond)
defer cancel()
}
if msg.ID == 3 { // 关闭handler,消息不再可用
h.Close()
}
if err := h.Try(ctx, msg); err != nil {
logrus.Errorf("error: %v, message: %v", err, msg)
}
}- example2: redis消息处理, 也可适用于kafka或其它消息组件
// 需要先创建redis客户端
var rds redis.Client = *redis.NewClient(&redis.Options{
Addr: "localhost:6380", // error: connect refused
})
// 自定义哪些错误可以重试,默认所有
func shouldRetry func(error) bool {
return true
}
type redisItem struct {
key string
value interface{}
}
h := NewRetryHandler(
NewMessageHandler(
func(ctx context.Context, msg *redisItem) error {
err := rds.Set(ctx, msg.key, msg.value, time.Minute).Err()
if err != nil {
logrus.Errorf("error: %v, message: %v ", err, msg)
return err
}
logrus.Infof("success: %v", msg)
return nil
}, // Try
func(ctx context.Context, msg *redisItem, err error) {
logrus.Errorf("error: %v, message: %v ", err, msg)
}, // Final
),
WithStrategy(NewConstantBackoff(3, 100*time.Millisecond)),
WithShouldRetryFunc(shouldRetry),
)
for _, msg := range messages {
h.Try(context.Background(), &redisItem{
key: msg.Name,
value: msg.ID,
})
}