From b047bc1ea6be9135ccb6495fcd7a0dbb69507194 Mon Sep 17 00:00:00 2001 From: chaoliu Date: Mon, 15 Jun 2026 12:15:57 +0800 Subject: [PATCH] feat(robots): manage cloud asset binding --- .../cloud-sync-go-direct-upload.zh.html | 125 ++++-- internal/api/handlers/robot.go | 394 +++++++++++++++--- internal/api/handlers/robot_test.go | 125 +++++- internal/server/server.go | 2 +- internal/services/dp_asset_resolver.go | 58 +-- internal/services/dp_asset_resolver_test.go | 62 ++- internal/services/dp_config_loader.go | 109 ++++- internal/services/dp_config_loader_test.go | 34 ++ 8 files changed, 770 insertions(+), 139 deletions(-) diff --git a/docs/designs/cloud-sync-go-direct-upload.zh.html b/docs/designs/cloud-sync-go-direct-upload.zh.html index 65f0a88..902590f 100644 --- a/docs/designs/cloud-sync-go-direct-upload.zh.html +++ b/docs/designs/cloud-sync-go-direct-upload.zh.html @@ -317,7 +317,7 @@

目标与非目标

目标

+

后台候选过滤规则

+
    +
  • 候选接口复用同一份 DP config 解析逻辑,但返回的是“可绑定云资产编号”列表,不返回密钥、tags 或配置路径。
  • +
  • config 文件不可读、JSON 不可解析、version 不支持、endpoint 无效、存在空 deviceId 或重复 deviceId 时,候选接口整体失败。
  • +
  • 单个 profile 的 apiKey 为空、tags 为空、tag key 为空,或 deviceId 不满足 Keystone asset_id 基础校验时,该 profile 从候选中隐藏。
  • +
  • 已被其他 active robot 绑定的 deviceId 从候选中隐藏;软删除 robot 不占用云资产编号。
  • +
  • 候选接口每次请求都重新读取 config 文件;前端每次打开选择器面板时请求一次,不要求 Keystone 重启。
  • +
+

Endpoint 与 TLS 规则

  • https://host[:port] 使用 TLS gRPC;未写端口时补 443;TLS CA 使用系统 CA,server name 使用 URL host。
  • @@ -644,7 +701,7 @@

    直连上传流程

    1. 领取 episode:沿用原有 cloud sync worker 的自动扫描、手动触发、重试和并发控制。
    2. 加载 episode:读取 MCAP MinIO key、sidecar path、metadata、workstation id 和任务上下文。
    -
    3. 解析 asset_id:优先使用 episodes.metadata.asset_id,否则通过历史 workstation 反查 robots.asset_id
    +
    3. 解析 asset_id:优先通过历史 workstation 反查当前 robots.asset_id,无法追溯 workstation / robot 时才用 episodes.metadata.asset_id 兜底。
    4. 加载 DP config:从 KEYSTONE_SYNC_DP_CONFIG 读取 device profile 和 endpoints。
    5. 构造 raw tags:合并 device tags、reserved tags、sidecar tags 和 Keystone extra tags,执行冲突校验。
    6. 构造 direct uploader:为本次 episode 创建专用 AuthClientGatewayClientcloud.Uploader
    @@ -700,19 +757,28 @@

    实施步骤

    1. 复用 robots.asset_id 字段,增加 active 非空唯一约束; - create / update 实现 trim、控制字符校验、“首次非空设置后不可修改、不可清空、同值幂等”。 + create 和专用绑定接口实现 trim、控制字符校验、active 非空唯一与 DP profile 可绑定校验。 +
    2. +
    3. + 普通 robot update 拒绝 asset_id;新增 + GET /robots/cloud-asset-optionsPUT /robots/:id/cloud-asset、 + DELETE /robots/:id/cloud-asset,绑定 / 更换 / 解绑成功后返回完整 robot 对象。 +
    4. +
    5. + Synapse 机器人新建 / 编辑弹窗中的“云资产编号”复用现有 RemoteSelect; + 下拉打开时读取候选,编辑态只更新表单草稿,保存时再按差异调用绑定 / 更换 / 解绑接口。
    6. episode 创建时解析 task -> workstation -> robot 的 asset_id,非空时写入 episodes.metadata.asset_id,但缺失不阻止 episode 创建。
    7. - 新增 DP config loader,读取 SyncConfig.DPConfigPath / KEYSTONE_SYNC_DP_CONFIG, - 校验 version、endpoint、重复 device id、空 apiKey 和空 tags,并按 asset_id 返回 profile。 + 提取 DP config 公共解析与 profile 校验逻辑,读取 SyncConfig.DPConfigPath / + KEYSTONE_SYNC_DP_CONFIG,供候选接口、绑定接口和上传 loader 复用。
    8. - 给 SyncWorker 增加 asset_id resolver:优先读 episodes.metadata.asset_id, - 缺失时允许读取软删除 workstation 并反查 robots.asset_id。 + 给 SyncWorker 增加 asset_id resolver:优先读取软删除 workstation 并反查当前 + robots.asset_id;无法追溯 workstation / robot 时才使用 episodes.metadata.asset_id 兜底。
    9. 新增 raw tags builder,包含两个 reserved key 常量、非冲突插入、MinIO basename 选择、strict sidecar @@ -760,13 +826,18 @@

      风险与处理

      asset_id 填错 - 本地 robot 会永久绑定错误的 Data Platform device。 - 字段首次非空设置后不可修改;未来由自动化流程写入,第一版不提供 break-glass 维护入口。 + 之后发起的同步会使用错误的 Data Platform device 身份。 + 前端不允许自由输入,只能从可绑定 DP profile 中选择;已绑定 robot 支持受控更换,更换会影响之后发起的云同步。 + + + 解绑后继续云同步 + 该机器人之后发起的云同步缺少 asset_id,本次 sync 失败。 + 解绑需要二次确认;历史 episode 的 metadata.asset_id 不自动改写,但同步解析优先使用当前 robot 绑定。 device profile 缺失或不完整 上传前失败,episode 本次 sync 失败。 - 错误信息包含 asset_id 和 config path,但不打印 api key;提示现场执行 dp device initdp device reinit。 + 候选下拉隐藏不可用 profile;上传 worker 错误信息包含 asset_id 和 config path,但不打印 api key。 endpoint / TLS 配置错误 @@ -800,10 +871,12 @@

      风险与处理

      测试计划

        -
      • 单元测试 robot API / 存储:asset_id 可首次设置、同值幂等、不可修改、不可清空、active 非空唯一。
      • +
      • 单元测试 robot API / 存储:创建可不绑定;创建绑定需通过 DP profile 校验;普通 update 拒绝 asset_id;专用接口支持绑定、更换、解绑、同值幂等和 active 非空唯一。
      • +
      • 单元测试 cloud asset options:过滤已占用、软删除不占用、隐藏不可用 profile、按 device_id 升序、q trim 且大小写不敏感、config 结构错误返回错误。
      • 单元测试 episode 创建:有 asset_id 时写入 episodes.metadata.asset_id,缺失时仍创建 episode。
      • -
      • 单元测试 asset_id resolver:metadata 优先、fallback 读取软删除 workstation、缺失时报 non-retryable 错误、不 fallback 到 robots.device_id
      • -
      • 单元测试 DP config loader:version、endpoint scheme/TLS、禁止 path/query/fragment、成功选择 device、缺失 device、空 apiKey、空 tags、重复 deviceId。
      • +
      • 单元测试 asset_id resolver:当前 robot 绑定优先、解绑后不使用 stale metadata、无 workstation 时 metadata 遗留兜底、缺失时报 non-retryable 错误、不 fallback 到 robots.device_id
      • +
      • 单元测试 DP config loader:version、endpoint scheme/TLS、禁止 path/query/fragment、成功选择 device、缺失 device、空 apiKey、空 tags、空 tag key、重复 deviceId。
      • +
      • 前端组件测试 / 手动验证:新建可选可清空;编辑态选择 / 清空后取消不产生副作用;保存时首次绑定直接提交;已绑定更换 / 解绑需要确认;加载、空态、搜索无结果和错误文案正确。
      • 单元测试 raw tags builder:合并顺序、reserved device tag 注入、raw_file 使用 MinIO basename、相同 key 相同 value 幂等、相同 key 不同 value 报错、空 value 保留。
      • 单元测试 SyncWorker 错误分类:non-retryable failed 写 next_retry_at=NULL,auto scan 跳过,manual sync 可重新尝试。
      • 单元测试 uploader 持久化恢复:同 MCAP key 但 asset_id 不同不复用旧 state。
      • diff --git a/internal/api/handlers/robot.go b/internal/api/handlers/robot.go index 94fe64e..5a4c166 100644 --- a/internal/api/handlers/robot.go +++ b/internal/api/handlers/robot.go @@ -26,17 +26,23 @@ import ( // RobotHandler handles robot related HTTP requests. type RobotHandler struct { - db *sqlx.DB - recorderHub *services.RecorderHub - transferHub *services.TransferHub + db *sqlx.DB + recorderHub *services.RecorderHub + transferHub *services.TransferHub + dpConfigPath string } // NewRobotHandler creates a new RobotHandler. -func NewRobotHandler(db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub) *RobotHandler { +func NewRobotHandler(db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub, dpConfigPath ...string) *RobotHandler { + configPath := "" + if len(dpConfigPath) > 0 { + configPath = strings.TrimSpace(dpConfigPath[0]) + } return &RobotHandler{ - db: db, - recorderHub: recorderHub, - transferHub: transferHub, + db: db, + recorderHub: recorderHub, + transferHub: transferHub, + dpConfigPath: configPath, } } @@ -69,6 +75,21 @@ type RobotListResponse struct { HasPrev bool `json:"hasPrev,omitempty"` } +// CloudAssetOption is one bindable cloud asset choice for robot management. +type CloudAssetOption struct { + DeviceID string `json:"device_id"` +} + +// CloudAssetOptionsResponse represents available cloud asset choices. +type CloudAssetOptionsResponse struct { + Items []CloudAssetOption `json:"items"` +} + +// BindCloudAssetRequest represents a robot cloud asset binding request. +type BindCloudAssetRequest struct { + AssetID string `json:"asset_id"` +} + // DeviceConnectionResponse is an in-memory connection snapshot keyed by Axon device_id (no database access). type DeviceConnectionResponse struct { DeviceID string `json:"device_id"` @@ -106,9 +127,12 @@ type CreateRobotResponse struct { func (h *RobotHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { apiV1.GET("/robots", h.ListRobots) apiV1.POST("/robots", h.CreateRobot) + apiV1.GET("/robots/cloud-asset-options", h.ListCloudAssetOptions) apiV1.GET("/devices/:device_id/connection", h.GetDeviceConnection) apiV1.GET("/robots/:id", h.GetRobot) apiV1.PUT("/robots/:id", h.UpdateRobot) + apiV1.PUT("/robots/:id/cloud-asset", h.BindCloudAsset) + apiV1.DELETE("/robots/:id/cloud-asset", h.UnbindCloudAsset) apiV1.DELETE("/robots/:id", h.DeleteRobot) } @@ -159,6 +183,30 @@ func assetIDValue(ns sql.NullString) string { return strings.TrimSpace(ns.String) } +func isDPConfigSystemError(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "KEYSTONE_SYNC_DP_CONFIG") || + strings.Contains(msg, "read DP config") || + strings.Contains(msg, "parse DP config") || + strings.Contains(msg, "unsupported version") || + strings.Contains(msg, "endpoints.") || + strings.Contains(msg, "deviceId is empty") || + strings.Contains(msg, "duplicate deviceId") +} + +func (h *RobotHandler) respondCloudAssetValidationError(c *gin.Context, err error, action string) { + if isDPConfigSystemError(err) { + logger.Printf("[ROBOT] Failed to %s cloud asset using DP config: %v", action, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to validate cloud asset"}) + return + } + logger.Printf("[ROBOT] Cloud asset is not bindable while %s: %v", action, err) + c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id is not bindable"}) +} + func (h *RobotHandler) assetIDInUse(assetID string, excludeRobotID int64) (bool, error) { assetID = strings.TrimSpace(assetID) if assetID == "" { @@ -178,6 +226,25 @@ func (h *RobotHandler) assetIDInUse(assetID string, excludeRobotID int64) (bool, return exists, nil } +func (h *RobotHandler) loadRobotRow(id int64) (robotRow, error) { + var r robotRow + err := h.db.Get(&r, ` + SELECT + r.id, + r.robot_type_id, + r.device_id, + r.factory_id, + r.asset_id, + r.status, + r.metadata, + r.created_at, + r.updated_at + FROM robots r + WHERE r.id = ? AND r.deleted_at IS NULL + `, id) + return r, err +} + func (h *RobotHandler) connectionState(deviceID string) (connected bool, connectedAt string) { connected, connectedAt, _, _ = h.connectionStateDetailed(deviceID) return connected, connectedAt @@ -484,6 +551,94 @@ func (h *RobotHandler) ListRobots(c *gin.Context) { }) } +// ListCloudAssetOptions lists bindable cloud asset IDs from the DP config. +// +// @Summary List cloud asset options +// @Description Lists cloud asset IDs that can be bound to a robot +// @Tags robots +// @Produce json +// @Param robot_id query string false "Current robot ID to exclude from occupancy checks" +// @Param q query string false "Case-insensitive device ID search" +// @Success 200 {object} CloudAssetOptionsResponse +// @Failure 400 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /robots/cloud-asset-options [get] +func (h *RobotHandler) ListCloudAssetOptions(c *gin.Context) { + excludeRobotID := int64(0) + robotIDRaw := strings.TrimSpace(c.Query("robot_id")) + if robotIDRaw != "" { + id, err := strconv.ParseInt(robotIDRaw, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot_id format"}) + return + } + excludeRobotID = id + } + query := strings.ToLower(strings.TrimSpace(c.Query("q"))) + + currentAssetID := "" + if excludeRobotID > 0 { + current, err := h.loadRobotRow(excludeRobotID) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"}) + return + } + if err != nil { + logger.Printf("[ROBOT] Failed to query robot for cloud asset options: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"}) + return + } + currentAssetID = assetIDValue(current.AssetID) + } + + profiles, err := services.ListDPDeviceProfiles(h.dpConfigPath) + if err != nil { + logger.Printf("[ROBOT] Failed to list cloud asset options: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"}) + return + } + + var occupied []string + occupiedQuery := "SELECT asset_id FROM robots WHERE asset_id IS NOT NULL AND asset_id <> '' AND deleted_at IS NULL" + args := []interface{}{} + if excludeRobotID > 0 { + occupiedQuery += " AND id <> ?" + args = append(args, excludeRobotID) + } + if err := h.db.Select(&occupied, occupiedQuery, args...); err != nil { + logger.Printf("[ROBOT] Failed to query occupied cloud assets: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"}) + return + } + occupiedSet := make(map[string]struct{}, len(occupied)) + for _, assetID := range occupied { + if value := strings.TrimSpace(assetID); value != "" { + occupiedSet[value] = struct{}{} + } + } + + options := make([]CloudAssetOption, 0, len(profiles)) + for _, profile := range profiles { + deviceID := strings.TrimSpace(profile.DeviceID) + assetID, err := normalizeAssetID(deviceID) + if err != nil || !assetID.Valid { + continue + } + if _, used := occupiedSet[assetID.String]; used { + continue + } + if currentAssetID != "" && assetID.String == currentAssetID { + continue + } + if query != "" && !strings.Contains(strings.ToLower(assetID.String), query) { + continue + } + options = append(options, CloudAssetOption{DeviceID: assetID.String}) + } + + c.JSON(http.StatusOK, CloudAssetOptionsResponse{Items: options}) +} + // CreateRobot handles robot creation requests. // // @Summary Create robot @@ -527,6 +682,10 @@ func (h *RobotHandler) CreateRobot(c *gin.Context) { return } if assetID.Valid { + if err := services.ValidateDPDeviceProfile(h.dpConfigPath, assetID.String); err != nil { + h.respondCloudAssetValidationError(c, err, "create robot with") + return + } inUse, err := h.assetIDInUse(assetID.String, 0) if err != nil { logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err) @@ -766,6 +925,11 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) { return } + if len(req.AssetID) > 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id must be managed through cloud asset binding API"}) + return + } + var current struct { AssetID sql.NullString `db:"asset_id"` } @@ -822,47 +986,6 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) { args = append(args, deviceID) } - if len(req.AssetID) > 0 { - var rawAssetID string - meta := bytes.TrimSpace(req.AssetID) - if bytes.Equal(meta, []byte("null")) { - rawAssetID = "" - } else if err := json.Unmarshal(req.AssetID, &rawAssetID); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id must be a string or null"}) - return - } - assetID, err := normalizeAssetID(rawAssetID) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - currentAssetID := assetIDValue(current.AssetID) - if currentAssetID != "" { - if !assetID.Valid { - c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id cannot be cleared once set"}) - return - } - if assetID.String != currentAssetID { - c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id cannot be changed once set"}) - return - } - } - if assetID.Valid && assetID.String != currentAssetID { - inUse, err := h.assetIDInUse(assetID.String, id) - if err != nil { - logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) - return - } - if inUse { - c.JSON(http.StatusConflict, gin.H{"error": "asset_id is already assigned to another robot"}) - return - } - } - updates = append(updates, "asset_id = ?") - args = append(args, assetID) - } - if req.FactoryID != nil { if *req.FactoryID == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "factory_id cannot be empty"}) @@ -1021,6 +1144,181 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) { c.JSON(http.StatusOK, h.responseFromRow(r)) } +// BindCloudAsset binds or changes a robot cloud asset ID. +// +// @Summary Bind robot cloud asset +// @Description Binds or changes the Data Platform device ID used for future uploads +// @Tags robots +// @Accept json +// @Produce json +// @Param id path string true "Robot ID" +// @Param body body BindCloudAssetRequest true "Cloud asset payload" +// @Success 200 {object} RobotResponse +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Failure 409 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /robots/{id}/cloud-asset [put] +func (h *RobotHandler) BindCloudAsset(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot id"}) + return + } + + var req BindCloudAssetRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + assetID, err := normalizeAssetID(req.AssetID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if !assetID.Valid { + c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id is required"}) + return + } + + current, err := h.loadRobotRow(id) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"}) + return + } + if err != nil { + logger.Printf("[ROBOT] Failed to query robot for cloud asset bind: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + + currentAssetID := assetIDValue(current.AssetID) + if assetID.String == currentAssetID { + c.JSON(http.StatusOK, h.responseFromRow(current)) + return + } + + if err := services.ValidateDPDeviceProfile(h.dpConfigPath, assetID.String); err != nil { + h.respondCloudAssetValidationError(c, err, "bind") + return + } + inUse, err := h.assetIDInUse(assetID.String, id) + if err != nil { + logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + if inUse { + c.JSON(http.StatusConflict, gin.H{"error": "asset_id is already assigned to another robot"}) + return + } + + now := time.Now().UTC() + result, err := h.db.Exec( + "UPDATE robots SET asset_id = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL", + assetID, + now, + id, + ) + if err != nil { + logger.Printf("[ROBOT] Failed to bind cloud asset: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + rowsAffected, err := result.RowsAffected() + if err != nil { + logger.Printf("[ROBOT] Failed to get rows affected for cloud asset bind: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + if rowsAffected == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"}) + return + } + + updated, err := h.loadRobotRow(id) + if err != nil { + logger.Printf("[ROBOT] Failed to fetch cloud asset bound robot: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get updated robot"}) + return + } + if currentAssetID == "" { + logger.Printf("[ROBOT] Cloud asset bound: robot_id=%d asset_id=%s", id, assetID.String) + } else { + logger.Printf("[ROBOT] Cloud asset changed: robot_id=%d old_asset_id=%s new_asset_id=%s", id, currentAssetID, assetID.String) + } + c.JSON(http.StatusOK, h.responseFromRow(updated)) +} + +// UnbindCloudAsset clears a robot cloud asset ID. +// +// @Summary Unbind robot cloud asset +// @Description Clears the Data Platform device ID used for future uploads +// @Tags robots +// @Produce json +// @Param id path string true "Robot ID" +// @Success 200 {object} RobotResponse +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /robots/{id}/cloud-asset [delete] +func (h *RobotHandler) UnbindCloudAsset(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot id"}) + return + } + + current, err := h.loadRobotRow(id) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"}) + return + } + if err != nil { + logger.Printf("[ROBOT] Failed to query robot for cloud asset unbind: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + + currentAssetID := assetIDValue(current.AssetID) + if currentAssetID == "" { + c.JSON(http.StatusOK, h.responseFromRow(current)) + return + } + + result, err := h.db.Exec( + "UPDATE robots SET asset_id = NULL, updated_at = ? WHERE id = ? AND deleted_at IS NULL", + time.Now().UTC(), + id, + ) + if err != nil { + logger.Printf("[ROBOT] Failed to unbind cloud asset: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + rowsAffected, err := result.RowsAffected() + if err != nil { + logger.Printf("[ROBOT] Failed to get rows affected for cloud asset unbind: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"}) + return + } + if rowsAffected == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"}) + return + } + + updated, err := h.loadRobotRow(id) + if err != nil { + logger.Printf("[ROBOT] Failed to fetch cloud asset unbound robot: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get updated robot"}) + return + } + logger.Printf("[ROBOT] Cloud asset unbound: robot_id=%d old_asset_id=%s", id, currentAssetID) + c.JSON(http.StatusOK, h.responseFromRow(updated)) +} + // DeleteRobot handles robot deletion requests (soft delete). // // @Summary Delete robot diff --git a/internal/api/handlers/robot_test.go b/internal/api/handlers/robot_test.go index fa7c1e1..a262e4a 100644 --- a/internal/api/handlers/robot_test.go +++ b/internal/api/handlers/robot_test.go @@ -10,6 +10,8 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "os" + "path/filepath" "strings" "testing" "time" @@ -265,12 +267,12 @@ func TestRobotHandlerListRobots_ConnectedFilterUsesHubIntersection(t *testing.T) }) } -func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) { +func TestRobotHandlerAssetID_CreateValidatesDPProfileAndList(t *testing.T) { db := newTestRobotHandlerDB(t) defer db.Close() seedRobotLookups(t, db) - r := newTestRobotRouter(t, db) + r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1"))) req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{ "robot_type_id": "10", @@ -311,44 +313,56 @@ func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) { req.Header.Set("Content-Type", "application/json") w = httptest.NewRecorder() r.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Fatalf("same-value update status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + if w.Code != http.StatusBadRequest { + t.Fatalf("ordinary update status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String()) } } -func TestRobotHandlerAssetID_ImmutableOnceSet(t *testing.T) { +func TestRobotHandlerCloudAsset_BindChangeUnbind(t *testing.T) { db := newTestRobotHandlerDB(t) defer db.Close() seedRobotLookups(t, db) - seedRobot(t, db, 1, "local-device-1", "asset-1", nil) + seedRobot(t, db, 1, "local-device-1", "", nil) - r := newTestRobotRouter(t, db) + r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "asset-2"))) for _, tt := range []struct { - name string - body string + name string + method string + path string + body string + wantAsset string }{ - {name: "change rejected", body: `{"asset_id":"asset-2"}`}, - {name: "clear rejected", body: `{"asset_id":""}`}, - {name: "blank clear rejected", body: `{"asset_id":" "}`}, + {name: "bind", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":" asset-1 "}`, wantAsset: "asset-1"}, + {name: "same value idempotent", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-1"}`, wantAsset: "asset-1"}, + {name: "change", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-2"}`, wantAsset: "asset-2"}, + {name: "unbind", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""}, + {name: "unbind idempotent", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""}, } { t.Run(tt.name, func(t *testing.T) { - req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(tt.body)) + req := httptest.NewRequest(tt.method, tt.path, bytes.NewBufferString(tt.body)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() r.ServeHTTP(w, req) - if w.Code != http.StatusBadRequest { - t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String()) + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + } + var resp RobotResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal response: %v body=%s", err, w.Body.String()) + } + if resp.AssetID != tt.wantAsset { + t.Fatalf("asset_id=%q want=%q response=%#v", resp.AssetID, tt.wantAsset, resp) } }) } - req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(`{"asset_id":null}`)) + req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1/cloud-asset", bytes.NewBufferString(`{"asset_id":" "}`)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() r.ServeHTTP(w, req) if w.Code != http.StatusBadRequest { - t.Fatalf("null clear status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String()) + t.Fatalf("blank bind status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String()) } } @@ -360,7 +374,7 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) { deletedAt := time.Now().UTC() seedRobot(t, db, 2, "deleted-device", "deleted-asset", &deletedAt) - r := newTestRobotRouter(t, db) + r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "deleted-asset"))) req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{ "robot_type_id": "10", @@ -389,6 +403,49 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) { } } +func TestRobotHandlerCloudAssetOptions(t *testing.T) { + db := newTestRobotHandlerDB(t) + defer db.Close() + seedRobotLookups(t, db) + seedRobot(t, db, 1, "local-device-1", "asset-1", nil) + seedRobot(t, db, 2, "local-device-2", "current-asset", nil) + deletedAt := time.Now().UTC() + seedRobot(t, db, 3, "deleted-device", "deleted-asset", &deletedAt) + + body := `{ + "version": 3, + "endpoints": {"auth": "auth:1", "gateway": "gateway:2"}, + "devices": [ + {"deviceId":"asset-1","apiKey":"key","tags":{"k":"v"}}, + {"deviceId":"asset-2","apiKey":"key","tags":{"k":"v"}}, + {"deviceId":"current-asset","apiKey":"key","tags":{"k":"v"}}, + {"deviceId":"deleted-asset","apiKey":"key","tags":{"k":"v"}}, + {"deviceId":"bad-api","apiKey":" ","tags":{"k":"v"}}, + {"deviceId":"bad-tags","apiKey":"key","tags":{}} + ] + }` + r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, body)) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/robots/cloud-asset-options?robot_id=2&q=ASSET", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + } + var resp CloudAssetOptionsResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal options: %v body=%s", err, w.Body.String()) + } + got := make([]string, 0, len(resp.Items)) + for _, item := range resp.Items { + got = append(got, item.DeviceID) + } + want := []string{"asset-2", "deleted-asset"} + if strings.Join(got, ",") != strings.Join(want, ",") { + t.Fatalf("options=%v want=%v", got, want) + } +} + func TestRobotHandlerAssetID_Validation(t *testing.T) { db := newTestRobotHandlerDB(t) defer db.Close() @@ -428,6 +485,14 @@ func newTestRobotRouter(t *testing.T, db *sqlx.DB) *gin.Engine { return newTestRobotRouterWithHubs(t, db, nil, nil) } +func newTestRobotRouterWithDPConfig(t *testing.T, db *sqlx.DB, dpConfigPath string) *gin.Engine { + t.Helper() + gin.SetMode(gin.TestMode) + r := gin.New() + NewRobotHandler(db, nil, nil, dpConfigPath).RegisterRoutes(r.Group("/api/v1")) + return r +} + func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub) *gin.Engine { t.Helper() gin.SetMode(gin.TestMode) @@ -436,6 +501,30 @@ func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services return r } +func writeRobotDPConfigFixture(t *testing.T, body string) string { + t.Helper() + path := filepath.Join(t.TempDir(), "dp-config.json") + if err := os.WriteFile(path, []byte(body), 0o600); err != nil { + t.Fatalf("write DP config fixture: %v", err) + } + return path +} + +func robotDPConfigJSON(deviceIDs ...string) string { + var b strings.Builder + b.WriteString(`{"version":3,"endpoints":{"auth":"auth:1","gateway":"gateway:2"},"devices":[`) + for i, deviceID := range deviceIDs { + if i > 0 { + b.WriteString(",") + } + b.WriteString(`{"deviceId":"`) + b.WriteString(deviceID) + b.WriteString(`","apiKey":"key","tags":{"k":"v"}}`) + } + b.WriteString(`]}`) + return b.String() +} + func connectRecorderForTest(t *testing.T, hub *services.RecorderHub, deviceID string, connectedAt time.Time) { t.Helper() conn := hub.NewRecorderConn(nil, deviceID, "127.0.0.1") diff --git a/internal/server/server.go b/internal/server/server.go index b4e96ba..5100689 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -143,7 +143,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client, syncWorker *servi if db != nil { batchHandler = handlers.NewBatchHandler(db, recorderHub, recorderRPCTimeout) robotTypeHandler = handlers.NewRobotTypeHandler(db) - robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub) + robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub, cfg.Sync.DPConfigPath) deviceRegistrationHandler = handlers.NewDeviceRegistrationHandler(db) factoryHandler = handlers.NewFactoryHandler(db) dataCollectorHandler = handlers.NewDataCollectorHandler(db) diff --git a/internal/services/dp_asset_resolver.go b/internal/services/dp_asset_resolver.go index 78f3e29..05db684 100644 --- a/internal/services/dp_asset_resolver.go +++ b/internal/services/dp_asset_resolver.go @@ -27,8 +27,39 @@ func assetIDFromEpisodeMetadata(metadata sql.NullString) string { } func resolveAssetIDForEpisode(ctx context.Context, db *sqlx.DB, episodeID int64, metadata sql.NullString, workstationID sql.NullInt64) (string, error) { - if assetID := assetIDFromEpisodeMetadata(metadata); assetID != "" { - return assetID, nil + metadataAssetID := assetIDFromEpisodeMetadata(metadata) + if db != nil && workstationID.Valid && workstationID.Int64 > 0 { + var row struct { + RobotID sql.NullInt64 `db:"robot_id"` + ResolvedRobotID sql.NullInt64 `db:"resolved_robot_id"` + AssetID sql.NullString `db:"asset_id"` + } + err := db.GetContext(ctx, &row, ` + SELECT ws.robot_id, r.id AS resolved_robot_id, r.asset_id + FROM workstations ws + LEFT JOIN robots r ON r.id = ws.robot_id + WHERE ws.id = ? + LIMIT 1 + `, workstationID.Int64) + if err != nil && err != sql.ErrNoRows { + return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err) + } + if err == nil && row.RobotID.Valid && row.RobotID.Int64 > 0 { + if !row.ResolvedRobotID.Valid || row.ResolvedRobotID.Int64 <= 0 { + if metadataAssetID != "" { + return metadataAssetID, nil + } + return "", fmt.Errorf("episode %d workstation %d robot %d not found while resolving asset_id", episodeID, workstationID.Int64, row.RobotID.Int64) + } + assetID := strings.TrimSpace(row.AssetID.String) + if row.AssetID.Valid && assetID != "" { + return assetID, nil + } + return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64) + } + } + if metadataAssetID != "" { + return metadataAssetID, nil } if db == nil { return "", fmt.Errorf("database is not available") @@ -36,26 +67,5 @@ func resolveAssetIDForEpisode(ctx context.Context, db *sqlx.DB, episodeID int64, if !workstationID.Valid || workstationID.Int64 <= 0 { return "", fmt.Errorf("episode %d has no asset_id metadata and no workstation_id", episodeID) } - - var row struct { - AssetID sql.NullString `db:"asset_id"` - } - err := db.GetContext(ctx, &row, ` - SELECT r.asset_id - FROM workstations ws - LEFT JOIN robots r ON r.id = ws.robot_id - WHERE ws.id = ? - LIMIT 1 - `, workstationID.Int64) - if err == sql.ErrNoRows { - return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64) - } - if err != nil { - return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err) - } - assetID := strings.TrimSpace(row.AssetID.String) - if !row.AssetID.Valid || assetID == "" { - return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64) - } - return assetID, nil + return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64) } diff --git a/internal/services/dp_asset_resolver_test.go b/internal/services/dp_asset_resolver_test.go index d000738..309298e 100644 --- a/internal/services/dp_asset_resolver_test.go +++ b/internal/services/dp_asset_resolver_test.go @@ -42,15 +42,73 @@ func newTestAssetResolverDB(t *testing.T) *sqlx.DB { return db } -func TestResolveAssetIDForEpisode_MetadataWins(t *testing.T) { +func TestResolveAssetIDForEpisode_CurrentRobotAssetWinsOverMetadata(t *testing.T) { db := newTestAssetResolverDB(t) - if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'fallback-asset')`); err != nil { + if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'current-asset')`); err != nil { t.Fatalf("seed robot: %v", err) } if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil { t.Fatalf("seed workstation: %v", err) } + got, err := resolveAssetIDForEpisode( + context.Background(), + db, + 1, + sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true}, + sql.NullInt64{Int64: 10, Valid: true}, + ) + if err != nil { + t.Fatalf("resolveAssetIDForEpisode() error = %v", err) + } + if got != "current-asset" { + t.Fatalf("asset_id=%q want current-asset", got) + } +} + +func TestResolveAssetIDForEpisode_UnboundRobotDoesNotUseStaleMetadata(t *testing.T) { + db := newTestAssetResolverDB(t) + if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', NULL)`); err != nil { + t.Fatalf("seed robot: %v", err) + } + if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil { + t.Fatalf("seed workstation: %v", err) + } + + _, err := resolveAssetIDForEpisode( + context.Background(), + db, + 1, + sql.NullString{String: `{"asset_id":"stale-asset"}`, Valid: true}, + sql.NullInt64{Int64: 10, Valid: true}, + ) + if err == nil || !strings.Contains(err.Error(), "asset_id") { + t.Fatalf("error=%v want asset_id missing error", err) + } +} + +func TestResolveAssetIDForEpisode_MetadataFallbackWhenNoWorkstation(t *testing.T) { + got, err := resolveAssetIDForEpisode( + context.Background(), + nil, + 1, + sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true}, + sql.NullInt64{}, + ) + if err != nil { + t.Fatalf("resolveAssetIDForEpisode() error = %v", err) + } + if got != "snapshot-asset" { + t.Fatalf("asset_id=%q want snapshot-asset", got) + } +} + +func TestResolveAssetIDForEpisode_MetadataFallbackWhenRobotMissing(t *testing.T) { + db := newTestAssetResolverDB(t) + if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 999)`); err != nil { + t.Fatalf("seed workstation: %v", err) + } + got, err := resolveAssetIDForEpisode( context.Background(), db, diff --git a/internal/services/dp_config_loader.go b/internal/services/dp_config_loader.go index f85f773..733a228 100644 --- a/internal/services/dp_config_loader.go +++ b/internal/services/dp_config_loader.go @@ -10,6 +10,7 @@ import ( "net" "net/url" "os" + "sort" "strings" ) @@ -48,66 +49,134 @@ type DPDeviceUploadConfig struct { Profile DPDeviceProfile } -func loadDPDeviceUploadConfig(configPath string, assetID string) (*DPDeviceUploadConfig, error) { +func loadDPConfigFile(configPath string) (*DPConfigFile, DPResolvedEndpoint, DPResolvedEndpoint, error) { configPath = strings.TrimSpace(configPath) - assetID = strings.TrimSpace(assetID) if configPath == "" { - return nil, fmt.Errorf("KEYSTONE_SYNC_DP_CONFIG is required") - } - if assetID == "" { - return nil, fmt.Errorf("asset_id is required") + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("KEYSTONE_SYNC_DP_CONFIG is required") } data, err := os.ReadFile(configPath) //nolint:gosec // operator-controlled config path if err != nil { - return nil, fmt.Errorf("read DP config %s: %w", configPath, err) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("read DP config %s: %w", configPath, err) } var cfg DPConfigFile if err := json.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("parse DP config %s: %w", configPath, err) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("parse DP config %s: %w", configPath, err) } if cfg.Version != nil && *cfg.Version != 3 { - return nil, fmt.Errorf("DP config %s has unsupported version %d", configPath, *cfg.Version) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s has unsupported version %d", configPath, *cfg.Version) } authEndpoint, err := parseDPResolvedEndpoint(cfg.Endpoints.Auth) if err != nil { - return nil, fmt.Errorf("invalid endpoints.auth in DP config %s: %w", configPath, err) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("invalid endpoints.auth in DP config %s: %w", configPath, err) } gatewayEndpoint, err := parseDPResolvedEndpoint(cfg.Endpoints.Gateway) if err != nil { - return nil, fmt.Errorf("invalid endpoints.gateway in DP config %s: %w", configPath, err) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("invalid endpoints.gateway in DP config %s: %w", configPath, err) } devices := make(map[string]DPDeviceProfile, len(cfg.Devices)) for idx, device := range cfg.Devices { deviceID := strings.TrimSpace(device.DeviceID) if deviceID == "" { - return nil, fmt.Errorf("DP config %s devices[%d].deviceId is empty", configPath, idx) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s devices[%d].deviceId is empty", configPath, idx) } if _, exists := devices[deviceID]; exists { - return nil, fmt.Errorf("DP config %s has duplicate deviceId %q", configPath, deviceID) + return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s has duplicate deviceId %q", configPath, deviceID) } device.DeviceID = deviceID devices[deviceID] = device + cfg.Devices[idx] = device } - profile, ok := devices[assetID] - if !ok { - return nil, fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID) - } + return &cfg, authEndpoint, gatewayEndpoint, nil +} + +func requireUploadReadyDPDeviceProfile(configPath string, assetID string, profile DPDeviceProfile) (DPDeviceProfile, error) { profile.APIKey = strings.TrimSpace(profile.APIKey) if profile.APIKey == "" { - return nil, fmt.Errorf("DP config %s device %q apiKey is empty", configPath, assetID) + return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q apiKey is empty", configPath, assetID) } if len(profile.Tags) == 0 { - return nil, fmt.Errorf("DP config %s device %q tags must be non-empty", configPath, assetID) + return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q tags must be non-empty", configPath, assetID) } for key := range profile.Tags { if key == "" { - return nil, fmt.Errorf("DP config %s device %q has an empty tag key", configPath, assetID) + return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q has an empty tag key", configPath, assetID) + } + } + return profile, nil +} + +func findDPDeviceProfile(cfg *DPConfigFile, assetID string) (DPDeviceProfile, bool) { + for _, device := range cfg.Devices { + if device.DeviceID == assetID { + return device, true + } + } + return DPDeviceProfile{}, false +} + +// ListDPDeviceProfiles returns upload-ready profiles from the data-platform config. +func ListDPDeviceProfiles(configPath string) ([]DPDeviceProfile, error) { + cfg, _, _, err := loadDPConfigFile(configPath) + if err != nil { + return nil, err + } + + profiles := make([]DPDeviceProfile, 0, len(cfg.Devices)) + for _, device := range cfg.Devices { + profile, err := requireUploadReadyDPDeviceProfile(configPath, device.DeviceID, device) + if err != nil { + continue } + profiles = append(profiles, profile) + } + sort.Slice(profiles, func(i, j int) bool { + return profiles[i].DeviceID < profiles[j].DeviceID + }) + return profiles, nil +} + +// ValidateDPDeviceProfile checks that one asset ID has an upload-ready device profile. +func ValidateDPDeviceProfile(configPath string, assetID string) error { + configPath = strings.TrimSpace(configPath) + assetID = strings.TrimSpace(assetID) + if assetID == "" { + return fmt.Errorf("asset_id is required") + } + cfg, _, _, err := loadDPConfigFile(configPath) + if err != nil { + return err + } + profile, ok := findDPDeviceProfile(cfg, assetID) + if !ok { + return fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID) + } + _, err = requireUploadReadyDPDeviceProfile(configPath, assetID, profile) + return err +} + +func loadDPDeviceUploadConfig(configPath string, assetID string) (*DPDeviceUploadConfig, error) { + configPath = strings.TrimSpace(configPath) + assetID = strings.TrimSpace(assetID) + if assetID == "" { + return nil, fmt.Errorf("asset_id is required") + } + + cfg, authEndpoint, gatewayEndpoint, err := loadDPConfigFile(configPath) + if err != nil { + return nil, err + } + profile, ok := findDPDeviceProfile(cfg, assetID) + if !ok { + return nil, fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID) + } + profile, err = requireUploadReadyDPDeviceProfile(configPath, assetID, profile) + if err != nil { + return nil, err } return &DPDeviceUploadConfig{ diff --git a/internal/services/dp_config_loader_test.go b/internal/services/dp_config_loader_test.go index deff985..ff5b7a7 100644 --- a/internal/services/dp_config_loader_test.go +++ b/internal/services/dp_config_loader_test.go @@ -174,3 +174,37 @@ func TestLoadDPDeviceUploadConfigRejectsContractErrors(t *testing.T) { }) } } + +func TestListDPDeviceProfilesFiltersProfileErrors(t *testing.T) { + body := `{ + "version":3, + "endpoints":{"auth":"auth:1","gateway":"gateway:2"}, + "devices":[ + {"deviceId":"z-ready","apiKey":"key","tags":{"k":"v"}}, + {"deviceId":"bad-api","apiKey":" ","tags":{"k":"v"}}, + {"deviceId":"bad-tags","apiKey":"key","tags":{}}, + {"deviceId":"bad-tag-key","apiKey":"key","tags":{"":"v"}}, + {"deviceId":"a-ready","apiKey":"key","tags":{"k":"v"}} + ] + }` + profiles, err := ListDPDeviceProfiles(writeDPConfigFixture(t, body)) + if err != nil { + t.Fatalf("ListDPDeviceProfiles() error = %v", err) + } + got := []string{} + for _, profile := range profiles { + got = append(got, profile.DeviceID) + } + want := []string{"a-ready", "z-ready"} + if strings.Join(got, ",") != strings.Join(want, ",") { + t.Fatalf("profiles=%v want=%v", got, want) + } +} + +func TestListDPDeviceProfilesRejectsConfigStructureErrors(t *testing.T) { + body := `{"version":3,"endpoints":{"auth":"auth:1","gateway":"gateway:2"},"devices":[{"deviceId":"asset-1","apiKey":"key","tags":{"k":"v"}},{"deviceId":" asset-1 ","apiKey":"key","tags":{"k":"v"}}]}` + _, err := ListDPDeviceProfiles(writeDPConfigFixture(t, body)) + if err == nil || !strings.Contains(err.Error(), "duplicate deviceId") { + t.Fatalf("error=%v want duplicate deviceId", err) + } +}