Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 99 additions & 26 deletions docs/designs/cloud-sync-go-direct-upload.zh.html

Large diffs are not rendered by default.

394 changes: 346 additions & 48 deletions internal/api/handlers/robot.go

Large diffs are not rendered by default.

125 changes: 107 additions & 18 deletions internal/api/handlers/robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -265,12 +267,12 @@ func TestRobotHandlerListRobots_ConnectedFilterUsesHubIntersection(t *testing.T)
})
}

func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) {
func TestRobotHandlerAssetID_CreateValidatesDPProfileAndList(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
seedRobotLookups(t, db)

r := newTestRobotRouter(t, db)
r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1")))

req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{
"robot_type_id": "10",
Expand Down Expand Up @@ -311,44 +313,56 @@ func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
w = httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("same-value update status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
if w.Code != http.StatusBadRequest {
t.Fatalf("ordinary update status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
}
}

func TestRobotHandlerAssetID_ImmutableOnceSet(t *testing.T) {
func TestRobotHandlerCloudAsset_BindChangeUnbind(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
seedRobotLookups(t, db)
seedRobot(t, db, 1, "local-device-1", "asset-1", nil)
seedRobot(t, db, 1, "local-device-1", "", nil)

r := newTestRobotRouter(t, db)
r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "asset-2")))

for _, tt := range []struct {
name string
body string
name string
method string
path string
body string
wantAsset string
}{
{name: "change rejected", body: `{"asset_id":"asset-2"}`},
{name: "clear rejected", body: `{"asset_id":""}`},
{name: "blank clear rejected", body: `{"asset_id":" "}`},
{name: "bind", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":" asset-1 "}`, wantAsset: "asset-1"},
{name: "same value idempotent", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-1"}`, wantAsset: "asset-1"},
{name: "change", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-2"}`, wantAsset: "asset-2"},
{name: "unbind", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""},
{name: "unbind idempotent", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""},
} {
t.Run(tt.name, func(t *testing.T) {
req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(tt.body))
req := httptest.NewRequest(tt.method, tt.path, bytes.NewBufferString(tt.body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
if w.Code != http.StatusOK {
t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
}
var resp RobotResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal response: %v body=%s", err, w.Body.String())
}
if resp.AssetID != tt.wantAsset {
t.Fatalf("asset_id=%q want=%q response=%#v", resp.AssetID, tt.wantAsset, resp)
}
})
}

req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(`{"asset_id":null}`))
req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1/cloud-asset", bytes.NewBufferString(`{"asset_id":" "}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("null clear status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
t.Fatalf("blank bind status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
}
}

Expand All @@ -360,7 +374,7 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) {
deletedAt := time.Now().UTC()
seedRobot(t, db, 2, "deleted-device", "deleted-asset", &deletedAt)

r := newTestRobotRouter(t, db)
r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "deleted-asset")))

req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{
"robot_type_id": "10",
Expand Down Expand Up @@ -389,6 +403,49 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) {
}
}

func TestRobotHandlerCloudAssetOptions(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
seedRobotLookups(t, db)
seedRobot(t, db, 1, "local-device-1", "asset-1", nil)
seedRobot(t, db, 2, "local-device-2", "current-asset", nil)
deletedAt := time.Now().UTC()
seedRobot(t, db, 3, "deleted-device", "deleted-asset", &deletedAt)

body := `{
"version": 3,
"endpoints": {"auth": "auth:1", "gateway": "gateway:2"},
"devices": [
{"deviceId":"asset-1","apiKey":"key","tags":{"k":"v"}},
{"deviceId":"asset-2","apiKey":"key","tags":{"k":"v"}},
{"deviceId":"current-asset","apiKey":"key","tags":{"k":"v"}},
{"deviceId":"deleted-asset","apiKey":"key","tags":{"k":"v"}},
{"deviceId":"bad-api","apiKey":" ","tags":{"k":"v"}},
{"deviceId":"bad-tags","apiKey":"key","tags":{}}
]
}`
r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, body))

req := httptest.NewRequest(http.MethodGet, "/api/v1/robots/cloud-asset-options?robot_id=2&q=ASSET", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
}
var resp CloudAssetOptionsResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal options: %v body=%s", err, w.Body.String())
}
got := make([]string, 0, len(resp.Items))
for _, item := range resp.Items {
got = append(got, item.DeviceID)
}
want := []string{"asset-2", "deleted-asset"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Fatalf("options=%v want=%v", got, want)
}
}

func TestRobotHandlerAssetID_Validation(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
Expand Down Expand Up @@ -428,6 +485,14 @@ func newTestRobotRouter(t *testing.T, db *sqlx.DB) *gin.Engine {
return newTestRobotRouterWithHubs(t, db, nil, nil)
}

func newTestRobotRouterWithDPConfig(t *testing.T, db *sqlx.DB, dpConfigPath string) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
r := gin.New()
NewRobotHandler(db, nil, nil, dpConfigPath).RegisterRoutes(r.Group("/api/v1"))
return r
}

func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
Expand All @@ -436,6 +501,30 @@ func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services
return r
}

func writeRobotDPConfigFixture(t *testing.T, body string) string {
t.Helper()
path := filepath.Join(t.TempDir(), "dp-config.json")
if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
t.Fatalf("write DP config fixture: %v", err)
}
return path
}

func robotDPConfigJSON(deviceIDs ...string) string {
var b strings.Builder
b.WriteString(`{"version":3,"endpoints":{"auth":"auth:1","gateway":"gateway:2"},"devices":[`)
for i, deviceID := range deviceIDs {
if i > 0 {
b.WriteString(",")
}
b.WriteString(`{"deviceId":"`)
b.WriteString(deviceID)
b.WriteString(`","apiKey":"key","tags":{"k":"v"}}`)
}
b.WriteString(`]}`)
return b.String()
}

func connectRecorderForTest(t *testing.T, hub *services.RecorderHub, deviceID string, connectedAt time.Time) {
t.Helper()
conn := hub.NewRecorderConn(nil, deviceID, "127.0.0.1")
Expand Down
2 changes: 1 addition & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client, syncWorker *servi
if db != nil {
batchHandler = handlers.NewBatchHandler(db, recorderHub, recorderRPCTimeout)
robotTypeHandler = handlers.NewRobotTypeHandler(db)
robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub)
robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub, cfg.Sync.DPConfigPath)
deviceRegistrationHandler = handlers.NewDeviceRegistrationHandler(db)
factoryHandler = handlers.NewFactoryHandler(db)
dataCollectorHandler = handlers.NewDataCollectorHandler(db)
Expand Down
58 changes: 34 additions & 24 deletions internal/services/dp_asset_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,45 @@ func assetIDFromEpisodeMetadata(metadata sql.NullString) string {
}

func resolveAssetIDForEpisode(ctx context.Context, db *sqlx.DB, episodeID int64, metadata sql.NullString, workstationID sql.NullInt64) (string, error) {
if assetID := assetIDFromEpisodeMetadata(metadata); assetID != "" {
return assetID, nil
metadataAssetID := assetIDFromEpisodeMetadata(metadata)
if db != nil && workstationID.Valid && workstationID.Int64 > 0 {
var row struct {
RobotID sql.NullInt64 `db:"robot_id"`
ResolvedRobotID sql.NullInt64 `db:"resolved_robot_id"`
AssetID sql.NullString `db:"asset_id"`
}
err := db.GetContext(ctx, &row, `
SELECT ws.robot_id, r.id AS resolved_robot_id, r.asset_id
FROM workstations ws
LEFT JOIN robots r ON r.id = ws.robot_id
WHERE ws.id = ?
LIMIT 1
`, workstationID.Int64)
if err != nil && err != sql.ErrNoRows {
return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err)
}
if err == nil && row.RobotID.Valid && row.RobotID.Int64 > 0 {
if !row.ResolvedRobotID.Valid || row.ResolvedRobotID.Int64 <= 0 {
if metadataAssetID != "" {
return metadataAssetID, nil
}
return "", fmt.Errorf("episode %d workstation %d robot %d not found while resolving asset_id", episodeID, workstationID.Int64, row.RobotID.Int64)
}
assetID := strings.TrimSpace(row.AssetID.String)
if row.AssetID.Valid && assetID != "" {
return assetID, nil
}
return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64)
}
}
if metadataAssetID != "" {
return metadataAssetID, nil
}
if db == nil {
return "", fmt.Errorf("database is not available")
}
if !workstationID.Valid || workstationID.Int64 <= 0 {
return "", fmt.Errorf("episode %d has no asset_id metadata and no workstation_id", episodeID)
}

var row struct {
AssetID sql.NullString `db:"asset_id"`
}
err := db.GetContext(ctx, &row, `
SELECT r.asset_id
FROM workstations ws
LEFT JOIN robots r ON r.id = ws.robot_id
WHERE ws.id = ?
LIMIT 1
`, workstationID.Int64)
if err == sql.ErrNoRows {
return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64)
}
if err != nil {
return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err)
}
assetID := strings.TrimSpace(row.AssetID.String)
if !row.AssetID.Valid || assetID == "" {
return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64)
}
return assetID, nil
return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64)
}
62 changes: 60 additions & 2 deletions internal/services/dp_asset_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,73 @@ func newTestAssetResolverDB(t *testing.T) *sqlx.DB {
return db
}

func TestResolveAssetIDForEpisode_MetadataWins(t *testing.T) {
func TestResolveAssetIDForEpisode_CurrentRobotAssetWinsOverMetadata(t *testing.T) {
db := newTestAssetResolverDB(t)
if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'fallback-asset')`); err != nil {
if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'current-asset')`); err != nil {
t.Fatalf("seed robot: %v", err)
}
if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
t.Fatalf("seed workstation: %v", err)
}

got, err := resolveAssetIDForEpisode(
context.Background(),
db,
1,
sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true},
sql.NullInt64{Int64: 10, Valid: true},
)
if err != nil {
t.Fatalf("resolveAssetIDForEpisode() error = %v", err)
}
if got != "current-asset" {
t.Fatalf("asset_id=%q want current-asset", got)
}
}

func TestResolveAssetIDForEpisode_UnboundRobotDoesNotUseStaleMetadata(t *testing.T) {
db := newTestAssetResolverDB(t)
if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', NULL)`); err != nil {
t.Fatalf("seed robot: %v", err)
}
if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
t.Fatalf("seed workstation: %v", err)
}

_, err := resolveAssetIDForEpisode(
context.Background(),
db,
1,
sql.NullString{String: `{"asset_id":"stale-asset"}`, Valid: true},
sql.NullInt64{Int64: 10, Valid: true},
)
if err == nil || !strings.Contains(err.Error(), "asset_id") {
t.Fatalf("error=%v want asset_id missing error", err)
}
}

func TestResolveAssetIDForEpisode_MetadataFallbackWhenNoWorkstation(t *testing.T) {
got, err := resolveAssetIDForEpisode(
context.Background(),
nil,
1,
sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true},
sql.NullInt64{},
)
if err != nil {
t.Fatalf("resolveAssetIDForEpisode() error = %v", err)
}
if got != "snapshot-asset" {
t.Fatalf("asset_id=%q want snapshot-asset", got)
}
}

func TestResolveAssetIDForEpisode_MetadataFallbackWhenRobotMissing(t *testing.T) {
db := newTestAssetResolverDB(t)
if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 999)`); err != nil {
t.Fatalf("seed workstation: %v", err)
}

got, err := resolveAssetIDForEpisode(
context.Background(),
db,
Expand Down
Loading
Loading