Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/cloud/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const (
resumeOSSAlreadyComplete // OSS object already present and verified; skip upload, go straight to CompleteUpload
)

const uploadProgressLogPartInterval = 10

// gatewayClient is the subset of GatewayClient methods used by Uploader.
// It is defined as an interface to allow test injection of fake implementations.
type gatewayClient interface {
Expand Down Expand Up @@ -711,8 +713,10 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s
progress(offset, fileSize)
}

logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d",
episodeID, len(parts), offset, fileSize)
if len(parts)%uploadProgressLogPartInterval == 0 || offset == fileSize {
logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d",
episodeID, len(parts), offset, fileSize)
}
}

return session, parts, partMD5s, nil
Expand Down
52 changes: 36 additions & 16 deletions internal/services/dp_raw_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"database/sql"
"fmt"
"path"
"strconv"
"strings"
)

Expand All @@ -21,11 +20,21 @@ type dpRawTagsInput struct {
Profile DPDeviceProfile
McapKey string
SidecarTags map[string]string
EpisodeID int64
EpisodePublicID string
TaskID int64
FactoryID sql.NullInt64
OrganizationID sql.NullInt64
Context dpRawTagContext
}

type dpRawTagContext struct {
SOPSlug sql.NullString
SOPVersion sql.NullString
SOPDescription sql.NullString
Scene sql.NullString
Subscene sql.NullString
RobotType sql.NullString
DataCollectorOperatorID sql.NullString
DataCollectorName sql.NullString
OrderName sql.NullString
BatchID sql.NullString
}

func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) {
Expand Down Expand Up @@ -56,20 +65,31 @@ func buildDPDirectRawTags(input dpRawTagsInput) (map[string]string, error) {

func keystoneExtraTags(input dpRawTagsInput) map[string]string {
tags := map[string]string{
"episode_id": input.EpisodePublicID,
"keystone_episode_id": strconv.FormatInt(input.EpisodeID, 10),
"sync_channel": "keystone_direct",
"episode_id": input.EpisodePublicID,
"sync_channel": "keystone_direct",
}
if input.TaskID > 0 {
tags["task_id"] = strconv.FormatInt(input.TaskID, 10)
}
if input.FactoryID.Valid {
tags["factory_id"] = strconv.FormatInt(input.FactoryID.Int64, 10)
addNonEmptyTag(tags, "sop_slug", input.Context.SOPSlug)
addNonEmptyTag(tags, "sop_version", input.Context.SOPVersion)
addNonEmptyTag(tags, "sop_description", input.Context.SOPDescription)
addNonEmptyTag(tags, "scene", input.Context.Scene)
addNonEmptyTag(tags, "subscene", input.Context.Subscene)
addNonEmptyTag(tags, "robot_type", input.Context.RobotType)
addNonEmptyTag(tags, "data_collector_operator_id", input.Context.DataCollectorOperatorID)
addNonEmptyTag(tags, "data_collector_name", input.Context.DataCollectorName)
addNonEmptyTag(tags, "order_name", input.Context.OrderName)
addNonEmptyTag(tags, "batch_id", input.Context.BatchID)
return tags
}

func addNonEmptyTag(tags map[string]string, key string, value sql.NullString) {
if !value.Valid {
return
}
if input.OrganizationID.Valid {
tags["organization_id"] = strconv.FormatInt(input.OrganizationID.Int64, 10)
trimmed := strings.TrimSpace(value.String)
if trimmed == "" {
return
}
return tags
tags[key] = trimmed
}

func insertAllNonConflictingTags(dst map[string]string, src map[string]string) error {
Expand Down
80 changes: 58 additions & 22 deletions internal/services/dp_raw_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,82 @@ func TestBuildDPDirectRawTags_MergesInDocumentedOrder(t *testing.T) {
"array_field": `["a","b"]`,
"empty_value": "",
},
EpisodeID: 42,
EpisodePublicID: "episode-public-42",
TaskID: 77,
FactoryID: sql.NullInt64{Int64: 3, Valid: true},
OrganizationID: sql.NullInt64{Int64: 9, Valid: true},
Context: dpRawTagContext{
SOPSlug: sql.NullString{String: "pick-place", Valid: true},
SOPVersion: sql.NullString{String: "2.1.0", Valid: true},
SOPDescription: sql.NullString{String: " pick up and place object ", Valid: true},
Scene: sql.NullString{String: "Bedroom", Valid: true},
Subscene: sql.NullString{String: "Bed", Valid: true},
RobotType: sql.NullString{String: "Mobile Manipulator", Valid: true},
DataCollectorOperatorID: sql.NullString{String: "op-001", Valid: true},
DataCollectorName: sql.NullString{String: "Alice", Valid: true},
OrderName: sql.NullString{String: "Order A", Valid: true},
BatchID: sql.NullString{String: "batch_20260615_034848_268_00_378b4bcf", Valid: true},
},
})
if err != nil {
t.Fatalf("buildDPDirectRawTags() error = %v", err)
}

cases := map[string]string{
"profile": "tag",
"same": "value",
dpReservedDeviceIDTagKey: "asset-1",
dpReservedRawFileTagKey: "task.mcap",
"array_field": `["a","b"]`,
"empty_value": "",
"episode_id": "episode-public-42",
"keystone_episode_id": "42",
"sync_channel": "keystone_direct",
"task_id": "77",
"factory_id": "3",
"organization_id": "9",
"profile": "tag",
"same": "value",
dpReservedDeviceIDTagKey: "asset-1",
dpReservedRawFileTagKey: "task.mcap",
"array_field": `["a","b"]`,
"empty_value": "",
"episode_id": "episode-public-42",
"sync_channel": "keystone_direct",
"sop_slug": "pick-place",
"sop_version": "2.1.0",
"sop_description": "pick up and place object",
"scene": "Bedroom",
"subscene": "Bed",
"robot_type": "Mobile Manipulator",
"data_collector_operator_id": "op-001",
"data_collector_name": "Alice",
"order_name": "Order A",
"batch_id": "batch_20260615_034848_268_00_378b4bcf",
}
for key, want := range cases {
if got[key] != want {
t.Fatalf("tag[%q]=%q want %q tags=%+v", key, got[key], want, got)
}
}
for _, key := range []string{"keystone_episode_id", "task_id", "factory_id", "organization_id"} {
if _, ok := got[key]; ok {
t.Fatalf("tag[%q] should not be injected: %+v", key, got)
}
}
if _, ok := got["device_id"]; ok {
t.Fatalf("ordinary device_id raw tag must not be injected: %+v", got)
}
}

func TestBuildDPDirectRawTags_SkipsEmptyContextTags(t *testing.T) {
got, err := buildDPDirectRawTags(dpRawTagsInput{
Profile: DPDeviceProfile{
DeviceID: "asset-1",
Tags: map[string]string{"profile": "tag"},
},
McapKey: "bucket/file.mcap",
EpisodePublicID: "episode-1",
Context: dpRawTagContext{
SOPSlug: sql.NullString{String: " ", Valid: true},
Scene: sql.NullString{String: "Scene A", Valid: false},
},
})
if err != nil {
t.Fatalf("buildDPDirectRawTags() error = %v", err)
}
for _, key := range []string{"sop_slug", "scene"} {
if _, ok := got[key]; ok {
t.Fatalf("empty context tag %q should be skipped: %+v", key, got)
}
}
}

func TestBuildDPDirectRawTags_UsesMcapKeyBasenameNotSidecarMcapFile(t *testing.T) {
got, err := buildDPDirectRawTags(dpRawTagsInput{
Profile: DPDeviceProfile{
Expand All @@ -69,7 +111,6 @@ func TestBuildDPDirectRawTags_UsesMcapKeyBasenameNotSidecarMcapFile(t *testing.T
SidecarTags: map[string]string{
"mcap_file": "sidecar-claimed.mcap",
},
EpisodeID: 1,
EpisodePublicID: "episode-1",
})
if err != nil {
Expand All @@ -96,7 +137,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) {
Tags: map[string]string{dpReservedDeviceIDTagKey: "other-device"},
},
McapKey: "bucket/file.mcap",
EpisodeID: 1,
EpisodePublicID: "episode-1",
},
},
Expand All @@ -109,7 +149,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) {
},
McapKey: "bucket/file.mcap",
SidecarTags: map[string]string{"scene": "sidecar"},
EpisodeID: 1,
EpisodePublicID: "episode-1",
},
},
Expand All @@ -122,7 +161,6 @@ func TestBuildDPDirectRawTags_ConflictingTagsFail(t *testing.T) {
},
McapKey: "bucket/file.mcap",
SidecarTags: map[string]string{"sync_channel": "other"},
EpisodeID: 1,
EpisodePublicID: "episode-1",
},
},
Expand All @@ -143,7 +181,6 @@ func TestBuildDPDirectRawTags_RejectsEmptyKeyAndRawFile(t *testing.T) {
Tags: map[string]string{"": "value"},
},
McapKey: "bucket/file.mcap",
EpisodeID: 1,
EpisodePublicID: "episode-1",
})
if err == nil || !strings.Contains(err.Error(), "key") {
Expand All @@ -156,7 +193,6 @@ func TestBuildDPDirectRawTags_RejectsEmptyKeyAndRawFile(t *testing.T) {
Tags: map[string]string{"profile": "tag"},
},
McapKey: "bucket/",
EpisodeID: 1,
EpisodePublicID: "episode-1",
})
if err == nil || !strings.Contains(err.Error(), "raw_file") {
Expand Down
6 changes: 3 additions & 3 deletions internal/services/sidecar_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
// suitable for use as RawTags in an upload request.
//
// Nested objects are flattened with dot notation (e.g. "device.device_id").
// Array values are JSON-encoded into a single string under one key (e.g. task.skills -> ["pick"]).
// The "topics_summary" key is intentionally excluded.
// Array values are JSON-encoded into a single string under one key (e.g. recording.topics -> ["topic"]).
// The "topics_summary" and top-level "task" keys are intentionally excluded.
func flattenSidecar(data []byte) (map[string]string, error) {
var raw map[string]interface{}
if err := json.Unmarshal(data, &raw); err != nil {
Expand All @@ -31,7 +31,7 @@ func flattenValue(out map[string]string, prefix string, v interface{}) {
switch val := v.(type) {
case map[string]interface{}:
for k, child := range val {
if prefix == "" && k == "topics_summary" {
if prefix == "" && (k == "topics_summary" || k == "task") {
continue
}
flattenValue(out, joinKey(prefix, k), child)
Expand Down
22 changes: 16 additions & 6 deletions internal/services/sidecar_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const testSidecarJSON = `{
"scene": "卧室",
"skills": ["pick"],
"subscene": "床",
"task": "collect-bedroom-bed",
"task_id": "task_20260414_054145_205_27_fdd80b4c"
},
"topics_summary": [
Expand Down Expand Up @@ -65,11 +66,6 @@ func TestFlattenSidecar_BasicFields(t *testing.T) {
"recording.duration_sec": "121.966222012",
"recording.recording_started_at": "2026-04-14T09:15:05.858Z",
"recording.recording_finished_at": "2026-04-14T09:17:07.825Z",
"task.data_collector_id": "刘备",
"task.factory": "Shanghai Factory",
"task.scene": "卧室",
"task.subscene": "床",
"task.task_id": "task_20260414_054145_205_27_fdd80b4c",
"version": "1.0",
}

Expand All @@ -83,6 +79,21 @@ func TestFlattenSidecar_BasicFields(t *testing.T) {
t.Errorf("tags[%q] = %q, want %q", key, got, want)
}
}

for _, key := range []string{
"task.data_collector_id",
"task.factory",
"task.order_id",
"task.scene",
"task.skills",
"task.subscene",
"task.task",
"task.task_id",
} {
if _, ok := tags[key]; ok {
t.Errorf("task subfield %q should be excluded", key)
}
}
}

func TestFlattenSidecar_ArraysEncodedAsJSONString(t *testing.T) {
Expand All @@ -93,7 +104,6 @@ func TestFlattenSidecar_ArraysEncodedAsJSONString(t *testing.T) {

cases := map[string]string{
"recording.topics_recorded": `["/hal/camera/head/color/camera_info","/system/info"]`,
"task.skills": `["pick"]`,
}

for key, want := range cases {
Expand Down
Loading
Loading