Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
973 changes: 973 additions & 0 deletions docs/designs/data-details-center.zh.html

Large diffs are not rendered by default.

846 changes: 846 additions & 0 deletions docs/designs/data-ops-bulk-actions-api.zh.html

Large diffs are not rendered by default.

880 changes: 880 additions & 0 deletions docs/designs/episode-qa-checks-mcap-integrity.zh.html

Large diffs are not rendered by default.

569 changes: 569 additions & 0 deletions internal/api/handlers/data_ops.go

Large diffs are not rendered by default.

682 changes: 682 additions & 0 deletions internal/api/handlers/data_ops_bulk.go

Large diffs are not rendered by default.

329 changes: 329 additions & 0 deletions internal/api/handlers/data_ops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
// SPDX-FileCopyrightText: 2026 ArcheBase
//
// SPDX-License-Identifier: MulanPSL-2.0

package handlers

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
_ "modernc.org/sqlite"
)

// TestParseDataOpsEpisodeQuery sends a GET request carrying pagination and
// every list filter through parseDataOpsEpisodeQuery and verifies each parsed
// field of the result.
func TestParseDataOpsEpisodeQuery(t *testing.T) {
	gin.SetMode(gin.TestMode)

	// One entry per query parameter; joined with "&" to form the request URL.
	params := []string{
		"limit=20",
		"offset=40",
		"created_at_from=2026-06-01T00:00:00Z",
		"created_at_to=2026-06-06T00:00:00Z",
		"q=ep",
		"qa_status=failed,pending_qa",
		"sync_status=not_started,failed",
		"scene_id=1,2",
		"sop_id=9,10",
		"robot_type_id=3",
		"robot_device_id=robot-001,robot-002",
		"collector_operator_id=op001",
		"label=recalled_batch",
	}
	target := "/data-ops/episodes?" + strings.Join(params, "&")

	ctx, _ := gin.CreateTestContext(httptest.NewRecorder())
	ctx.Request = httptest.NewRequest(http.MethodGet, target, nil)

	parsed, err := parseDataOpsEpisodeQuery(ctx)
	if err != nil {
		t.Fatalf("parseDataOpsEpisodeQuery returned error: %v", err)
	}

	paginationOK := parsed.Pagination.Limit == 20 && parsed.Pagination.Offset == 40
	if !paginationOK {
		t.Fatalf("unexpected pagination: %+v", parsed.Pagination)
	}

	scalarsOK := parsed.HasCreatedAtFrom && parsed.HasCreatedAtTo &&
		parsed.Keyword == "ep" && parsed.Label == "recalled_batch"
	if !scalarsOK {
		t.Fatalf("unexpected scalar filters: %+v", parsed)
	}

	if joined := strings.Join(parsed.QAStatuses, ","); joined != "failed,pending_qa" {
		t.Fatalf("unexpected qa statuses: %#v", parsed.QAStatuses)
	}
	if joined := strings.Join(parsed.SyncStatuses, ","); joined != "not_started,failed" {
		t.Fatalf("unexpected sync statuses: %#v", parsed.SyncStatuses)
	}

	if ids := parsed.SceneIDs; len(ids) != 2 || ids[0] != 1 || ids[1] != 2 {
		t.Fatalf("unexpected scene ids: %#v", parsed.SceneIDs)
	}
	if ids := parsed.SOPIDs; len(ids) != 2 || ids[0] != 9 || ids[1] != 10 {
		t.Fatalf("unexpected sop ids: %#v", parsed.SOPIDs)
	}
	if ids := parsed.RobotTypeIDs; len(ids) != 1 || ids[0] != 3 {
		t.Fatalf("unexpected robot type ids: %#v", parsed.RobotTypeIDs)
	}

	devices := strings.Join(parsed.RobotDeviceIDs, ",")
	operators := strings.Join(parsed.CollectorOperatorIDs, ",")
	if devices != "robot-001,robot-002" || operators != "op001" {
		t.Fatalf("unexpected string filters: %+v", parsed)
	}
}

func TestDataOpsEpisodeWhereIncludesSOPFilter(t *testing.T) {
sql, args := buildDataOpsEpisodeWhere(dataOpsEpisodeQuery{SOPIDs: []int64{9, 10}})
if !strings.Contains(sql, "COALESCE(e.sop_id, t.sop_id) IN (?,?)") {
t.Fatalf("SOP filter SQL should use episode/task SOP fallback: %s", sql)
}
if len(args) != 2 || args[0] != int64(9) || args[1] != int64(10) {
t.Fatalf("unexpected args: %#v", args)
}
}

// TestDataOpsEpisodeListSQLIncludesSOPColumns builds the episode list SQL and
// requires that it contains each expected SOP and collector-name fragment.
func TestDataOpsEpisodeListSQLIncludesSOPColumns(t *testing.T) {
	listSQL := dataOpsEpisodeListSQL(dataOpsEpisodeBaseFromSQL(), " WHERE e.deleted_at IS NULL")

	fragments := [...]string{
		"COALESCE(e.sop_id, t.sop_id) AS sop_id",
		"LEFT JOIN sops s ON s.id = COALESCE(e.sop_id, t.sop_id)",
		"CONCAT('SOP #', CAST(COALESCE(e.sop_id, t.sop_id) AS CHAR))",
		"ELSE CONCAT(s.slug, ' @ ', s.version)",
		"COALESCE(NULLIF(dc.name, ''), NULLIF(ws.collector_name, '')) AS collector_name",
	}
	for i := range fragments {
		if strings.Contains(listSQL, fragments[i]) {
			continue
		}
		t.Fatalf("data ops SQL should include %q: %s", fragments[i], listSQL)
	}
}

// TestDataOpsSyncStatusWhereSupportsNotStartedAndLatestStatus checks the SQL
// generated for the statuses "not_started" and "failed": it must contain a
// NOT EXISTS branch, filter on the latest sync log row, and bind only "failed".
func TestDataOpsSyncStatusWhereSupportsNotStartedAndLatestStatus(t *testing.T) {
	statuses := []string{"not_started", "failed"}
	clause, params := dataOpsSyncStatusWhere(statuses)

	if !strings.Contains(clause, "NOT EXISTS") {
		t.Fatalf("sync status SQL should include not_started branch: %s", clause)
	}

	filtersLatest := strings.Contains(clause, "MAX(sl2.id)") &&
		strings.Contains(clause, "sl_latest.status IN (?)")
	if !filtersLatest {
		t.Fatalf("sync status SQL should filter latest sync log status: %s", clause)
	}

	if !(len(params) == 1 && params[0] == "failed") {
		t.Fatalf("unexpected args: %#v", params)
	}
}

// TestDataOpsLatestQueriesOnlyUsePageEpisodeIDs verifies that both the latest
// QA-check query and the latest sync-log query restrict themselves to the
// supplied episode IDs and bind one argument per ID.
func TestDataOpsLatestQueriesOnlyUsePageEpisodeIDs(t *testing.T) {
	// Both queries are expected to carry the same two-placeholder constraint.
	const pageConstraint = "WHERE episode_id IN (?,?)"

	qaSQL, qaArgs := dataOpsLatestQAChecksSQL([]int64{10, 20})
	if !strings.Contains(qaSQL, pageConstraint) {
		t.Fatalf("latest QA SQL should constrain page episode IDs: %s", qaSQL)
	}
	if n := len(qaArgs); n != 2 {
		t.Fatalf("latest QA args = %#v", qaArgs)
	}

	syncSQL, syncArgs := dataOpsLatestSyncLogsSQL([]int64{10, 20})
	if !strings.Contains(syncSQL, pageConstraint) {
		t.Fatalf("latest sync SQL should constrain page episode IDs: %s", syncSQL)
	}
	if n := len(syncArgs); n != 2 {
		t.Fatalf("latest sync args = %#v", syncArgs)
	}
}

func TestParseDataOpsBulkEpisodeFilters(t *testing.T) {
got, err := parseDataOpsBulkEpisodeFilters(DataOpsBulkEpisodeFilters{
CreatedAtFrom: "2026-06-01T00:00:00Z",
CreatedAtTo: "2026-06-06T00:00:00Z",
Keyword: "ep",
QAStatus: "failed,pending_qa",
SyncStatus: "not_started,failed",
SceneID: "1,2",
SOPID: "9,10",
RobotTypeID: "3",
RobotDeviceID: "robot-001,robot-002",
CollectorOperatorID: "op001",
Label: "recalled_batch",
Limit: "20",
Offset: "40",
})
if err != nil {
t.Fatalf("parseDataOpsBulkEpisodeFilters returned error: %v", err)
}
if got.Pagination.Limit != 0 || got.Pagination.Offset != 0 {
t.Fatalf("bulk filters should ignore pagination: %+v", got.Pagination)
}
if !got.HasCreatedAtFrom || !got.HasCreatedAtTo || got.Keyword != "ep" || got.Label != "recalled_batch" {
t.Fatalf("unexpected scalar filters: %+v", got)
}
if strings.Join(got.QAStatuses, ",") != "failed,pending_qa" {
t.Fatalf("unexpected qa statuses: %#v", got.QAStatuses)
}
if strings.Join(got.SyncStatuses, ",") != "not_started,failed" {
t.Fatalf("unexpected sync statuses: %#v", got.SyncStatuses)
}
if len(got.SceneIDs) != 2 || got.SceneIDs[0] != 1 || got.SceneIDs[1] != 2 {
t.Fatalf("unexpected scene ids: %#v", got.SceneIDs)
}
if len(got.SOPIDs) != 2 || got.SOPIDs[0] != 9 || got.SOPIDs[1] != 10 {
t.Fatalf("unexpected sop ids: %#v", got.SOPIDs)
}
if len(got.RobotTypeIDs) != 1 || got.RobotTypeIDs[0] != 3 {
t.Fatalf("unexpected robot type ids: %#v", got.RobotTypeIDs)
}
if strings.Join(got.RobotDeviceIDs, ",") != "robot-001,robot-002" || strings.Join(got.CollectorOperatorIDs, ",") != "op001" {
t.Fatalf("unexpected string filters: %+v", got)
}
}

func TestParseDataOpsBulkEpisodeFiltersDoesNotCapMultiValueCount(t *testing.T) {
got, err := parseDataOpsBulkEpisodeFilters(DataOpsBulkEpisodeFilters{
SceneID: joinedNumberList(maxMultiValueFilterItems + 1),
RobotDeviceID: joinedStringList("robot-", maxMultiValueFilterItems+1),
})
if err != nil {
t.Fatalf("parseDataOpsBulkEpisodeFilters returned error: %v", err)
}
if len(got.SceneIDs) != maxMultiValueFilterItems+1 {
t.Fatalf("scene id count = %d, want %d", len(got.SceneIDs), maxMultiValueFilterItems+1)
}
if len(got.RobotDeviceIDs) != maxMultiValueFilterItems+1 {
t.Fatalf("robot device id count = %d, want %d", len(got.RobotDeviceIDs), maxMultiValueFilterItems+1)
}
}

// TestParseDataOpsBulkEpisodeRequestConfirmGuard posts a body that carries
// only an empty "filters" object and calls parseBulkEpisodeActionRequest with
// its second argument set to true. The call must report failure and the
// recorder must show HTTP 400.
func TestParseDataOpsBulkEpisodeRequestConfirmGuard(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	ctx, _ := gin.CreateTestContext(rec)

	payload := bytes.NewBufferString(`{"filters":{}}`)
	ctx.Request = httptest.NewRequest(http.MethodPost, "/data-ops/episodes/bulk-qa", payload)
	ctx.Request.Header.Set("Content-Type", "application/json")

	handler := &DataOpsHandler{}
	_, _, accepted := handler.parseBulkEpisodeActionRequest(ctx, true)
	if accepted {
		t.Fatal("bulk execute request without confirm should fail")
	}
	if status := rec.Code; status != http.StatusBadRequest {
		t.Fatalf("status = %d, want 400", status)
	}
}

// TestParseDataOpsBulkEpisodeRequestPreviewDoesNotRequireConfirm posts a body
// with a qa_status filter and no confirm field, calls
// parseBulkEpisodeActionRequest with its second argument set to false, and
// expects success with the qa_status filter parsed.
func TestParseDataOpsBulkEpisodeRequestPreviewDoesNotRequireConfirm(t *testing.T) {
	gin.SetMode(gin.TestMode)

	ctx, _ := gin.CreateTestContext(httptest.NewRecorder())

	payload := bytes.NewBufferString(`{"filters":{"qa_status":"failed"}}`)
	ctx.Request = httptest.NewRequest(http.MethodPost, "/data-ops/episodes/bulk-qa/preview", payload)
	ctx.Request.Header.Set("Content-Type", "application/json")

	handler := &DataOpsHandler{}
	_, query, accepted := handler.parseBulkEpisodeActionRequest(ctx, false)
	if !accepted {
		t.Fatal("bulk preview request should not require confirm")
	}
	if joined := strings.Join(query.QAStatuses, ","); joined != "failed" {
		t.Fatalf("unexpected qa statuses: %#v", query.QAStatuses)
	}
}

// TestDataOpsEpisodeIDSnapshotSQLUsesDataOpsOrdering requires the ID snapshot
// SQL to select e.id from episodes and to order by created_at then id, both
// descending.
func TestDataOpsEpisodeIDSnapshotSQLUsesDataOpsOrdering(t *testing.T) {
	snapshotSQL := dataOpsEpisodeIDSnapshotSQL(dataOpsEpisodeBaseFromSQL(), " WHERE e.deleted_at IS NULL")

	fragments := [...]string{
		"SELECT e.id",
		"FROM episodes e",
		"ORDER BY e.created_at DESC, e.id DESC",
	}
	for i := range fragments {
		if strings.Contains(snapshotSQL, fragments[i]) {
			continue
		}
		t.Fatalf("ID snapshot SQL should include %q: %s", fragments[i], snapshotSQL)
	}
}

// TestDataOpsBulkPreviewSQLs checks that the bulk QA preview SQL and the bulk
// sync preview SQL each contain the identifiers listed for them below.
func TestDataOpsBulkPreviewSQLs(t *testing.T) {
	const whereClause = " WHERE e.deleted_at IS NULL"

	qaSQL := dataOpsBulkQAPreviewSQL(dataOpsEpisodeBaseFromSQL(), whereClause)
	qaWanted := []string{"matched_count", "qa_running_count", "protected_status_count"}
	for i := range qaWanted {
		if strings.Contains(qaSQL, qaWanted[i]) {
			continue
		}
		t.Fatalf("QA preview SQL should include %q: %s", qaWanted[i], qaSQL)
	}

	syncFrom := dataOpsEpisodeBaseFromSQL() + dataOpsLatestSyncPreviewJoinSQL()
	syncSQL := dataOpsBulkSyncPreviewSQL(syncFrom, whereClause)
	syncWanted := []string{
		"latest_sync",
		"eligible_count",
		"qa_not_approved_count",
		"already_synced_count",
		"sync_active_count",
	}
	for i := range syncWanted {
		if strings.Contains(syncSQL, syncWanted[i]) {
			continue
		}
		t.Fatalf("sync preview SQL should include %q: %s", syncWanted[i], syncSQL)
	}
}

// TestPreviewBulkEpisodeSyncTreatsMissingSyncLogAsEligible seeds eleven
// approved, unsynced episodes, of which only episode 1 has a sync log (status
// "failed"). It expects the bulk sync preview to report all eleven as matched
// and eligible, with nothing skipped.
func TestPreviewBulkEpisodeSyncTreatsMissingSyncLogAsEligible(t *testing.T) {
	conn := setupDataOpsBulkPreviewTestDB(t)
	handler := &DataOpsHandler{db: conn}

	const insertEpisode = `
INSERT INTO episodes (id, episode_id, task_id, scene_id, qa_status, cloud_synced, deleted_at, created_at)
VALUES (?, ?, 0, 0, 'approved', 0, NULL, '2026-06-01T00:00:00Z')
`
	for rowID := int64(1); rowID <= 11; rowID++ {
		_, err := conn.Exec(insertEpisode, rowID, "episode")
		if err != nil {
			t.Fatalf("insert episode %d: %v", rowID, err)
		}
	}

	const insertSyncLog = `
INSERT INTO sync_logs (id, episode_id, status)
VALUES (1, 1, 'failed')
`
	_, err := conn.Exec(insertSyncLog)
	if err != nil {
		t.Fatalf("insert failed sync log: %v", err)
	}

	query := dataOpsEpisodeQuery{QAStatuses: []string{"approved"}}
	preview, err := handler.previewBulkEpisodeSync(context.Background(), query)
	if err != nil {
		t.Fatalf("previewBulkEpisodeSync returned error: %v", err)
	}

	countsOK := preview.MatchedCount == 11 && preview.EligibleCount == 11 && preview.SkippedCount == 0
	if !countsOK {
		t.Fatalf("preview counts = matched %d eligible %d skipped %d, want 11/11/0", preview.MatchedCount, preview.EligibleCount, preview.SkippedCount)
	}
	if n := len(preview.SkippedBreakdown); n != 0 {
		t.Fatalf("unexpected skipped breakdown: %#v", preview.SkippedBreakdown)
	}
}

// setupDataOpsBulkPreviewTestDB opens an in-memory SQLite database, creates
// the tables listed in the schema below, and returns the handle. The handle
// is closed through t.Cleanup when the test finishes.
//
// Any failure to open the database or create a table aborts the calling test.
func setupDataOpsBulkPreviewTestDB(t *testing.T) *sqlx.DB {
	t.Helper()

	db, err := sqlx.Open("sqlite", ":memory:")
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	// Every new connection to ":memory:" gets its own, empty database. Pin
	// the pool to one connection so the schema and seeded rows are visible to
	// every statement the code under test runs.
	db.SetMaxOpenConns(1)
	t.Cleanup(func() {
		// The test body has already finished here, so there is nothing left
		// to abort; just record the failure.
		if err := db.Close(); err != nil {
			t.Errorf("close sqlite: %v", err)
		}
	})

	schema := []string{
		`CREATE TABLE episodes (
id INTEGER PRIMARY KEY,
episode_id TEXT NOT NULL,
task_id INTEGER NOT NULL,
scene_id INTEGER NOT NULL,
workstation_id INTEGER,
sop_id INTEGER,
qa_status TEXT,
cloud_synced BOOLEAN NOT NULL DEFAULT 0,
deleted_at TEXT,
created_at TEXT NOT NULL
)`,
		`CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
sop_id INTEGER,
workstation_id INTEGER,
deleted_at TEXT
)`,
		`CREATE TABLE scenes (id INTEGER PRIMARY KEY, deleted_at TEXT)`,
		`CREATE TABLE workstations (
id INTEGER PRIMARY KEY,
robot_id INTEGER,
data_collector_id INTEGER,
deleted_at TEXT
)`,
		`CREATE TABLE robots (
id INTEGER PRIMARY KEY,
robot_type_id INTEGER,
deleted_at TEXT
)`,
		`CREATE TABLE robot_types (id INTEGER PRIMARY KEY, deleted_at TEXT)`,
		`CREATE TABLE data_collectors (id INTEGER PRIMARY KEY, deleted_at TEXT)`,
		`CREATE TABLE sops (id INTEGER PRIMARY KEY, deleted_at TEXT)`,
		`CREATE TABLE sync_logs (
id INTEGER PRIMARY KEY,
episode_id INTEGER NOT NULL,
status TEXT NOT NULL
)`,
	}
	for _, stmt := range schema {
		if _, err := db.Exec(stmt); err != nil {
			t.Fatalf("create schema: %v", err)
		}
	}
	return db
}
7 changes: 5 additions & 2 deletions internal/api/handlers/data_production_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type breakdownStatsRow struct {
SuccessCount sql.NullInt64 `db:"success_count"`
FailedCount sql.NullInt64 `db:"failed_count"`
ProcessingCount sql.NullInt64 `db:"processing_count"`
TotalDurationMs sql.NullFloat64 `db:"total_duration_ms"`
AvgDurationMs sql.NullFloat64 `db:"avg_duration_ms"`
MaxDurationMs sql.NullFloat64 `db:"max_duration_ms"`
TotalBytes sql.NullInt64 `db:"total_bytes"`
Expand Down Expand Up @@ -510,6 +511,7 @@ func dataProductionBreakdownSQL(idExpr string, nameExpr string, baseSQL string)
COALESCE(SUM(CASE WHEN status = 'success' THEN count_value ELSE 0 END), 0) AS success_count,
COALESCE(SUM(CASE WHEN status IN ('failed', 'cancelled') THEN count_value ELSE 0 END), 0) AS failed_count,
COALESCE(SUM(CASE WHEN status = 'processing' THEN count_value ELSE 0 END), 0) AS processing_count,
SUM(duration_ms) AS total_duration_ms,
AVG(duration_ms) AS avg_duration_ms,
MAX(duration_ms) AS max_duration_ms,
COALESCE(SUM(COALESCE(size_bytes, 0)), 0) AS total_bytes,
Expand Down Expand Up @@ -971,8 +973,9 @@ func breakdownRowToItem(row breakdownStatsRow) dataProductionBreakdownItem {
SuccessRate: rate(success, total),
},
Duration: statsDurationMetrics{
AvgMs: roundNullFloat(row.AvgDurationMs),
MaxMs: roundNullFloat(row.MaxDurationMs),
TotalMs: roundNullFloat(row.TotalDurationMs),
AvgMs: roundNullFloat(row.AvgDurationMs),
MaxMs: roundNullFloat(row.MaxDurationMs),
},
Size: statsSizeMetrics{
TotalBytes: nullInt64(row.TotalBytes),
Expand Down
18 changes: 18 additions & 0 deletions internal/api/handlers/data_production_statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func TestDataProductionBreakdownSQLGroupsByDimensionExpression(t *testing.T) {
}

querySQL := dataProductionBreakdownSQL("robot_device_id", "robot_device_id", "SELECT 1")
if !strings.Contains(querySQL, "SUM(duration_ms) AS total_duration_ms") {
t.Fatalf("breakdown SQL should select total duration: %s", querySQL)
}
if !strings.Contains(querySQL, "GROUP BY robot_device_id") {
t.Fatalf("breakdown SQL should group by the dimension expression: %s", querySQL)
}
Expand All @@ -139,6 +142,21 @@ func TestDataProductionBreakdownSQLGroupsByDimensionExpression(t *testing.T) {
}
}

func TestBreakdownRowToItemIncludesTotalDuration(t *testing.T) {
item := breakdownRowToItem(breakdownStatsRow{
TotalDurationMs: sql.NullFloat64{Float64: 1234.4, Valid: true},
AvgDurationMs: sql.NullFloat64{Float64: 617.2, Valid: true},
MaxDurationMs: sql.NullFloat64{Float64: 900.6, Valid: true},
})

if item.Duration.TotalMs != 1234 {
t.Fatalf("total duration = %d, want 1234", item.Duration.TotalMs)
}
if item.Duration.AvgMs != 617 || item.Duration.MaxMs != 901 {
t.Fatalf("unexpected duration metrics: %+v", item.Duration)
}
}

func TestStatsBreakdownExpressionsSupportsEpisodeDimensions(t *testing.T) {
tests := []struct {
dimension string
Expand Down
Loading
Loading