Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 226 additions & 33 deletions docs/designs/data-ops-bulk-actions-api.zh.html

Large diffs are not rendered by default.

69 changes: 55 additions & 14 deletions docs/designs/episode-qa-checks-mcap-integrity.zh.html
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@
<div>
<p class="eyebrow">Keystone / Synapse Development Design</p>
<h1>数据运维质检中心与 Episode 自动质检</h1>
<p class="lead">在管理后台“数据运维”板块增加质检中心,作为运营处理异常 episode 的工作台。第一版提供轻量 MCAP 完整性检查、episode 创建后自动质检、手动重新质检和统一质检历史;后续再扩展 robot_type 维度的 Go/Python 脚本质检配置。</p>
<p class="lead">在管理后台“数据运维”板块增加质检中心,作为运营处理异常 episode 的工作台。第一版提供轻量 MCAP 完整性检查、sidecar 空录制检查、episode 创建后自动质检、手动重新质检和统一质检历史;后续再扩展 robot_type 维度的 Go/Python 脚本质检配置。</p>
</div>
<div class="header-meta">
<strong>设计状态</strong>
Expand All @@ -369,9 +369,10 @@ <h1>数据运维质检中心与 Episode 自动质检</h1>
<section id="background" class="panel">
<h2>背景</h2>
<p>部分异常会导致上传后的 MCAP 无法播放,例如播放器初始化时报错:<code>Expected MCAP magic '89 4d 43 41 50 30 0d 0a', found ...</code>。这类错误通常在预览加载早期暴露,不需要等待全量数据解析。</p>
<p>另一类异常是 MCAP 文件边界合法,但录制内容为空:文件只有 sidecar metadata,没有 schema、channel、message 或 chunk。此时头尾 magic 可以通过,但预览器没有任何图像帧可播放。第一版通过 sidecar JSON 的 <code>recording.message_count</code>、<code>recording.topics_recorded</code> 和 <code>topics_summary</code> 快速拦截这类空录制。</p>
<p>上一版已经验证了轻量 MCAP 头尾 magic 检查的价值。下一版需要把单个详情页按钮升级为后台“质检中心”:运营人员能集中看到待处理 episode、重新触发完整质检 suite、查看最近一次检查结果和历史证据;episode 创建后也应自动进入质检流程。</p>
<div class="callout warn">
<strong>边界说明:</strong>头尾 magic 校验只能证明 MCAP 文件边界基本正确,不能证明文件一定可播放。内部 chunk、schema、压缩数据、CRC 或索引仍可能损坏。该检查用于快速拦截明显坏包,后续可以通过更多检查项补足
<strong>边界说明:</strong>头尾 magic 校验只能证明 MCAP 文件边界基本正确,不能证明文件一定可播放。sidecar 空录制检查依赖 recorder 写出的 JSON 摘要,适合快速拦截明显空包,但不能证明 MCAP 内部一定与 sidecar 一致。内部 chunk、schema、压缩数据、CRC、索引或 sidecar/MCAP 不一致仍需要后续通过 MCAP parser 交叉校验
</div>
</section>

Expand All @@ -385,7 +386,7 @@ <h3>目标</h3>
<li>质检中心第一版是运营工作台,默认展示 <code>pending_qa</code>、<code>failed</code>、<code>needs_inspection</code> 等可处理 episode。</li>
<li>episode 创建后由 Keystone 异步触发自动质检。</li>
<li>前端手动入口统一触发完整质检 suite,而不是触发单个检查项。</li>
<li>第一版默认质检 suite 固定为 <code>['mcap_magic']</code>。</li>
<li>第一版默认质检 suite 固定为 <code>['mcap_magic', 'recording_not_empty']</code>。</li>
<li>所有检查项都写入 <code>qa_checks</code>,失败时写入 <code>quality_flag</code> 并将 <code>qa_status</code> 置为 <code>failed</code>。</li>
<li>完整 suite 全部通过后,允许自动将可自动流转的 episode 置为 <code>approved</code>。</li>
<li><code>qa_status=failed</code> 继续阻止 MCAP 预览、MCAP 下载和云同步。</li>
Expand Down Expand Up @@ -425,7 +426,7 @@ <h2>数据模型复用</h2>
<tr>
<td><code>episodes.quality_flag</code></td>
<td>面向研究员和运营人员的质量说明</td>
<td>失败时写入失败摘要,例如头尾 magic 不匹配;通过时可清空由自动质检写入的失败摘要</td>
<td>失败时写入失败摘要,例如头尾 magic 不匹配或 sidecar 显示空录制;通过时可清空由自动质检写入的失败摘要</td>
</tr>
<tr>
<td><code>qa_checks</code></td>
Expand All @@ -435,12 +436,12 @@ <h2>数据模型复用</h2>
<tr>
<td><code>qa_checks.check_name</code></td>
<td>检查项标识</td>
<td>第一版固定为 <code>mcap_magic</code>,未来可出现 <code>topic_required</code>、<code>duration_range</code>、<code>python:&lt;script_name&gt;</code> 等</td>
<td>第一版固定为 <code>mcap_magic</code> 和 <code>recording_not_empty</code>,未来可出现 <code>topic_required</code>、<code>duration_range</code>、<code>python:&lt;script_name&gt;</code> 等</td>
</tr>
<tr>
<td><code>qa_checks.check_metadata</code></td>
<td>结构化检查详情</td>
<td>记录 expected/head/tail/file_size 等数据,便于质检中心抽屉展示和问题排查</td>
<td>记录 expected/head/tail/file_size、message_count、topics_recorded_count、topics_summary_count 等数据,便于质检中心抽屉展示和问题排查</td>
</tr>
</tbody>
</table>
Expand All @@ -449,7 +450,7 @@ <h3><code>qa_checks.passed</code> 与 <code>qa_status</code> 的区别</h3>
<div class="three-grid">
<div class="mini">
<strong><code>qa_checks.passed</code></strong>
<p>单次、单项检查的事实记录。它回答“这一次 mcap_magic 是否通过”。</p>
<p>单次、单项检查的事实记录。它回答“这一次 mcap_magic 或 recording_not_empty 是否通过”。</p>
</div>
<div class="mini">
<strong><code>qa_status</code></strong>
Expand Down Expand Up @@ -537,12 +538,12 @@ <h3>查询质检中心 episode 列表</h3>
"task_id": 88,
"robot_type": "arm_bot",
"qa_status": "failed",
"quality_flag": "MCAP integrity check failed: tail magic mismatch",
"quality_flag": "Recording sidecar check failed: message_count is zero and no recorded topics",
"created_at": "2026-06-05T10:20:00Z",
"latest_qa_check": {
"check_name": "mcap_magic",
"check_name": "recording_not_empty",
"passed": false,
"details": "MCAP integrity check failed: tail magic mismatch",
"details": "Recording sidecar check failed: message_count is zero and no recorded topics",
"checked_at": "2026-06-05T10:30:00Z"
}
}
Expand Down Expand Up @@ -572,6 +573,23 @@ <h3>查询 episode 质检历史</h3>
"file_size_bytes": 123456789
},
"checked_at": "2026-06-05T10:30:00Z"
},
{
"id": 457,
"episode_id": 123,
"check_name": "recording_not_empty",
"passed": false,
"score": 0,
"details": "Recording sidecar check failed: message_count is zero and no recorded topics",
"check_metadata": {
"message_count": 0,
"topics_recorded_count": 0,
"topics_summary_count": 0,
"duration_sec": 6.461,
"file_size_bytes": 1129,
"sidecar_size_bytes": 752
},
"checked_at": "2026-06-05T10:30:00Z"
}
]
}</code></pre>
Expand All @@ -594,6 +612,13 @@ <h3>运行 episode 完整质检 suite</h3>
"score": 1,
"details": "MCAP head and tail magic matched",
"checked_at": "2026-06-05T10:35:00Z"
},
{
"check_name": "recording_not_empty",
"passed": true,
"score": 1,
"details": "Recording sidecar reports messages and topics",
"checked_at": "2026-06-05T10:35:00Z"
}
]
}</code></pre>
Expand Down Expand Up @@ -650,7 +675,7 @@ <h3>执行流程</h3>
<div class="step">
<span>3</span>
<strong>加载 suite</strong>
<p>第一版硬编码 <code>['mcap_magic']</code>,未来按 <code>robot_type</code> 加载配置。</p>
<p>第一版硬编码 <code>['mcap_magic', 'recording_not_empty']</code>,未来按 <code>robot_type</code> 加载配置。</p>
</div>
<div class="step">
<span>4</span>
Expand Down Expand Up @@ -716,6 +741,19 @@ <h3><code>mcap_magic</code> 检查细节</h3>
</ul>
</div>

<div class="panel">
<h3><code>recording_not_empty</code> 检查细节</h3>
<ul>
<li>读取 <code>episodes.sidecar_path</code> 指向的 JSON sidecar,不直接解析 MCAP。</li>
<li>先通过 S3 stat 获取 sidecar 对象大小;空对象直接失败。</li>
<li>sidecar 大小超过 4 MiB 时直接失败,避免误读异常大对象。</li>
<li>解析 <code>recording.message_count</code>、<code>recording.topics_recorded</code> 和 <code>topics_summary</code>。</li>
<li><code>message_count &lt;= 0</code> 且没有任何 recorded topic 时失败,错误摘要为 <code>Recording sidecar check failed: message_count is zero and no recorded topics</code>。</li>
<li><code>message_count &lt;= 0</code> 或没有 recorded topic 任一单独成立时也失败,分别提示 message_count 为零或无 recorded topics。</li>
<li>该检查用于快速拦截“合法 MCAP 空录制”;后续可增加 MCAP footer/summary 读取,校验 sidecar 与 MCAP 的 message/channel 统计是否一致。</li>
</ul>
</div>

<div class="panel">
<h3>门禁改造</h3>
<table>
Expand All @@ -739,7 +777,7 @@ <h3>门禁改造</h3>
</tr>
<tr>
<td>JSON sidecar 下载</td>
<td>不受 MCAP 完整性失败影响</td>
<td>不受 MCAP/录制内容质检失败影响</td>
<td><code>kind=sidecar</code> 保持现有行为</td>
</tr>
<tr>
Expand Down Expand Up @@ -810,7 +848,8 @@ <h2>测试计划</h2>
<div class="panel">
<h3>Keystone</h3>
<ul>
<li>episode 创建后自动入队并执行 <code>mcap_magic</code>。</li>
<li>episode 创建后自动入队并执行 <code>mcap_magic</code> 和 <code>recording_not_empty</code>。</li>
<li>sidecar 中 <code>message_count=0</code> 且 <code>topics_recorded=[]</code>、<code>topics_summary=[]</code> 时,<code>recording_not_empty</code> 失败并写入结构化证据。</li>
<li>自动质检全部通过时,<code>pending_qa</code> 或 <code>qa_running</code> 更新为 <code>approved</code>。</li>
<li>自动质检失败时写入 <code>qa_checks.passed=false</code>、<code>quality_flag</code>,并设置 <code>qa_status=failed</code>。</li>
<li>手动重新质检允许 <code>failed</code> 在全部通过后恢复为 <code>approved</code>。</li>
Expand Down Expand Up @@ -843,6 +882,7 @@ <h2>后续 robot_type 与 Python 脚本扩展</h2>
"robot_type": "arm_bot",
"checks": [
{ "name": "mcap_magic", "runtime": "go" },
{ "name": "recording_not_empty", "runtime": "go" },
{ "name": "topic_required", "runtime": "python", "script": "topic_required.py" },
{ "name": "duration_range", "runtime": "python", "script": "duration_range.py" }
]
Expand All @@ -851,6 +891,7 @@ <h2>后续 robot_type 与 Python 脚本扩展</h2>
<strong>未来自动批准规则:</strong>如果某个机器人类型配置了多个检查脚本,该机器人生产的 episode 必须通过全部已配置检查,才自动把 <code>qa_status</code> 改成 <code>approved</code>;任一失败都改成 <code>failed</code>。
</div>
<div class="tag-row" aria-label="后续检查项示例">
<span class="tag">recording_not_empty</span>
<span class="tag">sidecar_schema</span>
<span class="tag">duration_range</span>
<span class="tag">topic_required</span>
Expand All @@ -864,7 +905,7 @@ <h2>已确认决策</h2>
<li>质检中心放在管理后台“数据运维”板块。</li>
<li>第一版不支持批量质检、批量同步、持久化 job 队列、robot_type 配置 UI 或 Python 脚本执行。</li>
<li>episode 创建后触发自动质检;第一版使用进程内轻量队列,接受服务重启丢失未执行任务。</li>
<li>第一版默认 suite 固定为 <code>['mcap_magic']</code>。</li>
<li>第一版默认 suite 固定为 <code>['mcap_magic', 'recording_not_empty']</code>。</li>
<li>质检中心和 episode 详情页都使用 <code>POST /api/v1/qa/episodes/:id/run</code>。</li>
<li>按钮文案使用 <code>重新质检</code>,不再使用单项检查文案。</li>
<li>所有检查项失败都把 <code>qa_status</code> 置为 <code>failed</code>。</li>
Expand Down
19 changes: 15 additions & 4 deletions internal/api/handlers/data_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -31,14 +32,20 @@ var validDataOpsSyncStatuses = map[string]struct{}{

// DataOpsHandler handles data operations APIs for the admin workbench.
type DataOpsHandler struct {
db *sqlx.DB
qa *EpisodeQAHandler
syncWorker *services.SyncWorker
db *sqlx.DB
qa *EpisodeQAHandler
qaRunner dataOpsEpisodeQARunner
syncWorker *services.SyncWorker
bulkRunMu sync.Mutex
bulkRunBroker *dataOpsBulkRunBroker
}

// NewDataOpsHandler creates a data operations handler.
func NewDataOpsHandler(db *sqlx.DB) *DataOpsHandler {
return &DataOpsHandler{db: db}
return &DataOpsHandler{
db: db,
bulkRunBroker: newDataOpsBulkRunBroker(),
}
}

// SetBulkActionDeps wires optional services used by data-ops bulk actions.
Expand All @@ -47,6 +54,7 @@ func (h *DataOpsHandler) SetBulkActionDeps(qa *EpisodeQAHandler, syncWorker *ser
return
}
h.qa = qa
h.qaRunner = qa
h.syncWorker = syncWorker
}

Expand All @@ -57,6 +65,9 @@ func (h *DataOpsHandler) RegisterRoutes(apiV1 *gin.RouterGroup) {
apiV1.POST("/episodes/bulk-sync/preview", h.PreviewBulkEpisodeSync)
apiV1.POST("/episodes/bulk-qa", h.BulkRunEpisodeQA)
apiV1.POST("/episodes/bulk-sync", h.BulkSyncEpisodes)
apiV1.GET("/bulk-runs/current", h.GetCurrentBulkRun)
apiV1.GET("/bulk-runs/:run_id", h.GetBulkRun)
apiV1.GET("/bulk-runs/:run_id/stream", h.StreamBulkRun)
}

type dataOpsEpisodeQuery struct {
Expand Down
Loading
Loading