From b01ede44287da5f12461048c2d331bf6b195dd7f Mon Sep 17 00:00:00 2001 From: chaoliu Date: Thu, 11 Jun 2026 13:37:37 +0800 Subject: [PATCH 1/3] Add recording_not_empty QA check --- .../episode-qa-checks-mcap-integrity.zh.html | 69 ++++++-- internal/api/handlers/episode_qa_check.go | 163 +++++++++++++++++- .../api/handlers/episode_qa_check_test.go | 113 ++++++++++++ internal/api/handlers/transfer.go | 15 +- 4 files changed, 334 insertions(+), 26 deletions(-) diff --git a/docs/designs/episode-qa-checks-mcap-integrity.zh.html b/docs/designs/episode-qa-checks-mcap-integrity.zh.html index 498953b..1698889 100644 --- a/docs/designs/episode-qa-checks-mcap-integrity.zh.html +++ b/docs/designs/episode-qa-checks-mcap-integrity.zh.html @@ -344,7 +344,7 @@

Keystone / Synapse Development Design

数据运维质检中心与 Episode 自动质检

-

在管理后台“数据运维”板块增加质检中心,作为运营处理异常 episode 的工作台。第一版提供轻量 MCAP 完整性检查、episode 创建后自动质检、手动重新质检和统一质检历史;后续再扩展 robot_type 维度的 Go/Python 脚本质检配置。

+

在管理后台“数据运维”板块增加质检中心,作为运营处理异常 episode 的工作台。第一版提供轻量 MCAP 完整性检查、sidecar 空录制检查、episode 创建后自动质检、手动重新质检和统一质检历史;后续再扩展 robot_type 维度的 Go/Python 脚本质检配置。

设计状态 @@ -369,9 +369,10 @@

数据运维质检中心与 Episode 自动质检

背景

部分异常会导致上传后的 MCAP 无法播放,例如播放器初始化时报错:Expected MCAP magic '89 4d 43 41 50 30 0d 0a', found ...。这类错误通常在预览加载早期暴露,不需要等待全量数据解析。

+

另一类异常是 MCAP 文件边界合法,但录制内容为空:文件只有 sidecar metadata,没有 schema、channel、message 或 chunk。此时头尾 magic 可以通过,但预览器没有任何图像帧可播放。第一版通过 sidecar JSON 的 recording.message_countrecording.topics_recordedtopics_summary 快速拦截这类空录制。

上一版已经验证了轻量 MCAP 头尾 magic 检查的价值。下一版需要把单个详情页按钮升级为后台“质检中心”:运营人员能集中看到待处理 episode、重新触发完整质检 suite、查看最近一次检查结果和历史证据;episode 创建后也应自动进入质检流程。

- 边界说明:头尾 magic 校验只能证明 MCAP 文件边界基本正确,不能证明文件一定可播放。内部 chunk、schema、压缩数据、CRC 或索引仍可能损坏。该检查用于快速拦截明显坏包,后续可以通过更多检查项补足。 + 边界说明:头尾 magic 校验只能证明 MCAP 文件边界基本正确,不能证明文件一定可播放。sidecar 空录制检查依赖 recorder 写出的 JSON 摘要,适合快速拦截明显空包,但不能证明 MCAP 内部一定与 sidecar 一致。内部 chunk、schema、压缩数据、CRC、索引或 sidecar/MCAP 不一致仍需要后续通过 MCAP parser 交叉校验。
@@ -385,7 +386,7 @@

目标

  • 质检中心第一版是运营工作台,默认展示 pending_qafailedneeds_inspection 等可处理 episode。
  • episode 创建后由 Keystone 异步触发自动质检。
  • 前端手动入口统一触发完整质检 suite,而不是触发单个检查项。
  • -
  • 第一版默认质检 suite 固定为 ['mcap_magic']
  • +
  • 第一版默认质检 suite 固定为 ['mcap_magic', 'recording_not_empty']
  • 所有检查项都写入 qa_checks,失败时写入 quality_flag 并将 qa_status 置为 failed
  • 完整 suite 全部通过后,允许自动将可自动流转的 episode 置为 approved
  • qa_status=failed 继续阻止 MCAP 预览、MCAP 下载和云同步。
  • @@ -425,7 +426,7 @@

    数据模型复用

    episodes.quality_flag 面向研究员和运营人员的质量说明 - 失败时写入失败摘要,例如头尾 magic 不匹配;通过时可清空由自动质检写入的失败摘要 + 失败时写入失败摘要,例如头尾 magic 不匹配或 sidecar 显示空录制;通过时可清空由自动质检写入的失败摘要 qa_checks @@ -435,12 +436,12 @@

    数据模型复用

    qa_checks.check_name 检查项标识 - 第一版固定为 mcap_magic,未来可出现 topic_requiredduration_rangepython:<script_name> 等 + 第一版固定为 mcap_magicrecording_not_empty,未来可出现 topic_requiredduration_rangepython:<script_name>qa_checks.check_metadata 结构化检查详情 - 记录 expected/head/tail/file_size 等数据,便于质检中心抽屉展示和问题排查 + 记录 expected/head/tail/file_size、message_count、topics_recorded_count、topics_summary_count 等数据,便于质检中心抽屉展示和问题排查 @@ -449,7 +450,7 @@

    qa_checks.passedqa_status 的区别

    qa_checks.passed -

    单次、单项检查的事实记录。它回答“这一次 mcap_magic 是否通过”。

    +

    单次、单项检查的事实记录。它回答“这一次 mcap_magic 或 recording_not_empty 是否通过”。

    qa_status @@ -537,12 +538,12 @@

    查询质检中心 episode 列表

    "task_id": 88, "robot_type": "arm_bot", "qa_status": "failed", - "quality_flag": "MCAP integrity check failed: tail magic mismatch", + "quality_flag": "Recording sidecar check failed: message_count is zero and no recorded topics", "created_at": "2026-06-05T10:20:00Z", "latest_qa_check": { - "check_name": "mcap_magic", + "check_name": "recording_not_empty", "passed": false, - "details": "MCAP integrity check failed: tail magic mismatch", + "details": "Recording sidecar check failed: message_count is zero and no recorded topics", "checked_at": "2026-06-05T10:30:00Z" } } @@ -572,6 +573,23 @@

    查询 episode 质检历史

    "file_size_bytes": 123456789 }, "checked_at": "2026-06-05T10:30:00Z" + }, + { + "id": 457, + "episode_id": 123, + "check_name": "recording_not_empty", + "passed": false, + "score": 0, + "details": "Recording sidecar check failed: message_count is zero and no recorded topics", + "check_metadata": { + "message_count": 0, + "topics_recorded_count": 0, + "topics_summary_count": 0, + "duration_sec": 6.461, + "file_size_bytes": 1129, + "sidecar_size_bytes": 752 + }, + "checked_at": "2026-06-05T10:30:00Z" } ] } @@ -594,6 +612,13 @@

    运行 episode 完整质检 suite

    "score": 1, "details": "MCAP head and tail magic matched", "checked_at": "2026-06-05T10:35:00Z" + }, + { + "check_name": "recording_not_empty", + "passed": true, + "score": 1, + "details": "Recording sidecar reports messages and topics", + "checked_at": "2026-06-05T10:35:00Z" } ] } @@ -650,7 +675,7 @@

    执行流程

    3 加载 suite -

    第一版硬编码 ['mcap_magic'],未来按 robot_type 加载配置。

    +

    第一版硬编码 ['mcap_magic', 'recording_not_empty'],未来按 robot_type 加载配置。

    4 @@ -716,6 +741,19 @@

    mcap_magic 检查细节

    +
    +

    recording_not_empty 检查细节

    +
      +
    • 读取 episodes.sidecar_path 指向的 JSON sidecar,不直接解析 MCAP。
    • +
    • 先通过 S3 stat 获取 sidecar 对象大小;空对象直接失败。
    • +
    • sidecar 大小超过 4 MiB 时直接失败,避免误读异常大对象。
    • +
    • 解析 recording.message_countrecording.topics_recordedtopics_summary
    • +
    • message_count <= 0 且没有任何 recorded topic 时失败,错误摘要为 Recording sidecar check failed: message_count is zero and no recorded topics
    • +
    • message_count <= 0 或没有 recorded topic 任一单独成立时也失败,分别提示 message_count 为零或无 recorded topics。
    • +
    • 该检查用于快速拦截“合法 MCAP 空录制”;后续可增加 MCAP footer/summary 读取,校验 sidecar 与 MCAP 的 message/channel 统计是否一致。
    • +
    +
    +

    门禁改造

    @@ -739,7 +777,7 @@

    门禁改造

    - + @@ -810,7 +848,8 @@

    测试计划

    Keystone

      -
    • episode 创建后自动入队并执行 mcap_magic
    • +
    • episode 创建后自动入队并执行 mcap_magicrecording_not_empty
    • +
    • sidecar 中 message_count=0topics_recorded=[]topics_summary=[] 时,recording_not_empty 失败并写入结构化证据。
    • 自动质检全部通过时,pending_qaqa_running 更新为 approved
    • 自动质检失败时写入 qa_checks.passed=falsequality_flag,并设置 qa_status=failed
    • 手动重新质检允许 failed 在全部通过后恢复为 approved
    • @@ -843,6 +882,7 @@

      后续 robot_type 与 Python 脚本扩展

      "robot_type": "arm_bot", "checks": [ { "name": "mcap_magic", "runtime": "go" }, + { "name": "recording_not_empty", "runtime": "go" }, { "name": "topic_required", "runtime": "python", "script": "topic_required.py" }, { "name": "duration_range", "runtime": "python", "script": "duration_range.py" } ] @@ -851,6 +891,7 @@

      后续 robot_type 与 Python 脚本扩展

      未来自动批准规则:如果某个机器人类型配置了多个检查脚本,该机器人生产的 episode 必须通过全部已配置检查,才自动把 qa_status 改成 approved;任一失败都改成 failed
    + recording_not_empty sidecar_schema duration_range topic_required @@ -864,7 +905,7 @@

    已确认决策

  • 质检中心放在管理后台“数据运维”板块。
  • 第一版不支持批量质检、批量同步、持久化 job 队列、robot_type 配置 UI 或 Python 脚本执行。
  • episode 创建后触发自动质检;第一版使用进程内轻量队列,接受服务重启丢失未执行任务。
  • -
  • 第一版默认 suite 固定为 ['mcap_magic']
  • +
  • 第一版默认 suite 固定为 ['mcap_magic', 'recording_not_empty']
  • 质检中心和 episode 详情页都使用 POST /api/v1/qa/episodes/:id/run
  • 按钮文案使用 重新质检,不再使用单项检查文案。
  • 所有检查项失败都把 qa_status 置为 failed
  • diff --git a/internal/api/handlers/episode_qa_check.go b/internal/api/handlers/episode_qa_check.go index e4fd95a..5f5c893 100644 --- a/internal/api/handlers/episode_qa_check.go +++ b/internal/api/handlers/episode_qa_check.go @@ -28,7 +28,8 @@ import ( ) const ( - episodeQACheckMcapMagic = "mcap_magic" + episodeQACheckMcapMagic = "mcap_magic" + episodeQACheckRecordingNotEmpty = "recording_not_empty" qaRunModeAuto QARunMode = "auto" qaRunModeManual QARunMode = "manual" @@ -43,6 +44,7 @@ const ( defaultEpisodeQAQueueSize = 256 defaultEpisodeQATimeout = 2 * time.Minute + maxEpisodeQASidecarBytes = 4 * 1024 * 1024 ) var ( @@ -132,10 +134,11 @@ type episodeQACheckOutcome struct { } type episodeQACheckRow struct { - ID int64 `db:"id"` - McapPath string `db:"mcap_path"` - QAStatus string `db:"qa_status"` - Quality sql.NullString `db:"quality_flag"` + ID int64 `db:"id"` + McapPath string `db:"mcap_path"` + SidecarPath string `db:"sidecar_path"` + QAStatus string `db:"qa_status"` + Quality sql.NullString `db:"quality_flag"` } type episodeQARunClaim struct { @@ -520,7 +523,7 @@ func (h *EpisodeQAHandler) RunEpisodeQASuite(ctx context.Context, episodeID int6 } func defaultEpisodeQASuite(_ episodeQACheckRow) []string { - return []string{episodeQACheckMcapMagic} + return []string{episodeQACheckMcapMagic, episodeQACheckRecordingNotEmpty} } func normalizeEpisodeQACheckName(raw string) string { @@ -529,7 +532,7 @@ func normalizeEpisodeQACheckName(raw string) string { func isSupportedEpisodeQACheckName(checkName string) bool { switch checkName { - case episodeQACheckMcapMagic: + case episodeQACheckMcapMagic, episodeQACheckRecordingNotEmpty: return true default: return false @@ -539,7 +542,7 @@ func isSupportedEpisodeQACheckName(checkName string) bool { func (h *EpisodeQAHandler) loadEpisodeForQACheck(ctx context.Context, episodeID int64) (episodeQACheckRow, error) { var row episodeQACheckRow err := h.db.GetContext(ctx, &row, ` - SELECT id, mcap_path, COALESCE(qa_status, '') AS qa_status, quality_flag + SELECT id, mcap_path, COALESCE(sidecar_path, '') AS sidecar_path, COALESCE(qa_status, '') AS qa_status, quality_flag FROM episodes WHERE id = ? AND deleted_at IS NULL LIMIT 1 @@ -648,6 +651,8 @@ func (h *EpisodeQAHandler) runEpisodeQACheck(ctx context.Context, checkName stri switch checkName { case episodeQACheckMcapMagic: return h.runMcapMagicQACheck(ctx, row) + case episodeQACheckRecordingNotEmpty: + return h.runRecordingNotEmptyQACheck(ctx, row) default: return episodeQACheckOutcome{}, fmt.Errorf("unsupported qa check %q", checkName) } @@ -707,6 +712,72 @@ func (h *EpisodeQAHandler) runMcapMagicQACheck(ctx context.Context, row episodeQ return evaluateMcapMagicCheck(size, head, tail, ""), nil } +func (h *EpisodeQAHandler) runRecordingNotEmptyQACheck(ctx context.Context, row episodeQACheckRow) (episodeQACheckOutcome, error) { + if h.s3 == nil { + return episodeQACheckOutcome{}, fmt.Errorf("storage is not configured") + } + + bucket, objectName, ok := resolveEpisodeMcapLocation(h.bucket, row.SidecarPath) + if !ok { + return recordingNotEmptyFailure("Recording sidecar check failed: invalid sidecar_path", map[string]any{ + "sidecar_path": row.SidecarPath, + }), nil + } + + metadata := map[string]any{ + "bucket": bucket, + "object": objectName, + } + + stat, err := h.s3.StatObject(ctx, bucket, objectName, minio.StatObjectOptions{}) + if err != nil { + if isS3NotFound(err) { + return recordingNotEmptyFailure("Recording sidecar check failed: sidecar object not found", metadata), nil + } + return episodeQACheckOutcome{}, fmt.Errorf("stat sidecar object: %w", err) + } + metadata["sidecar_size_bytes"] = stat.Size + if stat.Size <= 0 { + return recordingNotEmptyFailure("Recording sidecar check failed: sidecar object is empty", metadata), nil + } + if stat.Size > maxEpisodeQASidecarBytes { + metadata["max_sidecar_size_bytes"] = maxEpisodeQASidecarBytes + return recordingNotEmptyFailure("Recording sidecar check failed: sidecar object is too large", metadata), nil + } + + data, err := h.readS3Object(ctx, bucket, objectName, maxEpisodeQASidecarBytes) + if err != nil { + if isS3NotFound(err) { + return recordingNotEmptyFailure("Recording sidecar check failed: sidecar object not found", metadata), nil + } + return episodeQACheckOutcome{}, fmt.Errorf("read sidecar object: %w", err) + } + + return evaluateRecordingNotEmptyCheck(data, metadata) +} + +func (h *EpisodeQAHandler) readS3Object(ctx context.Context, bucket, objectName string, maxBytes int64) ([]byte, error) { + obj, err := h.s3.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + defer func() { + if err := obj.Close(); err != nil { + logger.Printf("[EPISODE-QA] S3 object close failed: bucket=%s, object=%s, err=%v", bucket, objectName, err) + } + }() + + limited := io.LimitReader(obj, maxBytes+1) + data, err := io.ReadAll(limited) + if err != nil { + return nil, err + } + if int64(len(data)) > maxBytes { + return nil, fmt.Errorf("object exceeds max size %d bytes", maxBytes) + } + return data, nil +} + func (h *EpisodeQAHandler) readS3ObjectRange(ctx context.Context, bucket, objectName string, start, end int64) ([]byte, error) { var opts minio.GetObjectOptions if err := opts.SetRange(start, end); err != nil { @@ -726,6 +797,82 @@ func (h *EpisodeQAHandler) readS3ObjectRange(ctx context.Context, bucket, object return io.ReadAll(obj) } +func evaluateRecordingNotEmptyCheck(data []byte, metadata map[string]any) (episodeQACheckOutcome, error) { + if metadata == nil { + metadata = map[string]any{} + } + metadata["sidecar_json_bytes"] = len(data) + if len(data) == 0 { + return recordingNotEmptyFailure("Recording sidecar check failed: sidecar JSON is empty", metadata), nil + } + + var sidecar sidecarJSON + if err := json.Unmarshal(data, &sidecar); err != nil { + metadata["parse_error"] = err.Error() + return recordingNotEmptyFailure("Recording sidecar check failed: invalid sidecar JSON", metadata), nil + } + + messageCount := sidecar.Recording.MessageCount + topicsRecordedCount := countNonEmptyStrings(sidecar.Recording.TopicsRecorded) + topicsSummaryCount := countNonEmptySidecarTopics(sidecar.TopicsSummary) + metadata["message_count"] = messageCount + metadata["topics_recorded_count"] = topicsRecordedCount + metadata["topics_summary_count"] = topicsSummaryCount + metadata["duration_sec"] = sidecar.Recording.DurationSec + metadata["file_size_bytes"] = sidecar.Recording.FileSizeBytes + + if messageCount <= 0 && topicsRecordedCount == 0 && topicsSummaryCount == 0 { + return recordingNotEmptyFailure("Recording sidecar check failed: message_count is zero and no recorded topics", metadata), nil + } + if messageCount <= 0 { + return recordingNotEmptyFailure("Recording sidecar check failed: message_count is zero", metadata), nil + } + if topicsRecordedCount == 0 && topicsSummaryCount == 0 { + return recordingNotEmptyFailure("Recording sidecar check failed: no recorded topics", metadata), nil + } + + return episodeQACheckOutcome{ + CheckName: episodeQACheckRecordingNotEmpty, + Passed: true, + Score: 1, + Details: "Recording sidecar reports messages and topics", + Metadata: metadata, + }, nil +} + +func recordingNotEmptyFailure(details string, metadata map[string]any) episodeQACheckOutcome { + if metadata == nil { + metadata = map[string]any{} + } + return episodeQACheckOutcome{ + CheckName: episodeQACheckRecordingNotEmpty, + Passed: false, + Score: 0, + Details: details, + Metadata: metadata, + } +} + +func countNonEmptyStrings(values []string) int { + count := 0 + for _, value := range values { + if strings.TrimSpace(value) != "" { + count++ + } + } + return count +} + +func countNonEmptySidecarTopics(values []sidecarTopicSummary) int { + count := 0 + for _, value := range values { + if strings.TrimSpace(value.Topic) != "" { + count++ + } + } + return count +} + func evaluateMcapMagicCheck(fileSize int64, head, tail []byte, explicitReason string) episodeQACheckOutcome { metadata := map[string]any{ "expected_magic": spacedHex(mcapMagicBytes), diff --git a/internal/api/handlers/episode_qa_check_test.go b/internal/api/handlers/episode_qa_check_test.go index 0d3eff1..6e84ce3 100644 --- a/internal/api/handlers/episode_qa_check_test.go +++ b/internal/api/handlers/episode_qa_check_test.go @@ -71,6 +71,118 @@ func TestEvaluateMcapMagicCheck(t *testing.T) { } } +func TestEvaluateRecordingNotEmptyCheck(t *testing.T) { + tests := []struct { + name string + body string + wantPassed bool + wantDetail string + }{ + { + name: "messages and recorded topics pass", + body: `{ + "recording": { + "duration_sec": 6.4, + "file_size_bytes": 2048, + "message_count": 12, + "topics_recorded": ["/camera/image_raw/compressed"] + }, + "topics_summary": [] + }`, + wantPassed: true, + wantDetail: "Recording sidecar reports messages and topics", + }, + { + name: "messages and topic summary pass", + body: `{ + "recording": { + "duration_sec": 6.4, + "file_size_bytes": 2048, + "message_count": 12, + "topics_recorded": [] + }, + "topics_summary": [{"topic": "/camera/image_raw/compressed"}] + }`, + wantPassed: true, + wantDetail: "Recording sidecar reports messages and topics", + }, + { + name: "empty recording fails", + body: `{ + "recording": { + "duration_sec": 6.461, + "file_size_bytes": 1129, + "message_count": 0, + "topics_recorded": [] + }, + "topics_summary": [] + }`, + wantPassed: false, + wantDetail: "Recording sidecar check failed: message_count is zero and no recorded topics", + }, + { + name: "zero messages with topics fails", + body: `{ + "recording": { + "message_count": 0, + "topics_recorded": ["/camera/image_raw/compressed"] + } + }`, + wantPassed: false, + wantDetail: "Recording sidecar check failed: message_count is zero", + }, + { + name: "messages without topics fail", + body: `{ + "recording": { + "message_count": 12, + "topics_recorded": [] + }, + "topics_summary": [] + }`, + wantPassed: false, + wantDetail: "Recording sidecar check failed: no recorded topics", + }, + { + name: "invalid json fails", + body: `{`, + wantPassed: false, + wantDetail: "Recording sidecar check failed: invalid sidecar JSON", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := evaluateRecordingNotEmptyCheck([]byte(tt.body), nil) + if err != nil { + t.Fatalf("evaluate recording check: %v", err) + } + if got.Passed != tt.wantPassed { + t.Fatalf("passed = %v, want %v", got.Passed, tt.wantPassed) + } + if got.Details != tt.wantDetail { + t.Fatalf("details = %q, want %q", got.Details, tt.wantDetail) + } + if got.CheckName != episodeQACheckRecordingNotEmpty { + t.Fatalf("check name = %q, want %q", got.CheckName, episodeQACheckRecordingNotEmpty) + } + }) + } +} + +func TestDefaultEpisodeQASuiteIncludesRecordingNotEmpty(t *testing.T) { + got := defaultEpisodeQASuite(episodeQACheckRow{}) + want := []string{episodeQACheckMcapMagic, episodeQACheckRecordingNotEmpty} + if len(got) != len(want) { + t.Fatalf("suite length = %d, want %d: %v", len(got), len(want), got) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("suite[%d] = %q, want %q", i, got[i], want[i]) + } + } +} + func TestPersistEpisodeQACheckFailureMarksEpisodeFailed(t *testing.T) { db := setupEpisodeQACheckTestDB(t) handler := &EpisodeQAHandler{db: db} @@ -314,6 +426,7 @@ func setupEpisodeQACheckTestDB(t *testing.T) *sqlx.DB { CREATE TABLE episodes ( id INTEGER PRIMARY KEY, mcap_path TEXT, + sidecar_path TEXT, qa_status TEXT, qa_score REAL, auto_approved BOOLEAN, diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go index 1c677ff..6350cda 100644 --- a/internal/api/handlers/transfer.go +++ b/internal/api/handlers/transfer.go @@ -378,14 +378,21 @@ func (h *TransferHandler) onUploadProgress(dc *services.TransferConn, msg map[st // sidecarRecording is the subset of the sidecar JSON "recording" block we care about. type sidecarRecording struct { - DurationSec float64 `json:"duration_sec"` - FileSizeBytes int64 `json:"file_size_bytes"` - ChecksumSHA256 string `json:"checksum_sha256"` + DurationSec float64 `json:"duration_sec"` + FileSizeBytes int64 `json:"file_size_bytes"` + ChecksumSHA256 string `json:"checksum_sha256"` + MessageCount int64 `json:"message_count"` + TopicsRecorded []string `json:"topics_recorded"` +} + +type sidecarTopicSummary struct { + Topic string `json:"topic"` } // sidecarJSON is the top-level structure of the task sidecar JSON file. type sidecarJSON struct { - Recording sidecarRecording `json:"recording"` + Recording sidecarRecording `json:"recording"` + TopicsSummary []sidecarTopicSummary `json:"topics_summary"` } // readSidecarFromS3 downloads the sidecar JSON object from MinIO and returns the parsed result. From 263864efb48e4ce47eac4e2b4beac6ffdb013300 Mon Sep 17 00:00:00 2001 From: chaoliu Date: Thu, 11 Jun 2026 16:45:30 +0800 Subject: [PATCH 2/3] Add bulk QA progress SSE --- .../designs/data-ops-bulk-actions-api.zh.html | 259 +++++++-- internal/api/handlers/data_ops.go | 19 +- internal/api/handlers/data_ops_bulk.go | 126 ++++- internal/api/handlers/data_ops_bulk_run.go | 516 ++++++++++++++++++ internal/api/handlers/data_ops_test.go | 458 ++++++++++++++++ internal/server/server.go | 3 + .../migrations/000006_bulk_runs.down.sql | 5 + .../migrations/000006_bulk_runs.up.sql | 23 + 8 files changed, 1346 insertions(+), 63 deletions(-) create mode 100644 internal/api/handlers/data_ops_bulk_run.go create mode 100644 internal/storage/database/migrations/000006_bulk_runs.down.sql create mode 100644 internal/storage/database/migrations/000006_bulk_runs.up.sql diff --git a/docs/designs/data-ops-bulk-actions-api.zh.html b/docs/designs/data-ops-bulk-actions-api.zh.html index 92ea960..e9d8ec2 100644 --- a/docs/designs/data-ops-bulk-actions-api.zh.html +++ b/docs/designs/data-ops-bulk-actions-api.zh.html @@ -347,15 +347,15 @@

    Data Ops Bulk Actions

    数据明细批量质检与云同步 API 设计

    - 为数据明细页面提供“按当前筛选结果批量质检”和“按当前筛选结果批量云同步”的后端能力。第一版提供确认前预览和轻量级异步执行 API,不做持久化批任务、进度查询或取消。 + 为数据明细页面提供“按当前筛选结果批量质检”和“按当前筛选结果批量云同步”的后端能力。第一版保留确认前预览和异步执行模型,并为批量质检增加短期 run snapshot 与 SSE 实时进度;批量同步暂不接入实时进度。

    实现目标
    预览 API 先行
    固定筛选快照
    -
    后台 goroutine 异步处理
    -
    无硬上限保护
    +
    批量 QA run_id
    +
    Snapshot + SSE 进度
    @@ -365,6 +365,7 @@

    数据明细批量质检与云同步 API 设计

    API 规格筛选语义执行模型 + 前端进度测试计划实现清单 @@ -379,18 +380,21 @@

    本次实现

  • 请求体只接收逗号字符串形式的筛选条件。
  • 预览接口只计算命中和预计可操作统计,不启动后台任务。
  • 请求内解析筛选条件并查询固定 episode ID 快照。
  • -
  • 返回 202 Accepted 后由后台 goroutine 异步处理。
  • +
  • 批量 QA 返回 run_id,后台 goroutine 异步处理,并通过 snapshot API 与 SSE 暴露聚合进度。
  • +
  • 批量 QA 全局只允许一个 queuedrunning run。
  • +
  • 批量同步仍返回 202 Accepted 后异步处理,本轮不接入实时进度。
  • 补 Swagger 注释和后端 helper 单测。
  • 明确不做

      -
    • 不做前端页面改造。
    • -
    • 不新增批任务表、批次 ID、进度查询或取消接口。
    • -
    • 不返回最终成功、跳过、失败统计。
    • +
    • 不做批量同步进度面板;前端只做当前批量 QA 的右下角进度面板。
    • +
    • 不做长期审计历史或完整任务中心;批量 run 只作为短期进度事实来源。
    • +
    • 不返回或展示 episode 级明细列表。
    • +
    • 不做取消、自动重试或服务重启后的继续执行。
    • 不设置批量命中条数硬上限;逗号筛选值数量也不设置代码层数量上限。
    • -
    • 批量同步不做已同步数据的重新同步。
    • +
    • 批量同步不做已同步数据的重新同步,也不接入本轮 SSE 进度。
    @@ -401,7 +405,7 @@

    2. 已确认决策

    异步模型 - 两个执行接口均返回 202 Accepted,后台继续处理,不等待整批完成。 + 两个执行接口仍不等待整批完成;批量 QA 返回 run_id,批量同步保持轻量 accepted 模型。
    作用范围 @@ -409,7 +413,7 @@

    2. 已确认决策

    筛选快照 - 前端只发送筛选条件;后端请求内查出固定 ID 快照,再把 ID 列表交给后台 goroutine。 + 前端只发送筛选条件;执行接口按请求时刻重新查询固定 ID 快照。批量 QA 的 total_count 以执行时快照为准。
    确认前预览 @@ -427,6 +431,22 @@

    2. 已确认决策

    无硬上限 第一版不限制命中数量,也不限制逗号筛选值数量;仍不承诺完整批次可靠性。
    +
    + 批量 QA 进度 + 新增短期 bulk_runs 进度事实表、snapshot API 和单 run SSE stream。SSE 只推聚合进度,不推 episode 明细。 +
    +
    + 运行互斥 + 全局只允许一个 bulk_qa run 处于 queuedrunning;再次提交返回 409 和现有 run_id +
    +
    + 状态机 + run 状态为 queuedrunningcompletedfailedinterrupted。质检不通过或单条处理失败不改变 run 的 completed 终态。 +
    +
    + 认证方式 + 前端使用 fetch streaming 消费 SSE,并携带 Authorization: Bearer ...;不使用原生 EventSource,不把 token 放入 query string。 +
    matched_count 只表示筛选命中总数,不表示实际可操作数量。 @@ -514,12 +534,62 @@

    3.2 批量质检执行

    } }
    {
    -  "status": "accepted",
    -  "matched_count": 123,
    +  "run": {
    +    "run_id": "bulk_qa_20260611_073012_a8f3c9",
    +    "action": "bulk_qa",
    +    "status": "queued",
    +    "total_count": 123,
    +    "processed_count": 0,
    +    "passed_count": 0,
    +    "qa_failed_count": 0,
    +    "processing_failed_count": 0,
    +    "skipped_count": 0,
    +    "started_at": null,
    +    "updated_at": "2026-06-11T07:30:12Z",
    +    "finished_at": null,
    +    "error_message": ""
    +  },
       "message": "123 episodes accepted for bulk QA"
     }
    -

    3.3 批量云同步执行

    +

    3.3 批量质检进度查询

    +

    GET /api/v1/data-ops/bulk-runs/:run_id

    +
    {
    +  "run_id": "bulk_qa_20260611_073012_a8f3c9",
    +  "action": "bulk_qa",
    +  "status": "running",
    +  "total_count": 123,
    +  "processed_count": 80,
    +  "passed_count": 52,
    +  "qa_failed_count": 18,
    +  "processing_failed_count": 2,
    +  "skipped_count": 8,
    +  "started_at": "2026-06-11T07:30:12Z",
    +  "updated_at": "2026-06-11T07:32:20Z",
    +  "finished_at": null,
    +  "error_message": ""
    +}
    + +

    GET /api/v1/data-ops/bulk-runs/current?action=bulk_qa

    +

    如果存在 queuedrunning 的批量 QA run,返回同样的 snapshot;否则返回 204 No Content。该接口只用于页面加载时发现当前运行,不提供历史任务中心。

    + +

    3.4 批量质检 SSE

    +

    GET /api/v1/data-ops/bulk-runs/:run_id/stream

    +

    前端使用 fetch streaming 连接,并携带 Authorization: Bearer <token>。连接前应先调用 snapshot;只有 queuedrunning 状态才建立 stream。

    +
    event: bulk_run_snapshot
    +data: {"run_id":"bulk_qa_20260611_073012_a8f3c9","action":"bulk_qa","status":"running","total_count":123,"processed_count":80,"passed_count":52,"qa_failed_count":18,"processing_failed_count":2,"skipped_count":8,"started_at":"2026-06-11T07:30:12Z","updated_at":"2026-06-11T07:32:20Z","finished_at":null,"error_message":""}
    +
    +event: bulk_run_progress
    +data: {"run_id":"bulk_qa_20260611_073012_a8f3c9","action":"bulk_qa","status":"running","total_count":123,"processed_count":81,"passed_count":53,"qa_failed_count":18,"processing_failed_count":2,"skipped_count":8,"started_at":"2026-06-11T07:30:12Z","updated_at":"2026-06-11T07:32:21Z","finished_at":null,"error_message":""}
    +
    +event: bulk_run_completed
    +data: {"run_id":"bulk_qa_20260611_073012_a8f3c9","action":"bulk_qa","status":"completed","total_count":123,"processed_count":123,"passed_count":90,"qa_failed_count":25,"processing_failed_count":2,"skipped_count":6,"started_at":"2026-06-11T07:30:12Z","updated_at":"2026-06-11T07:35:12Z","finished_at":"2026-06-11T07:35:12Z","error_message":""}
    +
    +event: ping
    +data: {"ts":"2026-06-11T07:32:22Z"}
    +

    SSE 事件使用 generic 命名:bulk_run_snapshotbulk_run_progressbulk_run_completedbulk_run_failedbulk_run_interruptedping。所有业务事件 payload 使用同一套 snapshot 结构。

    + +

    3.5 批量云同步执行

    POST /api/v1/data-ops/episodes/bulk-sync

    {
       "confirm": true,
    @@ -536,7 +606,7 @@ 

    3.3 批量云同步执行

    -

    3.4 错误响应

    +

    3.6 错误响应

    JSON sidecar 下载不受 MCAP 完整性失败影响不受 MCAP/录制内容质检失败影响 kind=sidecar 保持现有行为
    @@ -566,6 +636,11 @@

    3.4 错误响应

    + + + + + @@ -669,7 +744,7 @@

    执行请求路径

  • filters 转换为 data-ops query。
  • 使用同一套 dataOpsEpisodeBaseFromSQLbuildDataOpsEpisodeWhere 查询 ID 快照。
  • e.created_at DESC, e.id DESC 固定顺序返回 ID。
  • -
  • 启动后台 goroutine,HTTP 返回 202matched_count
  • +
  • 批量 QA 创建 bulk_runs 记录并返回 run_id;批量同步仍直接启动后台 goroutine 并返回 202
  • @@ -678,14 +753,42 @@

    执行请求路径

    后台处理

    1. 后台不使用 HTTP request context,改用 context.Background()
    2. +
    3. 批量 QA goroutine 启动后将 run 从 queued 更新为 running,并写入 started_at
    4. 单条 QA 使用与自动 QA 相同的超时策略。
    5. 批量 QA 通过 worker pool 固定并发 4
    6. -
    7. 批量同步逐条调用 EnqueueEpisodeManual
    8. -
    9. 单条失败或跳过不影响整批继续处理。
    10. -
    11. 开始和结束打印 summary 日志,异常错误逐条打印。
    12. +
    13. 每条 episode 处理结束后立即更新 bulk_runs 聚合计数。
    14. +
    15. SSE 进度事件最多每 500ms 推送一次;终态事件立即推送。
    16. +
    17. 批量同步逐条调用 EnqueueEpisodeManual,本轮不维护 run 进度。
    18. +
    19. 单条失败或跳过不影响整批继续处理;整批无法继续时才将 run 标记为 failed
    20. +
    21. 开始、进度和结束打印 summary 日志,异常错误逐条打印。
    +
    +

    批量 QA run 语义

    +
      +
    • run_id 格式为 bulk_qa_YYYYMMDD_HHMMSS_<random>,时间使用 UTC;数据库内部仍可使用自增 id
    • +
    • total_count 是执行接口查询到的 ID 快照数量,不使用预览接口的估算值。
    • +
    • processed_count 表示本次 run 已经处理出终态的 episode 数量,不复用旧 qa_status 或旧 latest_qa_check
    • +
    • processed_count = passed_count + qa_failed_count + processing_failed_count + skipped_count
    • +
    • passed_countqa_failed_count 只看本次 RunEpisodeQASuiteresult.Passed,不根据 episode 最终状态推断。
    • +
    • skipped_count 只返回一个总数,不返回 breakdown;包含已有质检运行中、快照后 episode 被删除等未实际执行 suite 的情况。
    • +
    • processing_failed_count 表示单条执行过程错误,例如 S3/DB/超时;它不让 run 进入 failed
    • +
    • total_count=0 时仍创建 run,并直接标记为 completed
    • +
    +
    + +
    +

    SSE 与前端恢复

    +
      +
    • 前端执行成功后拿到 run_id,先请求 snapshot,再在 queuedrunning 时连接 SSE。
    • +
    • SSE 断线后,前端先重新请求 snapshot;若仍是 queued/running,按 1s -> 2s -> 5s -> 10s 退避重连。
    • +
    • 收到任意 SSE 事件后重置退避;超过一段时间仍无法恢复时,UI 提示“实时进度连接中断,点击重试”。
    • +
    • SSE stream 对终态 run 做防御:发送一次 bulk_run_completedbulk_run_failedbulk_run_interrupted 后关闭连接。
    • +
    • run 不存在时返回 404,不使用 SSE error event 伪装业务错误。
    • +
    +
    +

    QA 预览口径

    @@ -721,22 +824,83 @@

    同步预览口径

    -

    第一版没有持久化批任务。服务重启后,未处理完的批量 QA 会丢失;批量同步中已经写入 sync_logs 的任务可由 worker 恢复,未写入的不会恢复。

    +

    第一版只做短期进度事实,不做继续执行恢复。Keystone 启动时应将遗留 queuedrunningbulk_qa run 标记为 interrupted,保留已有计数,并提示用户重新发起批量质检。批量同步中已经写入 sync_logs 的任务仍可由 worker 恢复,未写入的不会恢复。

    +
    + + +
    +

    6. 前端进度体验

    +
    +
    +

    展示形态

    +
      +
    • 点击“批量质检”并确认后,页面右下角展示 floating task panel,不使用阻塞 modal。
    • +
    • 面板收起态展示标题、本次已处理 X / Y 和进度条。
    • +
    • 展开态展示 质检通过质检不通过跳过处理失败 四个聚合计数。
    • +
    • 完成后面板不自动消失,用户手动关闭;表格筛选变化不影响当前 run panel。
    • +
    • 第一版不展示 episode 级明细,不提供失败列表或跳转入口。
    • +
    +
    +
    +

    按钮行为

    +
      +
    • 存在 queuedrunning run 时,批量质检按钮文案改为 查看批量质检进度
    • +
    • 点击时打开或展开当前 run panel,而不是再次提交执行请求。
    • +
    • 如果前端未知但执行接口返回 409,使用响应里的 run_id 打开现有 run panel。
    • +
    • 页面加载时调用 GET /api/v1/data-ops/bulk-runs/current?action=bulk_qa 自动发现当前运行中的批量 QA。
    • +
    +
    +
    + +
    +

    前端连接流程

    +
      +
    1. 用户点击批量质检,前端先调用 preview 接口并展示确认信息。
    2. +
    3. 用户确认后调用 POST /api/v1/data-ops/episodes/bulk-qa
    4. +
    5. 响应中拿到 run snapshot 后立即展示右下角 panel。
    6. +
    7. 如果 run 是 queuedrunning,使用 fetch streaming 连接 /stream,请求带 Authorization: Bearer ...
    8. +
    9. 如果 run 已是 completed,例如 total_count=0,直接展示完成态,不连接 SSE。
    10. +
    11. SSE 断开时先重新拉 snapshot,再按退避策略重连或显示终态。
    12. +
    -

    6. 后端改造建议

    +

    7. 后端改造建议

    结构调整

    • DataOpsHandler 增加 QA handler 和 sync worker 依赖。
    • NewDataOpsHandler 接收新增依赖;server 初始化处传入现有 qaHandlersyncWorker
    • -
    • RegisterRoutes 新增 POST /episodes/bulk-qa/previewPOST /episodes/bulk-sync/previewPOST /episodes/bulk-qaPOST /episodes/bulk-sync
    • -
    • 新增 request/response struct:DataOpsBulkEpisodeActionRequestDataOpsBulkEpisodePreviewResponseDataOpsBulkEpisodeActionResponse
    • -
    • 新增 helper:解析 body filters、计算预览统计、查询 ID 快照、启动 QA worker pool、启动 sync enqueue goroutine。
    • +
    • RegisterRoutes 新增 bulk run snapshot、current 和 stream 路由,并保留现有 preview/execute 路由。
    • +
    • 新增短期进度表 bulk_runs;本轮不建 bulk_run_items
    • +
    • 新增 response struct:DataOpsBulkRunResponseDataOpsBulkEpisodeActionResponse,执行响应包裹 run snapshot。
    • +
    • 新增 run helper:生成 run_id、创建 queued run、查 current run、更新聚合计数、发布 SSE 事件、服务启动时标记 interrupted。
    • +
    • 新增 SSE broker,可参考 DeviceStateBroker 的内存 fan-out 模式;snapshot 仍以数据库为事实来源。
    +

    数据表

    +
    CREATE TABLE bulk_runs (
    +  id BIGINT PRIMARY KEY AUTO_INCREMENT,
    +  run_id VARCHAR(64) NOT NULL UNIQUE,
    +  action VARCHAR(32) NOT NULL,
    +  status VARCHAR(32) NOT NULL,
    +  total_count BIGINT NOT NULL DEFAULT 0,
    +  processed_count BIGINT NOT NULL DEFAULT 0,
    +  passed_count BIGINT NOT NULL DEFAULT 0,
    +  qa_failed_count BIGINT NOT NULL DEFAULT 0,
    +  processing_failed_count BIGINT NOT NULL DEFAULT 0,
    +  skipped_count BIGINT NOT NULL DEFAULT 0,
    +  error_message TEXT,
    +  started_at TIMESTAMP NULL,
    +  finished_at TIMESTAMP NULL,
    +  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    +  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
    +);
    +
    +CREATE INDEX idx_bulk_runs_action_status ON bulk_runs(action, status);
    +CREATE INDEX idx_bulk_runs_updated_at ON bulk_runs(updated_at);
    +

    伪代码

    func (h *DataOpsHandler) PreviewBulkEpisodeQA(c *gin.Context) {
       q, ok := h.parseBulkEpisodeFilters(c)
    @@ -752,9 +916,23 @@ 

    伪代码

    func (h *DataOpsHandler) BulkRunEpisodeQA(c *gin.Context) { req, q, ok := h.parseBulkEpisodeAction(c) + if current := h.currentBulkRun("bulk_qa"); current != nil { ...409... } ids := h.selectBulkEpisodeIDs(c.Request.Context(), q) - go h.runBulkEpisodeQA(ids) - c.JSON(http.StatusAccepted, response("accepted", len(ids), "...")) + run := h.createBulkQARun(ids) + go h.runBulkEpisodeQA(run.RunID, ids) + c.JSON(http.StatusAccepted, response(run, "...")) +} + +func (h *DataOpsHandler) GetBulkRun(c *gin.Context) { + run := h.loadBulkRun(c.Param("run_id")) + c.JSON(http.StatusOK, run) +} + +func (h *DataOpsHandler) StreamBulkRun(c *gin.Context) { + run := h.loadBulkRun(c.Param("run_id")) + writeSSE("bulk_run_snapshot", run) + if isTerminal(run.Status) { return } + subscribeAndFlushRunEvents(run.RunID) } func (h *DataOpsHandler) BulkSyncEpisodes(c *gin.Context) { @@ -768,7 +946,7 @@

    伪代码

    -

    7. 日志策略

    +

    8. 日志策略

    503 { "error": "qa service is not configured" }
    已有批量 QA 正在运行409{ "error": "bulk qa already running", "run_id": "...", "status": "running" }
    sync worker 未配置或未运行 503
    @@ -780,15 +958,19 @@

    7. 日志策略

    - + - + - + + + + + @@ -800,7 +982,7 @@

    7. 日志策略

    -

    8. 测试计划

    +

    9. 测试计划

    必须新增的 helper 单测

      @@ -811,6 +993,11 @@

      必须新增的 helper 单测

    • QA 预览统计能计算 matched_counteligible_countskipped_countprotected_status_count
    • 同步预览统计能计算 QA 未通过、已同步、正在同步等跳过原因汇总。
    • ID 快照 SQL 复用 data-ops from/where,并按 e.created_at DESC, e.id DESC 排序。
    • +
    • 批量 QA 执行创建 queued run,并返回 run_id 和 snapshot。
    • +
    • 存在 queued/runningbulk_qa run 时,再次执行返回 409 和现有 run_id
    • +
    • 每条 episode 处理完成后按本次结果更新 processed_countpassed_countqa_failed_countprocessing_failed_countskipped_count
    • +
    • total_count=0 时创建 completed run。
    • +
    • 启动清理会把遗留 queued/running run 标记为 interrupted
    • 空 filters 合法,会生成只包含 e.deleted_at IS NULL 的 where。
    • 非法 qa_statussync_status、ID 列表和时间范围仍返回解析错误。
    @@ -818,6 +1005,7 @@

    必须新增的 helper 单测

    可选集成测试

    • 使用 fake QA runner 验证 bulk QA goroutine 能按 ID 调用。
    • +
    • 使用 SSE stream 测试验证连接建立时先发送 bulk_run_snapshot,终态时发送一次终态事件并关闭。
    • 使用 fake sync enqueuer 验证 worker 未运行时返回 503
    • 路由注册 smoke test,确保两个 POST endpoint 挂在 data-ops admin group 下。
    @@ -825,11 +1013,16 @@

    可选集成测试

    -

    9. 实现清单

    +

    10. 实现清单

      -
    • 修改 keystone/internal/api/handlers/data_ops.go:新增 request/response、routes、handlers、helpers。
    • +
    • 新增数据库 migration:创建 bulk_runs 表和索引。
    • +
    • 修改 keystone/internal/api/handlers/data_ops.go / data_ops_bulk.go:新增 run response、routes、handlers、helpers。
    • 修改 keystone/internal/server/server.go:构造 DataOpsHandler 时传入 QA handler 和 sync worker。
    • +
    • 服务启动时调用启动清理,将遗留 bulk_qaqueued/running run 标记为 interrupted
    • +
    • 新增 bulk run SSE broker 和 GET /api/v1/data-ops/bulk-runs/:run_id/stream
    • +
    • 新增 GET /api/v1/data-ops/bulk-runs/:run_idGET /api/v1/data-ops/bulk-runs/current?action=bulk_qa
    • +
    • 前端批量质检按钮接入右下角 floating task panel,并用 fetch streaming 消费 SSE。
    • 必要时给 sync worker 暴露轻量 enqueuer 接口,降低 handler 对具体类型的耦合。
    • 补 Swagger 注释后运行 swag init -g internal/server/server.go -o docs
    • keystone/internal/api/handlers/data_ops_test.go 测试。
    • @@ -839,7 +1032,7 @@

      9. 实现清单

    -

    本文档记录第一版轻量级后端 API 规格。后续如果需要可靠长任务、进度展示、取消、重试和审计,应升级为持久化 batch job 模型。

    +

    本文档记录第一版批量操作 API 与批量 QA 实时进度规格。当前只做短期 run snapshot 与 SSE,不做长期审计、取消、重试或服务重启后继续执行;后续如果需要可靠长任务和审计,应升级为完整 batch job 模型。

    diff --git a/internal/api/handlers/data_ops.go b/internal/api/handlers/data_ops.go index 03f4d79..f78932e 100644 --- a/internal/api/handlers/data_ops.go +++ b/internal/api/handlers/data_ops.go @@ -10,6 +10,7 @@ import ( "fmt" "net/http" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -31,14 +32,20 @@ var validDataOpsSyncStatuses = map[string]struct{}{ // DataOpsHandler handles data operations APIs for the admin workbench. type DataOpsHandler struct { - db *sqlx.DB - qa *EpisodeQAHandler - syncWorker *services.SyncWorker + db *sqlx.DB + qa *EpisodeQAHandler + qaRunner dataOpsEpisodeQARunner + syncWorker *services.SyncWorker + bulkRunMu sync.Mutex + bulkRunBroker *dataOpsBulkRunBroker } // NewDataOpsHandler creates a data operations handler. func NewDataOpsHandler(db *sqlx.DB) *DataOpsHandler { - return &DataOpsHandler{db: db} + return &DataOpsHandler{ + db: db, + bulkRunBroker: newDataOpsBulkRunBroker(), + } } // SetBulkActionDeps wires optional services used by data-ops bulk actions. @@ -47,6 +54,7 @@ func (h *DataOpsHandler) SetBulkActionDeps(qa *EpisodeQAHandler, syncWorker *ser return } h.qa = qa + h.qaRunner = qa h.syncWorker = syncWorker } @@ -57,6 +65,9 @@ func (h *DataOpsHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { apiV1.POST("/episodes/bulk-sync/preview", h.PreviewBulkEpisodeSync) apiV1.POST("/episodes/bulk-qa", h.BulkRunEpisodeQA) apiV1.POST("/episodes/bulk-sync", h.BulkSyncEpisodes) + apiV1.GET("/bulk-runs/current", h.GetCurrentBulkRun) + apiV1.GET("/bulk-runs/:run_id", h.GetBulkRun) + apiV1.GET("/bulk-runs/:run_id/stream", h.StreamBulkRun) } type dataOpsEpisodeQuery struct { diff --git a/internal/api/handlers/data_ops_bulk.go b/internal/api/handlers/data_ops_bulk.go index aade406..4419649 100644 --- a/internal/api/handlers/data_ops_bulk.go +++ b/internal/api/handlers/data_ops_bulk.go @@ -13,7 +13,7 @@ import ( "strconv" "strings" "sync" - "sync/atomic" + "time" "github.com/gin-gonic/gin" @@ -72,6 +72,12 @@ type DataOpsBulkEpisodeActionResponse struct { Message string `json:"message"` } +// DataOpsBulkEpisodeQAActionResponse acknowledges an accepted asynchronous bulk QA run. +type DataOpsBulkEpisodeQAActionResponse struct { + Run DataOpsBulkRunResponse `json:"run"` + Message string `json:"message"` +} + type dataOpsBulkQAPreviewRow struct { MatchedCount int64 `db:"matched_count"` QARunningCount int64 `db:"qa_running_count"` @@ -159,8 +165,9 @@ func (h *DataOpsHandler) PreviewBulkEpisodeSync(c *gin.Context) { // @Accept json // @Produce json // @Param request body DataOpsBulkEpisodeActionRequest true "Bulk QA filters and confirmation" -// @Success 202 {object} DataOpsBulkEpisodeActionResponse +// @Success 202 {object} DataOpsBulkEpisodeQAActionResponse // @Failure 400 {object} map[string]string +// @Failure 409 {object} map[string]string // @Failure 503 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /data-ops/episodes/bulk-qa [post] @@ -177,6 +184,22 @@ func (h *DataOpsHandler) BulkRunEpisodeQA(c *gin.Context) { return } + h.bulkRunMu.Lock() + defer h.bulkRunMu.Unlock() + + if current, exists, err := h.currentBulkRun(c.Request.Context(), dataOpsBulkRunActionQA); err != nil { + logger.Printf("[DATA_OPS] bulk QA current run lookup failed: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load current bulk run"}) + return + } else if exists { + c.JSON(http.StatusConflict, gin.H{ + "error": "bulk qa already running", + "run_id": current.RunID, + "status": current.Status, + }) + return + } + ids, err := h.selectBulkEpisodeIDs(c.Request.Context(), q) if err != nil { logger.Printf("[DATA_OPS] bulk QA ID snapshot failed: %v", err) @@ -184,13 +207,21 @@ func (h *DataOpsHandler) BulkRunEpisodeQA(c *gin.Context) { return } - logger.Printf("[DATA_OPS] Bulk QA accepted: matched=%d", len(ids)) - go h.runBulkEpisodeQA(ids) + run, err := h.createBulkQARun(c.Request.Context(), int64(len(ids))) + if err != nil { + logger.Printf("[DATA_OPS] bulk QA run create failed: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create bulk qa run"}) + return + } + + logger.Printf("[DATA_OPS] Bulk QA accepted: run_id=%s total=%d", run.RunID, run.TotalCount) + if len(ids) > 0 { + go h.runBulkEpisodeQA(run.RunID, ids) + } - c.JSON(http.StatusAccepted, DataOpsBulkEpisodeActionResponse{ - Status: "accepted", - MatchedCount: len(ids), - Message: fmt.Sprintf("%d episodes accepted for bulk QA", len(ids)), + c.JSON(http.StatusAccepted, DataOpsBulkEpisodeQAActionResponse{ + Run: run, + Message: fmt.Sprintf("%d episodes accepted for bulk QA", len(ids)), }) } @@ -246,7 +277,7 @@ func (h *DataOpsHandler) ensureDataOpsDatabase(c *gin.Context) bool { } func (h *DataOpsHandler) ensureBulkQAConfigured(c *gin.Context) bool { - if h.qa == nil || h.qa.db == nil || h.qa.s3 == nil { + if h.bulkQARunner() == nil || (h.qa != nil && (h.qa.db == nil || h.qa.s3 == nil)) { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "qa service is not configured"}) return false } @@ -579,10 +610,20 @@ func dataOpsEpisodeIDSnapshotSQL(fromSQL string, where string) string { ` } -func (h *DataOpsHandler) runBulkEpisodeQA(ids []int64) { +func (h *DataOpsHandler) runBulkEpisodeQA(runID string, ids []int64) { matched := int64(len(ids)) if matched == 0 { - logger.Printf("[DATA_OPS] Bulk QA completed: matched=0, attempted=0, skipped=0, failed=0") + logger.Printf("[DATA_OPS] Bulk QA completed: run_id=%s total=0 processed=0 passed=0 qa_failed=0 processing_failed=0 skipped=0", runID) + return + } + runner := h.bulkQARunner() + if runner == nil { + logger.Printf("[DATA_OPS] Bulk QA failed: run_id=%s, err=qa runner is not configured", runID) + _, _ = h.markBulkRunTerminal(context.Background(), runID, dataOpsBulkRunStatusFailed, "qa runner is not configured") + return + } + if _, err := h.markBulkRunRunning(context.Background(), runID); err != nil { + logger.Printf("[DATA_OPS] Bulk QA failed to start: run_id=%s, err=%v", runID, err) return } @@ -591,10 +632,8 @@ func (h *DataOpsHandler) runBulkEpisodeQA(ids []int64) { workerCount = len(ids) } - var attempted int64 - var skipped int64 - var failed int64 jobs := make(chan int64) + results := make(chan dataOpsBulkQAEpisodeResult) var wg sync.WaitGroup for i := 0; i < workerCount; i++ { @@ -603,34 +642,69 @@ func (h *DataOpsHandler) runBulkEpisodeQA(ids []int64) { defer wg.Done() for episodeID := range jobs { ctx, cancel := context.WithTimeout(context.Background(), defaultEpisodeQATimeout) - _, err := h.qa.RunEpisodeQASuite(ctx, episodeID, qaRunModeManual) + result, err := runner.RunEpisodeQASuite(ctx, episodeID, qaRunModeManual) cancel() if err != nil { if isBulkQASkippedError(err) { - atomic.AddInt64(&skipped, 1) + results <- dataOpsBulkQAEpisodeResult{episodeID: episodeID, outcome: dataOpsBulkQAEpisodeSkipped} continue } - atomic.AddInt64(&failed, 1) logger.Printf("[DATA_OPS] Bulk QA failed: episode=%d, err=%v", episodeID, err) + results <- dataOpsBulkQAEpisodeResult{episodeID: episodeID, outcome: dataOpsBulkQAEpisodeProcessingFailed} + continue + } + if result == nil { + logger.Printf("[DATA_OPS] Bulk QA failed: episode=%d, err=empty qa result", episodeID) + results <- dataOpsBulkQAEpisodeResult{episodeID: episodeID, outcome: dataOpsBulkQAEpisodeProcessingFailed} continue } - atomic.AddInt64(&attempted, 1) + if result.Passed { + results <- dataOpsBulkQAEpisodeResult{episodeID: episodeID, outcome: dataOpsBulkQAEpisodePassed} + } else { + results <- dataOpsBulkQAEpisodeResult{episodeID: episodeID, outcome: dataOpsBulkQAEpisodeFailed} + } } }() } - for _, episodeID := range ids { - jobs <- episodeID + go func() { + for _, episodeID := range ids { + jobs <- episodeID + } + close(jobs) + wg.Wait() + close(results) + }() + + var lastProgressPublishedAt time.Time + for result := range results { + run, err := h.incrementBulkQARunCounts(context.Background(), runID, result.outcome) + if err != nil { + logger.Printf("[DATA_OPS] Bulk QA progress update failed: run_id=%s episode=%d err=%v", runID, result.episodeID, err) + continue + } + now := time.Now() + if lastProgressPublishedAt.IsZero() || now.Sub(lastProgressPublishedAt) >= 500*time.Millisecond { + h.publishBulkRunEvent("bulk_run_progress", run) + lastProgressPublishedAt = now + } + } + + finalRun, err := h.markBulkRunTerminal(context.Background(), runID, dataOpsBulkRunStatusCompleted, "") + if err != nil { + logger.Printf("[DATA_OPS] Bulk QA completion update failed: run_id=%s err=%v", runID, err) + return } - close(jobs) - wg.Wait() logger.Printf( - "[DATA_OPS] Bulk QA completed: matched=%d, attempted=%d, skipped=%d, failed=%d", + "[DATA_OPS] Bulk QA completed: run_id=%s total=%d processed=%d passed=%d qa_failed=%d processing_failed=%d skipped=%d", + runID, matched, - attempted, - skipped, - failed, + finalRun.ProcessedCount, + finalRun.PassedCount, + finalRun.QAFailedCount, + finalRun.ProcessingFailedCount, + finalRun.SkippedCount, ) } diff --git a/internal/api/handlers/data_ops_bulk_run.go b/internal/api/handlers/data_ops_bulk_run.go new file mode 100644 index 0000000..468c40c --- /dev/null +++ b/internal/api/handlers/data_ops_bulk_run.go @@ -0,0 +1,516 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +package handlers + +import ( + "context" + "crypto/rand" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/gin-gonic/gin" +) + +const ( + dataOpsBulkRunActionQA = "bulk_qa" + + dataOpsBulkRunStatusQueued = "queued" + dataOpsBulkRunStatusRunning = "running" + dataOpsBulkRunStatusCompleted = "completed" + dataOpsBulkRunStatusFailed = "failed" + dataOpsBulkRunStatusInterrupted = "interrupted" +) + +type dataOpsEpisodeQARunner interface { + RunEpisodeQASuite(ctx context.Context, episodeID int64, mode QARunMode) (*EpisodeQASuiteResponse, error) +} + +type dataOpsBulkRunEvent struct { + name string + run DataOpsBulkRunResponse +} + +type dataOpsBulkRunBroker struct { + mu sync.Mutex + subscribers map[string]map[chan dataOpsBulkRunEvent]struct{} +} + +func newDataOpsBulkRunBroker() *dataOpsBulkRunBroker { + return &dataOpsBulkRunBroker{subscribers: make(map[string]map[chan dataOpsBulkRunEvent]struct{})} +} + +func (b *dataOpsBulkRunBroker) Subscribe(runID string, buffer int) (<-chan dataOpsBulkRunEvent, func()) { + if b == nil { + ch := make(chan dataOpsBulkRunEvent) + close(ch) + return ch, func() {} + } + ch := make(chan dataOpsBulkRunEvent, buffer) + b.mu.Lock() + if b.subscribers == nil { + b.subscribers = make(map[string]map[chan dataOpsBulkRunEvent]struct{}) + } + if b.subscribers[runID] == nil { + b.subscribers[runID] = make(map[chan dataOpsBulkRunEvent]struct{}) + } + b.subscribers[runID][ch] = struct{}{} + b.mu.Unlock() + + unsubscribe := func() { + b.mu.Lock() + defer b.mu.Unlock() + if subscribers := b.subscribers[runID]; subscribers != nil { + if _, ok := subscribers[ch]; ok { + delete(subscribers, ch) + close(ch) + } + if len(subscribers) == 0 { + delete(b.subscribers, runID) + } + } + } + return ch, unsubscribe +} + +func (b *dataOpsBulkRunBroker) Publish(runID string, event dataOpsBulkRunEvent) { + if b == nil { + return + } + b.mu.Lock() + defer b.mu.Unlock() + for ch := range b.subscribers[runID] { + select { + case ch <- event: + default: + } + } +} + +type dataOpsBulkQAEpisodeOutcome string + +const ( + dataOpsBulkQAEpisodePassed dataOpsBulkQAEpisodeOutcome = "passed" + dataOpsBulkQAEpisodeFailed dataOpsBulkQAEpisodeOutcome = "qa_failed" + dataOpsBulkQAEpisodeProcessingFailed dataOpsBulkQAEpisodeOutcome = "processing_failed" + dataOpsBulkQAEpisodeSkipped dataOpsBulkQAEpisodeOutcome = "skipped" +) + +type dataOpsBulkQAEpisodeResult struct { + episodeID int64 + outcome dataOpsBulkQAEpisodeOutcome +} + +// DataOpsBulkRunResponse is the short-lived progress snapshot for one bulk action run. +type DataOpsBulkRunResponse struct { + RunID string `json:"run_id"` + Action string `json:"action"` + Status string `json:"status"` + TotalCount int64 `json:"total_count"` + ProcessedCount int64 `json:"processed_count"` + PassedCount int64 `json:"passed_count"` + QAFailedCount int64 `json:"qa_failed_count"` + ProcessingFailedCount int64 `json:"processing_failed_count"` + SkippedCount int64 `json:"skipped_count"` + StartedAt *time.Time `json:"started_at"` + UpdatedAt time.Time `json:"updated_at"` + FinishedAt *time.Time `json:"finished_at"` + ErrorMessage string `json:"error_message"` +} + +type dataOpsBulkRunRow struct { + ID int64 `db:"id"` + RunID string `db:"run_id"` + Action string `db:"action"` + Status string `db:"status"` + TotalCount int64 `db:"total_count"` + ProcessedCount int64 `db:"processed_count"` + PassedCount int64 `db:"passed_count"` + QAFailedCount int64 `db:"qa_failed_count"` + ProcessingFailedCount int64 `db:"processing_failed_count"` + SkippedCount int64 `db:"skipped_count"` + ErrorMessage sql.NullString `db:"error_message"` + StartedAt sql.NullTime `db:"started_at"` + FinishedAt sql.NullTime `db:"finished_at"` + CreatedAt time.Time `db:"created_at"` + UpdatedAt time.Time `db:"updated_at"` +} + +func (h *DataOpsHandler) bulkQARunner() dataOpsEpisodeQARunner { + if h == nil { + return nil + } + if h.qaRunner != nil { + return h.qaRunner + } + if h.qa != nil { + return h.qa + } + return nil +} + +func (h *DataOpsHandler) ensureBulkRunBroker() *dataOpsBulkRunBroker { + if h.bulkRunBroker == nil { + h.bulkRunMu.Lock() + defer h.bulkRunMu.Unlock() + } + if h.bulkRunBroker == nil { + h.bulkRunBroker = newDataOpsBulkRunBroker() + } + return h.bulkRunBroker +} + +func (h *DataOpsHandler) publishBulkRunEvent(name string, run DataOpsBulkRunResponse) { + if h == nil { + return + } + h.ensureBulkRunBroker().Publish(run.RunID, dataOpsBulkRunEvent{name: name, run: run}) +} + +func (h *DataOpsHandler) dataOpsBulkRunNow() time.Time { + return time.Now().UTC().Truncate(time.Second) +} + +func defaultDataOpsBulkRunID(action string, now time.Time) (string, error) { + var randomBytes [3]byte + if _, err := rand.Read(randomBytes[:]); err != nil { + return "", err + } + return fmt.Sprintf("%s_%s_%s", action, now.UTC().Format("20060102_150405"), hex.EncodeToString(randomBytes[:])), nil +} + +func dataOpsBulkRunResponseFromRow(row dataOpsBulkRunRow) DataOpsBulkRunResponse { + resp := DataOpsBulkRunResponse{ + RunID: row.RunID, + Action: row.Action, + Status: row.Status, + TotalCount: row.TotalCount, + ProcessedCount: row.ProcessedCount, + PassedCount: row.PassedCount, + QAFailedCount: row.QAFailedCount, + ProcessingFailedCount: row.ProcessingFailedCount, + SkippedCount: row.SkippedCount, + UpdatedAt: row.UpdatedAt.UTC(), + } + if row.ErrorMessage.Valid { + resp.ErrorMessage = row.ErrorMessage.String + } + if row.StartedAt.Valid { + startedAt := row.StartedAt.Time.UTC() + resp.StartedAt = &startedAt + } + if row.FinishedAt.Valid { + finishedAt := row.FinishedAt.Time.UTC() + resp.FinishedAt = &finishedAt + } + return resp +} + +func (h *DataOpsHandler) createBulkQARun(ctx context.Context, totalCount int64) (DataOpsBulkRunResponse, error) { + now := h.dataOpsBulkRunNow() + runID, err := defaultDataOpsBulkRunID(dataOpsBulkRunActionQA, now) + if err != nil { + return DataOpsBulkRunResponse{}, err + } + + status := dataOpsBulkRunStatusQueued + var startedAt interface{} + var finishedAt interface{} + if totalCount == 0 { + status = dataOpsBulkRunStatusCompleted + startedAt = now + finishedAt = now + } + + if _, err := h.db.ExecContext(ctx, ` + INSERT INTO bulk_runs ( + run_id, action, status, total_count, processed_count, passed_count, + qa_failed_count, processing_failed_count, skipped_count, error_message, + started_at, finished_at, created_at, updated_at + ) + VALUES (?, ?, ?, ?, 0, 0, 0, 0, 0, '', ?, ?, ?, ?) + `, runID, dataOpsBulkRunActionQA, status, totalCount, startedAt, finishedAt, now, now); err != nil { + return DataOpsBulkRunResponse{}, err + } + + return h.loadBulkRun(ctx, runID) +} + +func (h *DataOpsHandler) loadBulkRun(ctx context.Context, runID string) (DataOpsBulkRunResponse, error) { + var row dataOpsBulkRunRow + if err := h.db.GetContext(ctx, &row, ` + SELECT id, run_id, action, status, total_count, processed_count, passed_count, + qa_failed_count, processing_failed_count, skipped_count, error_message, + started_at, finished_at, created_at, updated_at + FROM bulk_runs + WHERE run_id = ? + `, runID); err != nil { + return DataOpsBulkRunResponse{}, err + } + return dataOpsBulkRunResponseFromRow(row), nil +} + +func (h *DataOpsHandler) markBulkRunRunning(ctx context.Context, runID string) (DataOpsBulkRunResponse, error) { + now := h.dataOpsBulkRunNow() + if _, err := h.db.ExecContext(ctx, ` + UPDATE bulk_runs + SET status = ?, started_at = COALESCE(started_at, ?), updated_at = ? + WHERE run_id = ? AND status = ? + `, dataOpsBulkRunStatusRunning, now, now, runID, dataOpsBulkRunStatusQueued); err != nil { + return DataOpsBulkRunResponse{}, err + } + return h.loadBulkRun(ctx, runID) +} + +func (h *DataOpsHandler) incrementBulkQARunCounts(ctx context.Context, runID string, outcome dataOpsBulkQAEpisodeOutcome) (DataOpsBulkRunResponse, error) { + var passedDelta int64 + var qaFailedDelta int64 + var processingFailedDelta int64 + var skippedDelta int64 + switch outcome { + case dataOpsBulkQAEpisodePassed: + passedDelta = 1 + case dataOpsBulkQAEpisodeFailed: + qaFailedDelta = 1 + case dataOpsBulkQAEpisodeProcessingFailed: + processingFailedDelta = 1 + case dataOpsBulkQAEpisodeSkipped: + skippedDelta = 1 + default: + return DataOpsBulkRunResponse{}, fmt.Errorf("unknown bulk qa outcome %q", outcome) + } + + if _, err := h.db.ExecContext(ctx, ` + UPDATE bulk_runs + SET processed_count = processed_count + 1, + passed_count = passed_count + ?, + qa_failed_count = qa_failed_count + ?, + processing_failed_count = processing_failed_count + ?, + skipped_count = skipped_count + ?, + updated_at = ? + WHERE run_id = ? AND status = ? + `, passedDelta, qaFailedDelta, processingFailedDelta, skippedDelta, h.dataOpsBulkRunNow(), runID, dataOpsBulkRunStatusRunning); err != nil { + return DataOpsBulkRunResponse{}, err + } + return h.loadBulkRun(ctx, runID) +} + +func (h *DataOpsHandler) markBulkRunTerminal(ctx context.Context, runID string, status string, errorMessage string) (DataOpsBulkRunResponse, error) { + now := h.dataOpsBulkRunNow() + if _, err := h.db.ExecContext(ctx, ` + UPDATE bulk_runs + SET status = ?, error_message = ?, finished_at = COALESCE(finished_at, ?), updated_at = ? + WHERE run_id = ? + `, status, errorMessage, now, now, runID); err != nil { + return DataOpsBulkRunResponse{}, err + } + run, err := h.loadBulkRun(ctx, runID) + if err != nil { + return DataOpsBulkRunResponse{}, err + } + if eventName, ok := dataOpsBulkRunTerminalEventName(run.Status); ok { + h.publishBulkRunEvent(eventName, run) + } + return run, nil +} + +// InterruptActiveBulkQARuns marks stale in-flight bulk QA runs as interrupted on service startup. +func (h *DataOpsHandler) InterruptActiveBulkQARuns(ctx context.Context) error { + if h == nil || h.db == nil { + return nil + } + now := h.dataOpsBulkRunNow() + _, err := h.db.ExecContext(ctx, ` + UPDATE bulk_runs + SET status = ?, error_message = ?, finished_at = COALESCE(finished_at, ?), updated_at = ? + WHERE action = ? AND status IN (?, ?) + `, dataOpsBulkRunStatusInterrupted, "service restarted before bulk qa completed", now, now, dataOpsBulkRunActionQA, dataOpsBulkRunStatusQueued, dataOpsBulkRunStatusRunning) + return err +} + +// GetBulkRun returns the latest stored snapshot for one bulk run. +// +// @Summary Get bulk run snapshot +// @Description Returns the current aggregate snapshot for one bulk action run. +// @Tags data-ops +// @Produce json +// @Param run_id path string true "Bulk run ID" +// @Success 200 {object} DataOpsBulkRunResponse +// @Failure 404 {object} map[string]string +// @Failure 503 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /data-ops/bulk-runs/{run_id} [get] +func (h *DataOpsHandler) GetBulkRun(c *gin.Context) { + if !h.ensureDataOpsDatabase(c) { + return + } + run, err := h.loadBulkRun(c.Request.Context(), c.Param("run_id")) + if err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "bulk run not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load bulk run"}) + return + } + c.JSON(http.StatusOK, run) +} + +// GetCurrentBulkRun returns the active bulk QA run, if one exists. +// +// @Summary Get current bulk run +// @Description Returns the active bulk QA run snapshot, or 204 when no run is active. +// @Tags data-ops +// @Produce json +// @Param action query string true "Bulk action, currently bulk_qa" +// @Success 200 {object} DataOpsBulkRunResponse +// @Success 204 +// @Failure 400 {object} map[string]string +// @Failure 503 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /data-ops/bulk-runs/current [get] +func (h *DataOpsHandler) GetCurrentBulkRun(c *gin.Context) { + if !h.ensureDataOpsDatabase(c) { + return + } + if c.Query("action") != dataOpsBulkRunActionQA { + c.JSON(http.StatusBadRequest, gin.H{"error": "action must be bulk_qa"}) + return + } + + run, ok, err := h.currentBulkRun(c.Request.Context(), dataOpsBulkRunActionQA) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load current bulk run"}) + return + } + if !ok { + c.Status(http.StatusNoContent) + return + } + c.JSON(http.StatusOK, run) +} + +// StreamBulkRun streams progress events for one bulk run. +// +// @Summary Stream bulk run progress +// @Description Streams aggregate bulk run snapshots using Server-Sent Events. +// @Tags data-ops +// @Produce text/event-stream +// @Param run_id path string true "Bulk run ID" +// @Success 200 +// @Failure 404 {object} map[string]string +// @Failure 503 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /data-ops/bulk-runs/{run_id}/stream [get] +func (h *DataOpsHandler) StreamBulkRun(c *gin.Context) { + if !h.ensureDataOpsDatabase(c) { + return + } + + runID := strings.TrimSpace(c.Param("run_id")) + events, unsubscribe := h.ensureBulkRunBroker().Subscribe(runID, 64) + defer unsubscribe() + + run, err := h.loadBulkRun(c.Request.Context(), runID) + if err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "bulk run not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load bulk run"}) + return + } + + w := c.Writer + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + w.Flush() + + if err := writeDataOpsBulkRunSSE(w, "bulk_run_snapshot", run); err != nil { + return + } + if eventName, ok := dataOpsBulkRunTerminalEventName(run.Status); ok { + _ = writeDataOpsBulkRunSSE(w, eventName, run) + return + } + + heartbeat := time.NewTicker(15 * time.Second) + defer heartbeat.Stop() + for { + select { + case <-c.Request.Context().Done(): + return + case event, ok := <-events: + if !ok { + return + } + if err := writeDataOpsBulkRunSSE(w, event.name, event.run); err != nil { + return + } + if _, terminal := dataOpsBulkRunTerminalEventName(event.run.Status); terminal { + return + } + case <-heartbeat.C: + if err := writeDataOpsBulkRunSSE(w, "ping", gin.H{"ts": h.dataOpsBulkRunNow().Format(time.RFC3339)}); err != nil { + return + } + } + } +} + +func (h *DataOpsHandler) currentBulkRun(ctx context.Context, action string) (DataOpsBulkRunResponse, bool, error) { + var row dataOpsBulkRunRow + if err := h.db.GetContext(ctx, &row, ` + SELECT id, run_id, action, status, total_count, processed_count, passed_count, + qa_failed_count, processing_failed_count, skipped_count, error_message, + started_at, finished_at, created_at, updated_at + FROM bulk_runs + WHERE action = ? AND status IN (?, ?) + ORDER BY updated_at DESC, id DESC + LIMIT 1 + `, action, dataOpsBulkRunStatusQueued, dataOpsBulkRunStatusRunning); err != nil { + if err == sql.ErrNoRows { + return DataOpsBulkRunResponse{}, false, nil + } + return DataOpsBulkRunResponse{}, false, err + } + return dataOpsBulkRunResponseFromRow(row), true, nil +} + +func dataOpsBulkRunTerminalEventName(status string) (string, bool) { + switch status { + case dataOpsBulkRunStatusCompleted: + return "bulk_run_completed", true + case dataOpsBulkRunStatusFailed: + return "bulk_run_failed", true + case dataOpsBulkRunStatusInterrupted: + return "bulk_run_interrupted", true + default: + return "", false + } +} + +func writeDataOpsBulkRunSSE(w gin.ResponseWriter, eventName string, payload interface{}) error { + encoded, err := json.Marshal(payload) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "event: %s\n", eventName); err != nil { + return err + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", encoded); err != nil { + return err + } + w.Flush() + return nil +} diff --git a/internal/api/handlers/data_ops_test.go b/internal/api/handlers/data_ops_test.go index e2537bb..d46a9ca 100644 --- a/internal/api/handlers/data_ops_test.go +++ b/internal/api/handlers/data_ops_test.go @@ -7,10 +7,13 @@ package handlers import ( "bytes" "context" + "encoding/json" + "errors" "net/http" "net/http/httptest" "strings" "testing" + "time" "github.com/gin-gonic/gin" "github.com/jmoiron/sqlx" @@ -267,6 +270,364 @@ func TestPreviewBulkEpisodeSyncTreatsMissingSyncLogAsEligible(t *testing.T) { } } +func TestBulkRunEpisodeQACreatesRunSnapshot(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + release := make(chan struct{}) + h := &DataOpsHandler{db: db, qaRunner: controlledDataOpsQARunner{release: release}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkTestEpisode(t, db, 1, "2026-06-02T00:00:00Z") + insertDataOpsBulkTestEpisode(t, db, 2, "2026-06-01T00:00:00Z") + + req := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusAccepted { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + + var got struct { + Run DataOpsBulkRunResponse `json:"run"` + Message string `json:"message"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil { + t.Fatalf("decode response: %v", err) + } + + if !strings.HasPrefix(got.Run.RunID, "bulk_qa_") { + t.Fatalf("run_id = %q, want bulk_qa_ prefix", got.Run.RunID) + } + if got.Run.Action != "bulk_qa" || got.Run.Status != "queued" { + t.Fatalf("run action/status = %s/%s, want bulk_qa/queued", got.Run.Action, got.Run.Status) + } + if got.Run.TotalCount != 2 || got.Run.ProcessedCount != 0 { + t.Fatalf("run counts = total %d processed %d, want 2/0", got.Run.TotalCount, got.Run.ProcessedCount) + } + if got.Message != "2 episodes accepted for bulk QA" { + t.Fatalf("message = %q", got.Message) + } + close(release) + waitForBulkRunStatus(t, router, got.Run.RunID, dataOpsBulkRunStatusCompleted) +} + +func TestBulkRunEpisodeQARejectsSecondActiveRun(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + release := make(chan struct{}) + h := &DataOpsHandler{db: db, qaRunner: controlledDataOpsQARunner{release: release}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkTestEpisode(t, db, 1, "2026-06-01T00:00:00Z") + + first := httptest.NewRecorder() + firstReq := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + firstReq.Header.Set("Content-Type", "application/json") + router.ServeHTTP(first, firstReq) + if first.Code != http.StatusAccepted { + t.Fatalf("first status = %d, body = %s", first.Code, first.Body.String()) + } + var firstBody struct { + Run DataOpsBulkRunResponse `json:"run"` + } + if err := json.Unmarshal(first.Body.Bytes(), &firstBody); err != nil { + t.Fatalf("decode first response: %v", err) + } + + second := httptest.NewRecorder() + secondReq := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + secondReq.Header.Set("Content-Type", "application/json") + router.ServeHTTP(second, secondReq) + + if second.Code != http.StatusConflict { + t.Fatalf("second status = %d, body = %s", second.Code, second.Body.String()) + } + var conflict struct { + Error string `json:"error"` + RunID string `json:"run_id"` + Status string `json:"status"` + } + if err := json.Unmarshal(second.Body.Bytes(), &conflict); err != nil { + t.Fatalf("decode conflict response: %v", err) + } + if conflict.RunID != firstBody.Run.RunID || (conflict.Status != dataOpsBulkRunStatusQueued && conflict.Status != dataOpsBulkRunStatusRunning) { + t.Fatalf("conflict = %+v, want run_id %s and active status", conflict, firstBody.Run.RunID) + } + close(release) + waitForBulkRunStatus(t, router, firstBody.Run.RunID, dataOpsBulkRunStatusCompleted) +} + +func TestGetBulkRunAndCurrentBulkRunReturnSnapshots(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + release := make(chan struct{}) + h := &DataOpsHandler{db: db, qaRunner: controlledDataOpsQARunner{release: release}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkTestEpisode(t, db, 1, "2026-06-01T00:00:00Z") + + postRec := httptest.NewRecorder() + postReq := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + postReq.Header.Set("Content-Type", "application/json") + router.ServeHTTP(postRec, postReq) + if postRec.Code != http.StatusAccepted { + t.Fatalf("post status = %d, body = %s", postRec.Code, postRec.Body.String()) + } + var postBody struct { + Run DataOpsBulkRunResponse `json:"run"` + } + if err := json.Unmarshal(postRec.Body.Bytes(), &postBody); err != nil { + t.Fatalf("decode post response: %v", err) + } + + getRec := httptest.NewRecorder() + getReq := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/"+postBody.Run.RunID, nil) + router.ServeHTTP(getRec, getReq) + if getRec.Code != http.StatusOK { + t.Fatalf("get status = %d, body = %s", getRec.Code, getRec.Body.String()) + } + var got DataOpsBulkRunResponse + if err := json.Unmarshal(getRec.Body.Bytes(), &got); err != nil { + t.Fatalf("decode get response: %v", err) + } + if got.RunID != postBody.Run.RunID || got.TotalCount != 1 { + t.Fatalf("snapshot = %+v, want run_id %s and total 1", got, postBody.Run.RunID) + } + + currentRec := httptest.NewRecorder() + currentReq := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/current?action=bulk_qa", nil) + router.ServeHTTP(currentRec, currentReq) + if currentRec.Code != http.StatusOK { + t.Fatalf("current status = %d, body = %s", currentRec.Code, currentRec.Body.String()) + } + var current DataOpsBulkRunResponse + if err := json.Unmarshal(currentRec.Body.Bytes(), ¤t); err != nil { + t.Fatalf("decode current response: %v", err) + } + if current.RunID != postBody.Run.RunID { + t.Fatalf("current run_id = %s, want %s", current.RunID, postBody.Run.RunID) + } + close(release) + waitForBulkRunStatus(t, router, postBody.Run.RunID, dataOpsBulkRunStatusCompleted) +} + +func TestCurrentBulkRunReturnsNoContentWhenNoRunIsActive(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + h := &DataOpsHandler{db: db, qaRunner: scriptedDataOpsQARunner{}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/current?action=bulk_qa", nil) + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } +} + +func TestBulkRunEpisodeQAWithNoMatchedEpisodesCompletesImmediately(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + h := &DataOpsHandler{db: db, qaRunner: scriptedDataOpsQARunner{}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusAccepted { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + var got struct { + Run DataOpsBulkRunResponse `json:"run"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil { + t.Fatalf("decode response: %v", err) + } + if got.Run.Status != dataOpsBulkRunStatusCompleted || got.Run.TotalCount != 0 || got.Run.FinishedAt == nil { + t.Fatalf("run = %+v, want completed empty run with finished_at", got.Run) + } +} + +func TestBulkRunEpisodeQAUpdatesRunProgressFromSuiteResults(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + h := &DataOpsHandler{db: db, qaRunner: scriptedDataOpsQARunner{ + results: map[int64]*EpisodeQASuiteResponse{ + 1: {EpisodeID: 1, Passed: true, Mode: qaRunModeManual}, + 2: {EpisodeID: 2, Passed: false, Mode: qaRunModeManual}, + }, + errs: map[int64]error{ + 3: errEpisodeQAAlreadyRunning, + 4: errors.New("s3 read failed"), + }, + }} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkTestEpisode(t, db, 1, "2026-06-04T00:00:00Z") + insertDataOpsBulkTestEpisode(t, db, 2, "2026-06-03T00:00:00Z") + insertDataOpsBulkTestEpisode(t, db, 3, "2026-06-02T00:00:00Z") + insertDataOpsBulkTestEpisode(t, db, 4, "2026-06-01T00:00:00Z") + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(rec, req) + if rec.Code != http.StatusAccepted { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + var accepted struct { + Run DataOpsBulkRunResponse `json:"run"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &accepted); err != nil { + t.Fatalf("decode response: %v", err) + } + + got := waitForBulkRunStatus(t, router, accepted.Run.RunID, dataOpsBulkRunStatusCompleted) + if got.TotalCount != 4 || got.ProcessedCount != 4 { + t.Fatalf("total/processed = %d/%d, want 4/4", got.TotalCount, got.ProcessedCount) + } + if got.PassedCount != 1 || got.QAFailedCount != 1 || got.SkippedCount != 1 || got.ProcessingFailedCount != 1 { + t.Fatalf("run counts = passed %d qa_failed %d skipped %d processing_failed %d, want 1/1/1/1", got.PassedCount, got.QAFailedCount, got.SkippedCount, got.ProcessingFailedCount) + } +} + +func TestInterruptActiveBulkQARunsMarksQueuedAndRunningRunsInterrupted(t *testing.T) { + db := setupDataOpsBulkPreviewTestDB(t) + h := &DataOpsHandler{db: db} + + insertDataOpsBulkRunForTest(t, db, "bulk_qa_queued", dataOpsBulkRunStatusQueued) + insertDataOpsBulkRunForTest(t, db, "bulk_qa_running", dataOpsBulkRunStatusRunning) + insertDataOpsBulkRunForTest(t, db, "bulk_qa_completed", dataOpsBulkRunStatusCompleted) + + if err := h.InterruptActiveBulkQARuns(context.Background()); err != nil { + t.Fatalf("InterruptActiveBulkQARuns returned error: %v", err) + } + + for _, runID := range []string{"bulk_qa_queued", "bulk_qa_running"} { + run, err := h.loadBulkRun(context.Background(), runID) + if err != nil { + t.Fatalf("load %s: %v", runID, err) + } + if run.Status != dataOpsBulkRunStatusInterrupted || run.FinishedAt == nil { + t.Fatalf("run %s = %+v, want interrupted with finished_at", runID, run) + } + } + + completed, err := h.loadBulkRun(context.Background(), "bulk_qa_completed") + if err != nil { + t.Fatalf("load completed run: %v", err) + } + if completed.Status != dataOpsBulkRunStatusCompleted { + t.Fatalf("completed status = %s, want completed", completed.Status) + } +} + +func TestStreamBulkRunSendsSnapshotAndTerminalEventForCompletedRun(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + h := &DataOpsHandler{db: db} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkRunForTest(t, db, "bulk_qa_completed", dataOpsBulkRunStatusCompleted) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/bulk_qa_completed/stream", nil) + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + if got := rec.Header().Get("Content-Type"); !strings.Contains(got, "text/event-stream") { + t.Fatalf("Content-Type = %q, want text/event-stream", got) + } + body := rec.Body.String() + for _, want := range []string{ + "event: bulk_run_snapshot\n", + `"run_id":"bulk_qa_completed"`, + "event: bulk_run_completed\n", + } { + if !strings.Contains(body, want) { + t.Fatalf("SSE body should contain %q, got:\n%s", want, body) + } + } +} + +func TestStreamBulkRunClosesWhenRunningRunCompletes(t *testing.T) { + gin.SetMode(gin.TestMode) + + db := setupDataOpsBulkPreviewTestDB(t) + release := make(chan struct{}) + h := &DataOpsHandler{db: db, qaRunner: controlledDataOpsQARunner{release: release}} + router := gin.New() + h.RegisterRoutes(router.Group("/api/v1/data-ops")) + + insertDataOpsBulkTestEpisode(t, db, 1, "2026-06-01T00:00:00Z") + + postRec := httptest.NewRecorder() + postReq := httptest.NewRequest(http.MethodPost, "/api/v1/data-ops/episodes/bulk-qa", bytes.NewBufferString(`{"confirm":true,"filters":{}}`)) + postReq.Header.Set("Content-Type", "application/json") + router.ServeHTTP(postRec, postReq) + if postRec.Code != http.StatusAccepted { + t.Fatalf("post status = %d, body = %s", postRec.Code, postRec.Body.String()) + } + var accepted struct { + Run DataOpsBulkRunResponse `json:"run"` + } + if err := json.Unmarshal(postRec.Body.Bytes(), &accepted); err != nil { + t.Fatalf("decode post response: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + streamRec := httptest.NewRecorder() + streamReq := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/"+accepted.Run.RunID+"/stream", nil).WithContext(ctx) + done := make(chan struct{}) + go func() { + router.ServeHTTP(streamRec, streamReq) + close(done) + }() + + time.Sleep(20 * time.Millisecond) + close(release) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("stream did not close after bulk run completed") + } + + body := streamRec.Body.String() + for _, want := range []string{ + "event: bulk_run_snapshot\n", + "event: bulk_run_completed\n", + `"processed_count":1`, + } { + if !strings.Contains(body, want) { + t.Fatalf("SSE body should contain %q, got:\n%s", want, body) + } + } +} + func setupDataOpsBulkPreviewTestDB(t *testing.T) *sqlx.DB { t.Helper() @@ -274,6 +635,7 @@ func setupDataOpsBulkPreviewTestDB(t *testing.T) *sqlx.DB { if err != nil { t.Fatalf("open sqlite: %v", err) } + db.SetMaxOpenConns(1) t.Cleanup(func() { if err := db.Close(); err != nil { t.Fatalf("close sqlite: %v", err) @@ -319,6 +681,23 @@ func setupDataOpsBulkPreviewTestDB(t *testing.T) *sqlx.DB { episode_id INTEGER NOT NULL, status TEXT NOT NULL )`, + `CREATE TABLE bulk_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL UNIQUE, + action TEXT NOT NULL, + status TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + processed_count INTEGER NOT NULL DEFAULT 0, + passed_count INTEGER NOT NULL DEFAULT 0, + qa_failed_count INTEGER NOT NULL DEFAULT 0, + processing_failed_count INTEGER NOT NULL DEFAULT 0, + skipped_count INTEGER NOT NULL DEFAULT 0, + error_message TEXT, + started_at TIMESTAMP NULL, + finished_at TIMESTAMP NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )`, } for _, stmt := range schema { if _, err := db.Exec(stmt); err != nil { @@ -327,3 +706,82 @@ func setupDataOpsBulkPreviewTestDB(t *testing.T) *sqlx.DB { } return db } + +func insertDataOpsBulkTestEpisode(t *testing.T, db *sqlx.DB, id int64, createdAt string) { + t.Helper() + + if _, err := db.Exec(` + INSERT INTO episodes (id, episode_id, task_id, scene_id, qa_status, cloud_synced, deleted_at, created_at) + VALUES (?, ?, 0, 0, 'pending_qa', 0, NULL, ?) + `, id, "episode", createdAt); err != nil { + t.Fatalf("insert episode %d: %v", id, err) + } +} + +func insertDataOpsBulkRunForTest(t *testing.T, db *sqlx.DB, runID string, status string) { + t.Helper() + + now := time.Date(2026, 6, 11, 7, 30, 12, 0, time.UTC) + if _, err := db.Exec(` + INSERT INTO bulk_runs ( + run_id, action, status, total_count, processed_count, passed_count, + qa_failed_count, processing_failed_count, skipped_count, error_message, + started_at, finished_at, created_at, updated_at + ) + VALUES (?, 'bulk_qa', ?, 10, 0, 0, 0, 0, 0, '', NULL, NULL, ?, ?) + `, runID, status, now, now); err != nil { + t.Fatalf("insert bulk run %s: %v", runID, err) + } +} + +type controlledDataOpsQARunner struct { + release <-chan struct{} +} + +func (r controlledDataOpsQARunner) RunEpisodeQASuite(ctx context.Context, episodeID int64, mode QARunMode) (*EpisodeQASuiteResponse, error) { + select { + case <-r.release: + return &EpisodeQASuiteResponse{EpisodeID: episodeID, Passed: true, Mode: mode}, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +type scriptedDataOpsQARunner struct { + results map[int64]*EpisodeQASuiteResponse + errs map[int64]error +} + +func (r scriptedDataOpsQARunner) RunEpisodeQASuite(_ context.Context, episodeID int64, _ QARunMode) (*EpisodeQASuiteResponse, error) { + if err := r.errs[episodeID]; err != nil { + return nil, err + } + if result := r.results[episodeID]; result != nil { + return result, nil + } + return &EpisodeQASuiteResponse{EpisodeID: episodeID, Passed: true, Mode: qaRunModeManual}, nil +} + +func waitForBulkRunStatus(t *testing.T, router http.Handler, runID string, status string) DataOpsBulkRunResponse { + t.Helper() + + deadline := time.Now().Add(2 * time.Second) + var last DataOpsBulkRunResponse + for time.Now().Before(deadline) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/data-ops/bulk-runs/"+runID, nil) + router.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("get run status = %d, body = %s", rec.Code, rec.Body.String()) + } + if err := json.Unmarshal(rec.Body.Bytes(), &last); err != nil { + t.Fatalf("decode run: %v", err) + } + if last.Status == status { + return last + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("bulk run %s did not reach status %s, last snapshot = %+v", runID, status, last) + return DataOpsBulkRunResponse{} +} diff --git a/internal/server/server.go b/internal/server/server.go index 3f27e69..b4e96ba 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -157,6 +157,9 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client, syncWorker *servi orderHandler = handlers.NewOrderHandler(db, recorderHub, recorderRPCTimeout) dataOpsHandler = handlers.NewDataOpsHandler(db) dataOpsHandler.SetBulkActionDeps(qaHandler, syncWorker) + if err := dataOpsHandler.InterruptActiveBulkQARuns(context.Background()); err != nil { + logger.Printf("[DATA_OPS] Failed to interrupt stale bulk QA runs: %v", err) + } dataStatsHandler = handlers.NewDataProductionStatisticsHandler(db) productionDashboardHandler = handlers.NewProductionDashboardHandler(db, recorderHub, transferHub) } diff --git a/internal/storage/database/migrations/000006_bulk_runs.down.sql b/internal/storage/database/migrations/000006_bulk_runs.down.sql new file mode 100644 index 0000000..49fe208 --- /dev/null +++ b/internal/storage/database/migrations/000006_bulk_runs.down.sql @@ -0,0 +1,5 @@ +-- SPDX-FileCopyrightText: 2026 ArcheBase +-- +-- SPDX-License-Identifier: MulanPSL-2.0 + +DROP TABLE IF EXISTS bulk_runs; diff --git a/internal/storage/database/migrations/000006_bulk_runs.up.sql b/internal/storage/database/migrations/000006_bulk_runs.up.sql new file mode 100644 index 0000000..6727631 --- /dev/null +++ b/internal/storage/database/migrations/000006_bulk_runs.up.sql @@ -0,0 +1,23 @@ +-- SPDX-FileCopyrightText: 2026 ArcheBase +-- +-- SPDX-License-Identifier: MulanPSL-2.0 + +CREATE TABLE IF NOT EXISTS bulk_runs ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + run_id VARCHAR(64) NOT NULL UNIQUE, + action VARCHAR(32) NOT NULL, + status VARCHAR(32) NOT NULL, + total_count BIGINT NOT NULL DEFAULT 0, + processed_count BIGINT NOT NULL DEFAULT 0, + passed_count BIGINT NOT NULL DEFAULT 0, + qa_failed_count BIGINT NOT NULL DEFAULT 0, + processing_failed_count BIGINT NOT NULL DEFAULT 0, + skipped_count BIGINT NOT NULL DEFAULT 0, + error_message TEXT, + started_at TIMESTAMP NULL, + finished_at TIMESTAMP NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_bulk_runs_action_status (action, status), + INDEX idx_bulk_runs_updated_at (updated_at) +); From 66e504b28b6058e77b9fe05dcd101779168ec463 Mon Sep 17 00:00:00 2001 From: chaoliu Date: Thu, 11 Jun 2026 17:09:19 +0800 Subject: [PATCH 3/3] Fix bulk QA lint findings --- internal/api/handlers/data_ops_bulk_run.go | 33 ++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/internal/api/handlers/data_ops_bulk_run.go b/internal/api/handlers/data_ops_bulk_run.go index 468c40c..b4fce72 100644 --- a/internal/api/handlers/data_ops_bulk_run.go +++ b/internal/api/handlers/data_ops_bulk_run.go @@ -229,6 +229,7 @@ func (h *DataOpsHandler) createBulkQARun(ctx context.Context, totalCount int64) finishedAt = now } + // #nosec G701 -- static SQL with placeholder-bound bulk run values. if _, err := h.db.ExecContext(ctx, ` INSERT INTO bulk_runs ( run_id, action, status, total_count, processed_count, passed_count, @@ -259,6 +260,7 @@ func (h *DataOpsHandler) loadBulkRun(ctx context.Context, runID string) (DataOps func (h *DataOpsHandler) markBulkRunRunning(ctx context.Context, runID string) (DataOpsBulkRunResponse, error) { now := h.dataOpsBulkRunNow() + // #nosec G701 -- static SQL with placeholder-bound bulk run values. if _, err := h.db.ExecContext(ctx, ` UPDATE bulk_runs SET status = ?, started_at = COALESCE(started_at, ?), updated_at = ? @@ -287,6 +289,7 @@ func (h *DataOpsHandler) incrementBulkQARunCounts(ctx context.Context, runID str return DataOpsBulkRunResponse{}, fmt.Errorf("unknown bulk qa outcome %q", outcome) } + // #nosec G701 -- static SQL with placeholder-bound bulk run counters. if _, err := h.db.ExecContext(ctx, ` UPDATE bulk_runs SET processed_count = processed_count + 1, @@ -304,6 +307,7 @@ func (h *DataOpsHandler) incrementBulkQARunCounts(ctx context.Context, runID str func (h *DataOpsHandler) markBulkRunTerminal(ctx context.Context, runID string, status string, errorMessage string) (DataOpsBulkRunResponse, error) { now := h.dataOpsBulkRunNow() + // #nosec G701 -- static SQL with placeholder-bound bulk run values. if _, err := h.db.ExecContext(ctx, ` UPDATE bulk_runs SET status = ?, error_message = ?, finished_at = COALESCE(finished_at, ?), updated_at = ? @@ -327,6 +331,7 @@ func (h *DataOpsHandler) InterruptActiveBulkQARuns(ctx context.Context) error { return nil } now := h.dataOpsBulkRunNow() + // #nosec G701 -- static SQL with placeholder-bound bulk run values. _, err := h.db.ExecContext(ctx, ` UPDATE bulk_runs SET status = ?, error_message = ?, finished_at = COALESCE(finished_at, ?), updated_at = ? @@ -500,15 +505,39 @@ func dataOpsBulkRunTerminalEventName(status string) (string, bool) { } } +func isAllowedDataOpsBulkRunSSEEventName(eventName string) bool { + switch eventName { + case "bulk_run_snapshot", "bulk_run_progress", "bulk_run_completed", "bulk_run_failed", "bulk_run_interrupted", "ping": + return true + default: + return false + } +} + func writeDataOpsBulkRunSSE(w gin.ResponseWriter, eventName string, payload interface{}) error { + if !isAllowedDataOpsBulkRunSSEEventName(eventName) { + return fmt.Errorf("unsupported bulk run sse event %q", eventName) + } encoded, err := json.Marshal(payload) if err != nil { return err } - if _, err := fmt.Fprintf(w, "event: %s\n", eventName); err != nil { + if _, err := w.Write([]byte("event: ")); err != nil { + return err + } + if _, err := w.Write([]byte(eventName)); err != nil { + return err + } + if _, err := w.Write([]byte("\n")); err != nil { + return err + } + if _, err := w.Write([]byte("data: ")); err != nil { + return err + } + if _, err := w.Write(encoded); err != nil { return err } - if _, err := fmt.Fprintf(w, "data: %s\n\n", encoded); err != nil { + if _, err := w.Write([]byte("\n\n")); err != nil { return err } w.Flush()
    接受批量请求[DATA_OPS] Bulk QA accepted: matched=123[DATA_OPS] Bulk QA accepted: run_id=bulk_qa_... total=123
    批量完成[DATA_OPS] Bulk QA completed: matched=123, attempted=120, skipped=2, failed=1[DATA_OPS] Bulk QA completed: run_id=bulk_qa_... total=123 processed=123 passed=90 qa_failed=25 processing_failed=2 skipped=6
    单条异常失败逐条打印 episode ID 和错误。逐条打印 run_id、episode ID 和错误。
    运行中断服务启动标记遗留 run 为 interrupted 时打印 run 数量和 run_id。
    正常跳过