BatchFlow 采用分层架构设计,通过统一的 BatchExecutor 接口支持多种数据源类型,同时为不同类型的数据源提供最适合的实现方式。
flowchart TB
A[Application] --> B["BatchFlow<br/><span style='font-size:12px;color:#888'>用户API层</span>"]
B --> C["gopipeline<br/><span style='font-size:12px;color:#888'>异步批量处理</span>"]
C --> D["BatchExecutor<br/><span style='font-size:12px;color:#888'>统一执行接口</span>"]
D --> E[SQL数据库]
D --> F[NoSQL数据库]
- BatchExecutor 作为所有数据源驱动的统一接口
- 不同类型数据源可选择最适合的实现方式
- 保持API一致性的同时避免过度抽象
- BatchProcessor 不是必须的,仅用于SQL数据库的代码复用
- NoSQL数据库可直接实现 BatchExecutor,避免不必要的抽象层
- 测试环境使用 MockExecutor 直接实现
- BatchFlow: 用户API和管道管理
- BatchExecutor: 执行控制和指标收集
- BatchProcessor: SQL数据库的核心处理逻辑(可选)
- SQLDriver: 数据库特定的SQL生成
| 数据源类型 | 实现方式 | 架构路径 | 优势 |
|---|---|---|---|
| SQL数据库 (MySQL/PostgreSQL/SQLite) |
ThrottledBatchExecutor(可选限流 WithConcurrencyLimit) + BatchProcessor + SQLDriver | BatchFlow → ThrottledBatchExecutor → BatchProcessor → SQLDriver → DB | 代码复用、标准化、易扩展、可节流 |
| NoSQL数据库 (Redis/MongoDB) |
直接实现BatchExecutor | BatchFlow → CustomExecutor → DB | 避免抽象层、性能优化、灵活性 |
| 消息推送 (钉钉机器人/微信/邮件) |
直接实现BatchExecutor | BatchFlow → CustomExecutor → API | 批量推送、错误重试、灵活配置 |
| API调用 (REST/GraphQL) |
直接实现BatchExecutor | BatchFlow → CustomExecutor → HTTP | 批量请求、并发控制、统一处理 |
| 测试环境 | MockExecutor | BatchFlow → MockExecutor → Memory | 快速测试、无依赖 |
Application
↓
BatchFlow.Submit()
↓
gopipeline (异步批量处理)
↓
ThrottledBatchExecutor.ExecuteBatch()
├── 可选并发限流(WithConcurrencyLimit)
├── 指标收集
├── 错误处理
└── 调用BatchProcessor
↓
SQLBatchProcessor.ExecuteBatch()
├── 调用SQLDriver生成SQL
├── 执行数据库操作
└── 处理事务
↓
SQLDriver.GenerateInsertSQL()
├── MySQL: INSERT ... ON DUPLICATE KEY UPDATE
├── PostgreSQL: INSERT ... ON CONFLICT DO UPDATE
└── SQLite: INSERT OR REPLACE
↓
Database Connection
优势:
- 代码复用:所有SQL数据库共享执行逻辑
- 标准化:统一的错误处理和指标收集
- 易扩展:新增SQL数据库只需实现SQLDriver
适用场景:
- 关系型数据库
- 需要复杂SQL语法的场景
- 需要事务支持的场景
Application
↓
BatchFlow.Submit()
↓
gopipeline (异步批量处理)
↓
CustomExecutor.ExecuteBatch()
├── 指标收集
├── 错误处理
└── 直接数据库操作
↓
Database Client
├── Redis: Pipeline操作
├── MongoDB: BulkWrite操作
└── 其他NoSQL特定操作
优势:
- 性能优化:避免不必要的抽象层
- 灵活性:可使用数据库特定的优化特性
- 简洁性:减少代码层次
适用场景:
- NoSQL数据库
- 需要特定优化的场景
- 数据模型与SQL差异较大的场景
- 实现SQLDriver接口
type TiDBDriver struct{}
func (d *TiDBDriver) GenerateInsertSQL(schema batchflow.SchemaInterface, data []map[string]any) (string, []any, error) {
// TiDB特定的SQL生成逻辑
return sql, args, nil
}- 创建工厂方法
func NewTiDBBatchFlow(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchFlow {
executor := batchflow.NewSQLThrottledBatchExecutorWithDriver(db, &TiDBDriver{})
return NewBatchFlow(ctx, config.BufferSize, config.FlushSize, config.FlushInterval, executor)
}- 直接实现BatchExecutor接口
type MongoExecutor struct {
client *mongo.Client
metricsReporter batchflow.MetricsReporter
}
func (e *MongoExecutor) ExecuteBatch(ctx context.Context, schema batchflow.SchemaInterface, data []map[string]any) error {
// MongoDB特定的批量操作逻辑
collection := e.client.Database("mydb").Collection(schema.Name)
docs := make([]interface{}, len(data))
for i, row := range data {
docs[i] = row
}
_, err := collection.InsertMany(ctx, docs)
return err
}
func (e *MongoExecutor) WithMetricsReporter(reporter batchflow.MetricsReporter) batchflow.BatchExecutor {
e.metricsReporter = reporter
return e
}- 创建工厂方法
func NewMongoBatchFlow(ctx context.Context, client *mongo.Client, config PipelineConfig) *BatchFlow {
executor := &MongoExecutor{client: client}
return NewBatchFlow(ctx, config.BufferSize, config.FlushSize, config.FlushInterval, executor)
}type BatchExecutor interface {
ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
}
// 说明:指标配置应在具体类型或能力接口上进行(如在 ThrottledBatchExecutor 上调用 WithMetricsReporter)。
// 在仅持有 BatchExecutor 的通用路径,框架通过只读探测 MetricsReporter() 判断是否已有 Reporter;若无,则内部使用 Noop 兜底,不写回执行器。职责:
- 统一的批量执行接口
- 指标报告器管理
- 所有数据库驱动的入口点
type BatchProcessor interface {
ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
}职责:
- SQL数据库的核心处理逻辑
- 与 ThrottledBatchExecutor 配合使用
- NoSQL数据库可跳过此层
type SQLDriver interface {
GenerateInsertSQL(schema *Schema, data []map[string]any) (string, []any, error)
}职责:
- 生成数据库特定的SQL语句
- 处理不同数据库的语法差异
- 支持不同的冲突处理策略
- 使用指针传递减少内存复制
- 按Schema分组减少数据库操作次数
- 全局SQLDriver实例共享
- 异步批量处理管道
- 支持多goroutine并发提交
- 自动背压控制
- 批量INSERT语句
- 数据库特定的优化语法
- 连接池复用(用户管理)
- 使用MockExecutor进行无依赖测试
- 测试各个组件的独立功能
- 验证SQL生成逻辑
- 真实数据库环境测试
- 多数据库兼容性验证
- 性能基准测试
- 验证不同实现方式的正确性
- 确保接口一致性
- 测试扩展能力
- 灵活性: 支持SQL和NoSQL数据库的不同实现方式
- 可扩展性: 易于添加新的数据库支持
- 性能: 避免过度抽象,允许数据库特定优化
- 一致性: 统一的API和错误处理
- 可测试性: 完善的Mock支持和测试策略
- 代码复用: SQL数据库共享通用逻辑
- 职责分离: 清晰的组件边界和职责划分
这种架构设计既保持了灵活性,又避免了过度工程化,为不同类型的数据库提供了最适合的实现方式。