From 4f7461f17b3f40c731a68d82f6397ebebb2b3efe Mon Sep 17 00:00:00 2001 From: chaoliu Date: Wed, 17 Jun 2026 13:45:06 +0800 Subject: [PATCH 1/3] simplify sync progress log --- internal/cloud/uploader.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/cloud/uploader.go b/internal/cloud/uploader.go index 0a518f7..3ecd014 100644 --- a/internal/cloud/uploader.go +++ b/internal/cloud/uploader.go @@ -102,6 +102,8 @@ const ( resumeOSSAlreadyComplete // OSS object already present and verified; skip upload, go straight to CompleteUpload ) +const uploadProgressLogPartInterval = 10 + // gatewayClient is the subset of GatewayClient methods used by Uploader. // It is defined as an interface to allow test injection of fake implementations. type gatewayClient interface { @@ -711,8 +713,10 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s progress(offset, fileSize) } - logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d", - episodeID, len(parts), offset, fileSize) + if len(parts)%uploadProgressLogPartInterval == 0 || offset == fileSize { + logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d", + episodeID, len(parts), offset, fileSize) + } } return session, parts, partMD5s, nil From f8a2970fc711ef4ab9ed2cd067e33dc1bb8faec5 Mon Sep 17 00:00:00 2001 From: chaoliu Date: Wed, 17 Jun 2026 14:11:00 +0800 Subject: [PATCH 2/3] feat: delete tag without explicit meaning --- internal/services/dp_raw_tags.go | 23 +++-------------------- internal/services/dp_raw_tags_test.go | 20 +++++--------------- internal/services/sidecar_tags.go | 6 +++--- internal/services/sidecar_tags_test.go | 22 ++++++++++++++++------ internal/services/sync_worker.go | 26 ++++++++------------------ 5 files changed, 35 insertions(+), 62 deletions(-) diff --git a/internal/services/dp_raw_tags.go b/internal/services/dp_raw_tags.go index 925116a..e36694d 100644 --- a/internal/services/dp_raw_tags.go +++ b/internal/services/dp_raw_tags.go @@ -5,10 +5,8 @@ package services import ( - "database/sql" "fmt" "path" - "strconv" "strings" ) @@ -21,11 +19,7 @@ type dpRawTagsInput struct { Profile DPDeviceProfile McapKey string SidecarTags map[string]string - EpisodeID int64 EpisodePublicID string - TaskID int64 - FactoryID sql.NullInt64 - OrganizationID sql.NullInt64 } func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) { @@ -55,21 +49,10 @@ func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) { } func keystoneExtraTags(input dpRawTagsInput) map[string]string { - tags := map[string]string{ - "episode_id": input.EpisodePublicID, - "keystone_episode_id": strconv.FormatInt(input.EpisodeID, 10), - "sync_channel": "keystone_direct", + return map[string]string{ + "episode_id": input.EpisodePublicID, + "sync_channel": "keystone_direct", } - if input.TaskID > 0 { - tags["task_id"] = strconv.FormatInt(input.TaskID, 10) - } - if input.FactoryID.Valid { - tags["factory_id"] = strconv.FormatInt(input.FactoryID.Int64, 10) - } - if input.OrganizationID.Valid { - tags["organization_id"] = strconv.FormatInt(input.OrganizationID.Int64, 10) - } - return tags } func insertAllNonConflictingTags(dst map[string]string, src map[string]string) error { diff --git a/internal/services/dp_raw_tags_test.go b/internal/services/dp_raw_tags_test.go index bbca857..8155f47 100644 --- a/internal/services/dp_raw_tags_test.go +++ b/internal/services/dp_raw_tags_test.go @@ -5,7 +5,6 @@ package services import ( - "database/sql" "strings" "testing" ) @@ -25,11 +24,7 @@ func TestBuildDPDirectRawTags_MergesInDocumentedOrder(t *testing.T) { "array_field": `["a","b"]`, "empty_value": "", }, - EpisodeID: 42, EpisodePublicID: "episode-public-42", - TaskID: 77, - FactoryID: sql.NullInt64{Int64: 3, Valid: true}, - OrganizationID: sql.NullInt64{Int64: 9, Valid: true}, }) if err != nil { t.Fatalf("buildDPDirectRawTags() error = %v", err) @@ -43,17 +38,18 @@ func TestBuildDPDirectRawTags_MergesInDocumentedOrder(t *testing.T) { "array_field": `["a","b"]`, "empty_value": "", "episode_id": "episode-public-42", - "keystone_episode_id": "42", "sync_channel": "keystone_direct", - "task_id": "77", - "factory_id": "3", - "organization_id": "9", } for key, want := range cases { if got[key] != want { t.Fatalf("tag[%q]=%q want %q tags=%+v", key, got[key], want, got) } } + for _, key := range []string{"keystone_episode_id", "task_id", "factory_id", "organization_id"} { + if _, ok := got[key]; ok { + t.Fatalf("tag[%q] should not be injected: %+v", key, got) + } + } if _, ok := got["device_id"]; ok { t.Fatalf("ordinary device_id raw tag must not be injected: %+v", got) } @@ -69,7 +65,6 @@ func TestBuildDPDirectRawTags_UsesMcapKeyBasenameNotSidecarMcapFile(t *testing.T SidecarTags: map[string]string{ "mcap_file": "sidecar-claimed.mcap", }, - EpisodeID: 1, EpisodePublicID: "episode-1", }) if err != nil { @@ -96,7 +91,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) { Tags: map[string]string{dpReservedDeviceIDTagKey: "other-device"}, }, McapKey: "bucket/file.mcap", - EpisodeID: 1, EpisodePublicID: "episode-1", }, }, @@ -109,7 +103,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) { }, McapKey: "bucket/file.mcap", SidecarTags: map[string]string{"scene": "sidecar"}, - EpisodeID: 1, EpisodePublicID: "episode-1", }, }, @@ -122,7 +115,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) { }, McapKey: "bucket/file.mcap", SidecarTags: map[string]string{"sync_channel": "other"}, - EpisodeID: 1, EpisodePublicID: "episode-1", }, }, @@ -143,7 +135,6 @@ func TestBuildDPDirectRawTags_RejectsEmptyKeyAndRawFile(t *testing.T) { Tags: map[string]string{"": "value"}, }, McapKey: "bucket/file.mcap", - EpisodeID: 1, EpisodePublicID: "episode-1", }) if err == nil || !strings.Contains(err.Error(), "key") { @@ -156,7 +147,6 @@ func TestBuildDPDirectRawTags_RejectsEmptyKeyAndRawFile(t *testing.T) { Tags: map[string]string{"profile": "tag"}, }, McapKey: "bucket/", - EpisodeID: 1, EpisodePublicID: "episode-1", }) if err == nil || !strings.Contains(err.Error(), "raw_file") { diff --git a/internal/services/sidecar_tags.go b/internal/services/sidecar_tags.go index 9f976da..cd8a84c 100644 --- a/internal/services/sidecar_tags.go +++ b/internal/services/sidecar_tags.go @@ -14,8 +14,8 @@ import ( // suitable for use as RawTags in an upload request. // // Nested objects are flattened with dot notation (e.g. "device.device_id"). -// Array values are JSON-encoded into a single string under one key (e.g. task.skills -> ["pick"]). -// The "topics_summary" key is intentionally excluded. +// Array values are JSON-encoded into a single string under one key (e.g. recording.topics -> ["topic"]). +// The "topics_summary" and top-level "task" keys are intentionally excluded. func flattenSidecar(data []byte) (map[string]string, error) { var raw map[string]interface{} if err := json.Unmarshal(data, &raw); err != nil { @@ -31,7 +31,7 @@ func flattenValue(out map[string]string, prefix string, v interface{}) { switch val := v.(type) { case map[string]interface{}: for k, child := range val { - if prefix == "" && k == "topics_summary" { + if prefix == "" && (k == "topics_summary" || k == "task") { continue } flattenValue(out, joinKey(prefix, k), child) diff --git a/internal/services/sidecar_tags_test.go b/internal/services/sidecar_tags_test.go index d945c4b..1d96385 100644 --- a/internal/services/sidecar_tags_test.go +++ b/internal/services/sidecar_tags_test.go @@ -35,6 +35,7 @@ const testSidecarJSON = `{ "scene": "卧室", "skills": ["pick"], "subscene": "床", + "task": "collect-bedroom-bed", "task_id": "task_20260414_054145_205_27_fdd80b4c" }, "topics_summary": [ @@ -65,11 +66,6 @@ func TestFlattenSidecar_BasicFields(t *testing.T) { "recording.duration_sec": "121.966222012", "recording.recording_started_at": "2026-04-14T09:15:05.858Z", "recording.recording_finished_at": "2026-04-14T09:17:07.825Z", - "task.data_collector_id": "刘备", - "task.factory": "Shanghai Factory", - "task.scene": "卧室", - "task.subscene": "床", - "task.task_id": "task_20260414_054145_205_27_fdd80b4c", "version": "1.0", } @@ -83,6 +79,21 @@ func TestFlattenSidecar_BasicFields(t *testing.T) { t.Errorf("tags[%q] = %q, want %q", key, got, want) } } + + for _, key := range []string{ + "task.data_collector_id", + "task.factory", + "task.order_id", + "task.scene", + "task.skills", + "task.subscene", + "task.task", + "task.task_id", + } { + if _, ok := tags[key]; ok { + t.Errorf("task subfield %q should be excluded", key) + } + } } func TestFlattenSidecar_ArraysEncodedAsJSONString(t *testing.T) { @@ -93,7 +104,6 @@ func TestFlattenSidecar_ArraysEncodedAsJSONString(t *testing.T) { cases := map[string]string{ "recording.topics_recorded": `["/hal/camera/head/color/camera_info","/system/info"]`, - "task.skills": `["pick"]`, } for key, want := range cases { diff --git a/internal/services/sync_worker.go b/internal/services/sync_worker.go index 6eaa50c..65e777e 100644 --- a/internal/services/sync_worker.go +++ b/internal/services/sync_worker.go @@ -44,16 +44,13 @@ type syncEnqueueRequest struct { } type syncEpisodeUploadRow struct { - ID int64 `db:"id"` - EpisodeUUID string `db:"episode_id"` - TaskID int64 `db:"task_id"` - McapPath string `db:"mcap_path"` - SidecarPath string `db:"sidecar_path"` - CloudSynced bool `db:"cloud_synced"` - Metadata sql.NullString `db:"metadata"` - WorkstationID sql.NullInt64 `db:"workstation_id"` - FactoryID sql.NullInt64 `db:"factory_id"` - OrganizationID sql.NullInt64 `db:"organization_id"` + ID int64 `db:"id"` + EpisodeUUID string `db:"episode_id"` + McapPath string `db:"mcap_path"` + SidecarPath string `db:"sidecar_path"` + CloudSynced bool `db:"cloud_synced"` + Metadata sql.NullString `db:"metadata"` + WorkstationID sql.NullInt64 `db:"workstation_id"` } // SyncProgressSnapshot is the latest in-memory progress for an active episode sync. @@ -907,14 +904,11 @@ func (w *SyncWorker) processEpisodeWithMode(ctx context.Context, episodeID int64 SELECT id, episode_id, - task_id, mcap_path, sidecar_path, cloud_synced, metadata, - workstation_id, - factory_id, - organization_id + workstation_id FROM episodes WHERE id = ? AND deleted_at IS NULL `, episodeID) @@ -982,11 +976,7 @@ func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUplo Profile: dpConfig.Profile, McapKey: mcapKey, SidecarTags: sidecarTags, - EpisodeID: ep.ID, EpisodePublicID: ep.EpisodeUUID, - TaskID: ep.TaskID, - FactoryID: ep.FactoryID, - OrganizationID: ep.OrganizationID, }) if err != nil { return nil, wrapNonRetryableSyncError(err, "build raw tags for episode %d", ep.ID) From a26374a01a3ce910546db210df76dac6af24f211 Mon Sep 17 00:00:00 2001 From: chaoliu Date: Wed, 17 Jun 2026 14:39:29 +0800 Subject: [PATCH 3/3] feat: add useful tags --- internal/services/dp_raw_tags.go | 39 +++++++++++++- internal/services/dp_raw_tags_test.go | 62 +++++++++++++++++++--- internal/services/sync_worker.go | 74 +++++++++++++++++++++------ 3 files changed, 150 insertions(+), 25 deletions(-) diff --git a/internal/services/dp_raw_tags.go b/internal/services/dp_raw_tags.go index e36694d..3910d08 100644 --- a/internal/services/dp_raw_tags.go +++ b/internal/services/dp_raw_tags.go @@ -5,6 +5,7 @@ package services import ( + "database/sql" "fmt" "path" "strings" @@ -20,6 +21,20 @@ type dpRawTagsInput struct { McapKey string SidecarTags map[string]string EpisodePublicID string + Context dpRawTagContext +} + +type dpRawTagContext struct { + SOPSlug sql.NullString + SOPVersion sql.NullString + SOPDescription sql.NullString + Scene sql.NullString + Subscene sql.NullString + RobotType sql.NullString + DataCollectorOperatorID sql.NullString + DataCollectorName sql.NullString + OrderName sql.NullString + BatchID sql.NullString } func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) { @@ -49,10 +64,32 @@ func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) { } func keystoneExtraTags(input dpRawTagsInput) map[string]string { - return map[string]string{ + tags := map[string]string{ "episode_id": input.EpisodePublicID, "sync_channel": "keystone_direct", } + addNonEmptyTag(tags, "sop_slug", input.Context.SOPSlug) + addNonEmptyTag(tags, "sop_version", input.Context.SOPVersion) + addNonEmptyTag(tags, "sop_description", input.Context.SOPDescription) + addNonEmptyTag(tags, "scene", input.Context.Scene) + addNonEmptyTag(tags, "subscene", input.Context.Subscene) + addNonEmptyTag(tags, "robot_type", input.Context.RobotType) + addNonEmptyTag(tags, "data_collector_operator_id", input.Context.DataCollectorOperatorID) + addNonEmptyTag(tags, "data_collector_name", input.Context.DataCollectorName) + addNonEmptyTag(tags, "order_name", input.Context.OrderName) + addNonEmptyTag(tags, "batch_id", input.Context.BatchID) + return tags +} + +func addNonEmptyTag(tags map[string]string, key string, value sql.NullString) { + if !value.Valid { + return + } + trimmed := strings.TrimSpace(value.String) + if trimmed == "" { + return + } + tags[key] = trimmed } func insertAllNonConflictingTags(dst map[string]string, src map[string]string) error { diff --git a/internal/services/dp_raw_tags_test.go b/internal/services/dp_raw_tags_test.go index 8155f47..16cfd83 100644 --- a/internal/services/dp_raw_tags_test.go +++ b/internal/services/dp_raw_tags_test.go @@ -5,6 +5,7 @@ package services import ( + "database/sql" "strings" "testing" ) @@ -25,20 +26,42 @@ func TestBuildDPDirectRawTags_MergesInDocumentedOrder(t *testing.T) { "empty_value": "", }, EpisodePublicID: "episode-public-42", + Context: dpRawTagContext{ + SOPSlug: sql.NullString{String: "pick-place", Valid: true}, + SOPVersion: sql.NullString{String: "2.1.0", Valid: true}, + SOPDescription: sql.NullString{String: " pick up and place object ", Valid: true}, + Scene: sql.NullString{String: "Bedroom", Valid: true}, + Subscene: sql.NullString{String: "Bed", Valid: true}, + RobotType: sql.NullString{String: "Mobile Manipulator", Valid: true}, + DataCollectorOperatorID: sql.NullString{String: "op-001", Valid: true}, + DataCollectorName: sql.NullString{String: "Alice", Valid: true}, + OrderName: sql.NullString{String: "Order A", Valid: true}, + BatchID: sql.NullString{String: "batch_20260615_034848_268_00_378b4bcf", Valid: true}, + }, }) if err != nil { t.Fatalf("buildDPDirectRawTags() error = %v", err) } cases := map[string]string{ - "profile": "tag", - "same": "value", - dpReservedDeviceIDTagKey: "asset-1", - dpReservedRawFileTagKey: "task.mcap", - "array_field": `["a","b"]`, - "empty_value": "", - "episode_id": "episode-public-42", - "sync_channel": "keystone_direct", + "profile": "tag", + "same": "value", + dpReservedDeviceIDTagKey: "asset-1", + dpReservedRawFileTagKey: "task.mcap", + "array_field": `["a","b"]`, + "empty_value": "", + "episode_id": "episode-public-42", + "sync_channel": "keystone_direct", + "sop_slug": "pick-place", + "sop_version": "2.1.0", + "sop_description": "pick up and place object", + "scene": "Bedroom", + "subscene": "Bed", + "robot_type": "Mobile Manipulator", + "data_collector_operator_id": "op-001", + "data_collector_name": "Alice", + "order_name": "Order A", + "batch_id": "batch_20260615_034848_268_00_378b4bcf", } for key, want := range cases { if got[key] != want { @@ -55,6 +78,29 @@ func TestBuildDPDirectRawTags_MergesInDocumentedOrder(t *testing.T) { } } +func TestBuildDPDirectRawTags_SkipsEmptyContextTags(t *testing.T) { + got, err := buildDPDirectRawTags(dpRawTagsInput{ + Profile: DPDeviceProfile{ + DeviceID: "asset-1", + Tags: map[string]string{"profile": "tag"}, + }, + McapKey: "bucket/file.mcap", + EpisodePublicID: "episode-1", + Context: dpRawTagContext{ + SOPSlug: sql.NullString{String: " ", Valid: true}, + Scene: sql.NullString{String: "Scene A", Valid: false}, + }, + }) + if err != nil { + t.Fatalf("buildDPDirectRawTags() error = %v", err) + } + for _, key := range []string{"sop_slug", "scene"} { + if _, ok := got[key]; ok { + t.Fatalf("empty context tag %q should be skipped: %+v", key, got) + } + } +} + func TestBuildDPDirectRawTags_UsesMcapKeyBasenameNotSidecarMcapFile(t *testing.T) { got, err := buildDPDirectRawTags(dpRawTagsInput{ Profile: DPDeviceProfile{ diff --git a/internal/services/sync_worker.go b/internal/services/sync_worker.go index 65e777e..aa5b109 100644 --- a/internal/services/sync_worker.go +++ b/internal/services/sync_worker.go @@ -44,13 +44,23 @@ type syncEnqueueRequest struct { } type syncEpisodeUploadRow struct { - ID int64 `db:"id"` - EpisodeUUID string `db:"episode_id"` - McapPath string `db:"mcap_path"` - SidecarPath string `db:"sidecar_path"` - CloudSynced bool `db:"cloud_synced"` - Metadata sql.NullString `db:"metadata"` - WorkstationID sql.NullInt64 `db:"workstation_id"` + ID int64 `db:"id"` + EpisodeUUID string `db:"episode_id"` + McapPath string `db:"mcap_path"` + SidecarPath string `db:"sidecar_path"` + CloudSynced bool `db:"cloud_synced"` + Metadata sql.NullString `db:"metadata"` + WorkstationID sql.NullInt64 `db:"workstation_id"` + SOPSlug sql.NullString `db:"sop_slug"` + SOPVersion sql.NullString `db:"sop_version"` + SOPDescription sql.NullString `db:"sop_description"` + Scene sql.NullString `db:"scene"` + Subscene sql.NullString `db:"subscene"` + RobotType sql.NullString `db:"robot_type"` + DataCollectorOperatorID sql.NullString `db:"data_collector_operator_id"` + DataCollectorName sql.NullString `db:"data_collector_name"` + OrderName sql.NullString `db:"order_name"` + BatchID sql.NullString `db:"batch_id"` } // SyncProgressSnapshot is the latest in-memory progress for an active episode sync. @@ -902,15 +912,35 @@ func (w *SyncWorker) processEpisodeWithMode(ctx context.Context, episodeID int64 var ep syncEpisodeUploadRow err := w.db.GetContext(ctx, &ep, ` SELECT - id, - episode_id, - mcap_path, - sidecar_path, - cloud_synced, - metadata, - workstation_id - FROM episodes - WHERE id = ? AND deleted_at IS NULL + e.id, + e.episode_id, + e.mcap_path, + e.sidecar_path, + e.cloud_synced, + e.metadata, + e.workstation_id, + s.slug AS sop_slug, + s.version AS sop_version, + s.description AS sop_description, + COALESCE(NULLIF(e.scene_name, ''), NULLIF(t.scene_name, ''), NULLIF(sc.name, '')) AS scene, + COALESCE(NULLIF(t.subscene_name, ''), NULLIF(ss.name, '')) AS subscene, + COALESCE(NULLIF(rt.name, ''), NULLIF(rt.model, ''), NULLIF(ws.robot_name, '')) AS robot_type, + COALESCE(NULLIF(dc.operator_id, ''), NULLIF(ws.collector_operator_id, '')) AS data_collector_operator_id, + COALESCE(NULLIF(dc.name, ''), NULLIF(ws.collector_name, '')) AS data_collector_name, + o.name AS order_name, + b.batch_id AS batch_id + FROM episodes e + LEFT JOIN tasks t ON t.id = e.task_id AND t.deleted_at IS NULL + LEFT JOIN sops s ON s.id = COALESCE(e.sop_id, t.sop_id) AND s.deleted_at IS NULL + LEFT JOIN scenes sc ON sc.id = COALESCE(e.scene_id, t.scene_id) AND sc.deleted_at IS NULL + LEFT JOIN subscenes ss ON ss.id = t.subscene_id AND ss.deleted_at IS NULL + LEFT JOIN workstations ws ON ws.id = COALESCE(e.workstation_id, t.workstation_id) AND ws.deleted_at IS NULL + LEFT JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + LEFT JOIN robot_types rt ON rt.id = r.robot_type_id AND rt.deleted_at IS NULL + LEFT JOIN data_collectors dc ON dc.id = ws.data_collector_id AND dc.deleted_at IS NULL + LEFT JOIN orders o ON o.id = COALESCE(e.order_id, t.order_id) AND o.deleted_at IS NULL + LEFT JOIN batches b ON b.id = COALESCE(e.batch_id, t.batch_id) AND b.deleted_at IS NULL + WHERE e.id = ? AND e.deleted_at IS NULL `, episodeID) if err == sql.ErrNoRows { logger.Printf("[SYNC-WORKER] Episode %d not found, skipping", episodeID) @@ -977,6 +1007,18 @@ func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUplo McapKey: mcapKey, SidecarTags: sidecarTags, EpisodePublicID: ep.EpisodeUUID, + Context: dpRawTagContext{ + SOPSlug: ep.SOPSlug, + SOPVersion: ep.SOPVersion, + SOPDescription: ep.SOPDescription, + Scene: ep.Scene, + Subscene: ep.Subscene, + RobotType: ep.RobotType, + DataCollectorOperatorID: ep.DataCollectorOperatorID, + DataCollectorName: ep.DataCollectorName, + OrderName: ep.OrderName, + BatchID: ep.BatchID, + }, }) if err != nil { return nil, wrapNonRetryableSyncError(err, "build raw tags for episode %d", ep.ID)