diff --git a/docs/designs/cloud-sync-go-direct-upload.zh.html b/docs/designs/cloud-sync-go-direct-upload.zh.html index 65f0a88..902590f 100644 --- a/docs/designs/cloud-sync-go-direct-upload.zh.html +++ b/docs/designs/cloud-sync-go-direct-upload.zh.html @@ -317,7 +317,7 @@
robots.asset_id 作为“云资产编号”,即本地 robot 与 Data Platform device 的稳定映射。robots.asset_id 作为“云资产编号”,即本地 robot 与 Data Platform device 的当前绑定关系。asset_id 快照写入 episodes.metadata.asset_id。KEYSTONE_SYNC_DP_CONFIG 指向的 data-platform config,按 asset_id 选择 device profile。apiKey 与 AuthService 交换 Bearer token。dp device init,device profile 由现场工程师提前初始化。asset_id。asset_id。CLISyncRunner、CLI sync API、CLI sync 配置和 cli_sync_runs 表迁移。
新路径保留原有 cloud sync worker 的触发、重试、状态更新和同步日志。每次处理 episode 时,
- worker 先解析 episode 对应的 asset_id,再根据该值读取 data-platform device
+ worker 先按 episode 关联的 workstation 解析当前 robot 绑定的 asset_id,再根据该值读取 data-platform device
profile,并用该 profile 的 API key 构造本次上传专用客户端。
Keystone DB episode
- -> resolve asset_id from episodes.metadata
- or fallback through episode.workstation_id -> workstations -> robots.asset_id
+ -> resolve asset_id through episode.workstation_id -> workstations -> robots.asset_id
+ or fallback to episodes.metadata.asset_id only when workstation/robot cannot be traced
-> load DP config from KEYSTONE_SYNC_DP_CONFIG
-> select devices[].deviceId == asset_id
-> build effective raw tags
@@ -396,8 +396,8 @@ 目标架构
Robot asset mapping
- 保存本地 robot 到 Data Platform device 的不可变映射。
- robots.asset_id、robot API、数据库迁移
+ 保存本地 robot 到 Data Platform device 的当前绑定,并提供受控绑定、更换和解绑。
+ robots.asset_id、robot API、数据库迁移、Synapse 远程下拉
Asset resolver
@@ -437,11 +437,58 @@ 云资产编号映射规则
Robot 字段规则
- robots.asset_id 初始允许为空,创建 robot 时可写可不写。
- - 首次设置非空后不可修改、不可清空;同值更新视为幂等。
+ robots.asset_id 初始允许为空,创建 robot 时可绑定可不绑定;不绑定不影响本地采集、QA 和入库。
+ - 非空
asset_id 必须来自 KEYSTONE_SYNC_DP_CONFIG 中可用且未被占用的 devices[].deviceId。
+ - 已有 robot 的云资产编号通过专用绑定接口管理,支持首次绑定、更换和解绑;普通 robot update 收到
asset_id 时应拒绝。
+ - 绑定、更换和解绑会影响之后发起的云同步;不自动改写历史 episode 的
metadata.asset_id。
- active robots 的非空
asset_id 必须唯一,软删除 robot 不占用唯一性。
- 保存前 trim;空字符串按 NULL;最大长度 100;不做 Data Platform device id 格式正则。
- - robot create / update / list / detail API 暴露
asset_id。
+ - robot create / list / detail API 暴露
asset_id;绑定、更换、解绑成功后返回完整 robot 对象。
+
+
+ 后台管理交互规则
+
+ - Synapse 机器人列表列名为“云资产编号”:有值显示完整编号,无值显示“未绑定”;列表和详情页不做 DP config 实时诊断。
+ - 新建和编辑弹窗第一行仍将“设备ID”和“云资产编号”并列展示;“云资产编号”复用后台表单现有远程下拉组件,不允许自由输入。
+ - 下拉每次打开时重新读取 DP config 候选项,不提供刷新按钮;搜索由候选接口按编号过滤。
+ - 候选项只显示完整云资产编号;长编号允许换行,不撑破弹窗布局。
+ - 新建 robot 时云资产编号可选、可清空,选择后随“创建”一起保存,不需要二次确认。
+ - 编辑 robot 时,云资产编号选择只修改表单草稿;点击“取消”不产生任何绑定副作用。
+ - 编辑已有未绑定 robot 时,选择云资产编号后随“保存”提交首次绑定,不需要二次确认。
+ - 编辑已有已绑定 robot 时,选择新编号后点击“保存”触发“更换云资产编号”确认。
+ - 编辑已有已绑定 robot 时,清空该字段后点击“保存”触发“解绑云资产编号”确认。
+ - 已绑定 robot 打开下拉时,当前编号通过 selected option 展示,即使该编号当前不在 DP config 候选中。
+ - 更换确认文案:将云资产编号从
{old} 更换为 {new}。
+ - 解绑确认文案:确认解绑云资产编号
{old}?
+ - 绑定 / 更换 / 解绑随表单保存成功后关闭弹窗并刷新列表。
+
+
+ 后台管理 API
+ GET /api/v1/robots/cloud-asset-options?robot_id=123&q=AB-F0001
+
+200 OK
+{
+ "items": [
+ { "device_id": "AB-F0001-T0001-000006" }
+ ]
+}
+
+PUT /api/v1/robots/:id/cloud-asset
+{
+ "asset_id": "AB-F0001-T0001-000006"
+}
+
+DELETE /api/v1/robots/:id/cloud-asset
+
+ robot_id 可选:不传用于新建;传入时用于排除其他 active robot 已占用的编号。
+ q 可选且 trim 后使用;搜索大小写不敏感,绑定校验仍大小写敏感。
+ - 候选接口不分页,只返回
items;不返回 config_path、total 或 profile 诊断信息。
+ - 候选列表按
device_id 升序排序,隐藏异常 profile 和已被其他 active robot 绑定的 profile;当前已绑定编号由前端 selected option 合并展示。
+ - DP config 整体不可读、不可解析、endpoint 无效、存在空
deviceId 或重复 deviceId 时,候选接口返回错误;前端只展示“无法加载云资产编号”。
+ - 候选加载、空态和搜索无结果沿用后台通用远程下拉组件文案。
+ PUT 传空 asset_id 返回 400,不执行解绑;解绑只走 DELETE,且 DELETE 幂等。
+ PUT 收到与当前相同的 asset_id 时幂等成功,不强制重新校验 DP config。
+ - 绑定 / 更换 / 解绑失败前端将错误挂到“云资产编号”字段或保存错误态;不展示底层 config 细节。
Episode 快照规则
@@ -453,12 +500,13 @@ Episode 快照规则
上传时解析优先级
- if episodes.metadata.asset_id is non-empty:
- use metadata.asset_id
-else:
+ if episode.workstation_id is non-empty:
load workstation by episode.workstation_id, including soft-deleted workstation rows
- load robots.asset_id by workstation.robot_id
+ load current robots.asset_id by workstation.robot_id
use robots.asset_id if non-empty
+ fail if the current robot is unbound
+else if episodes.metadata.asset_id is non-empty:
+ use metadata.asset_id as legacy fallback
if still empty:
fail as non-retryable configuration error
@@ -535,6 +583,15 @@ 解析规则
每个 episode 上传前重新读取 config 文件,避免长期进程缓存旧 device profile。
+ 后台候选过滤规则
+
+ - 候选接口复用同一份 DP config 解析逻辑,但返回的是“可绑定云资产编号”列表,不返回密钥、tags 或配置路径。
+ - config 文件不可读、JSON 不可解析、version 不支持、endpoint 无效、存在空
deviceId 或重复 deviceId 时,候选接口整体失败。
+ - 单个 profile 的
apiKey 为空、tags 为空、tag key 为空,或 deviceId 不满足 Keystone asset_id 基础校验时,该 profile 从候选中隐藏。
+ - 已被其他 active robot 绑定的
deviceId 从候选中隐藏;软删除 robot 不占用云资产编号。
+ - 候选接口每次请求都重新读取 config 文件;前端每次打开选择器面板时请求一次,不要求 Keystone 重启。
+
+
Endpoint 与 TLS 规则
https://host[:port] 使用 TLS gRPC;未写端口时补 443;TLS CA 使用系统 CA,server name 使用 URL host。
@@ -644,7 +701,7 @@ 直连上传流程
1. 领取 episode:沿用原有 cloud sync worker 的自动扫描、手动触发、重试和并发控制。
2. 加载 episode:读取 MCAP MinIO key、sidecar path、metadata、workstation id 和任务上下文。
- 3. 解析 asset_id:优先使用 episodes.metadata.asset_id,否则通过历史 workstation 反查 robots.asset_id。
+ 3. 解析 asset_id:优先通过历史 workstation 反查当前 robots.asset_id,无法追溯 workstation / robot 时才用 episodes.metadata.asset_id 兜底。
4. 加载 DP config:从 KEYSTONE_SYNC_DP_CONFIG 读取 device profile 和 endpoints。
5. 构造 raw tags:合并 device tags、reserved tags、sidecar tags 和 Keystone extra tags,执行冲突校验。
6. 构造 direct uploader:为本次 episode 创建专用 AuthClient、GatewayClient 和 cloud.Uploader。
@@ -700,19 +757,28 @@ 实施步骤
-
复用
robots.asset_id 字段,增加 active 非空唯一约束;
- create / update 实现 trim、控制字符校验、“首次非空设置后不可修改、不可清空、同值幂等”。
+ create 和专用绑定接口实现 trim、控制字符校验、active 非空唯一与 DP profile 可绑定校验。
+
+ -
+ 普通 robot update 拒绝
asset_id;新增
+ GET /robots/cloud-asset-options、PUT /robots/:id/cloud-asset、
+ DELETE /robots/:id/cloud-asset,绑定 / 更换 / 解绑成功后返回完整 robot 对象。
+
+ -
+ Synapse 机器人新建 / 编辑弹窗中的“云资产编号”复用现有
RemoteSelect;
+ 下拉打开时读取候选,编辑态只更新表单草稿,保存时再按差异调用绑定 / 更换 / 解绑接口。
-
episode 创建时解析 task -> workstation -> robot 的
asset_id,非空时写入
episodes.metadata.asset_id,但缺失不阻止 episode 创建。
-
- 新增 DP config loader,读取
SyncConfig.DPConfigPath / KEYSTONE_SYNC_DP_CONFIG,
- 校验 version、endpoint、重复 device id、空 apiKey 和空 tags,并按 asset_id 返回 profile。
+ 提取 DP config 公共解析与 profile 校验逻辑,读取 SyncConfig.DPConfigPath /
+ KEYSTONE_SYNC_DP_CONFIG,供候选接口、绑定接口和上传 loader 复用。
-
- 给
SyncWorker 增加 asset_id resolver:优先读 episodes.metadata.asset_id,
- 缺失时允许读取软删除 workstation 并反查 robots.asset_id。
+ 给 SyncWorker 增加 asset_id resolver:优先读取软删除 workstation 并反查当前
+ robots.asset_id;无法追溯 workstation / robot 时才使用 episodes.metadata.asset_id 兜底。
-
新增 raw tags builder,包含两个 reserved key 常量、非冲突插入、MinIO basename 选择、strict sidecar
@@ -760,13 +826,18 @@
风险与处理
asset_id 填错
- 本地 robot 会永久绑定错误的 Data Platform device。
- 字段首次非空设置后不可修改;未来由自动化流程写入,第一版不提供 break-glass 维护入口。
+ 之后发起的同步会使用错误的 Data Platform device 身份。
+ 前端不允许自由输入,只能从可绑定 DP profile 中选择;已绑定 robot 支持受控更换,更换会影响之后发起的云同步。
+
+
+ 解绑后继续云同步
+ 该机器人之后发起的云同步缺少 asset_id,本次 sync 失败。
+ 解绑需要二次确认;历史 episode 的 metadata.asset_id 不自动改写,但同步解析优先使用当前 robot 绑定。
device profile 缺失或不完整
上传前失败,episode 本次 sync 失败。
- 错误信息包含 asset_id 和 config path,但不打印 api key;提示现场执行 dp device init 或 dp device reinit。
+ 候选下拉隐藏不可用 profile;上传 worker 错误信息包含 asset_id 和 config path,但不打印 api key。
endpoint / TLS 配置错误
@@ -800,10 +871,12 @@ 风险与处理
测试计划
- - 单元测试 robot API / 存储:
asset_id 可首次设置、同值幂等、不可修改、不可清空、active 非空唯一。
+ - 单元测试 robot API / 存储:创建可不绑定;创建绑定需通过 DP profile 校验;普通 update 拒绝
asset_id;专用接口支持绑定、更换、解绑、同值幂等和 active 非空唯一。
+ - 单元测试 cloud asset options:过滤已占用、软删除不占用、隐藏不可用 profile、按
device_id 升序、q trim 且大小写不敏感、config 结构错误返回错误。
- 单元测试 episode 创建:有
asset_id 时写入 episodes.metadata.asset_id,缺失时仍创建 episode。
- - 单元测试 asset_id resolver:metadata 优先、fallback 读取软删除 workstation、缺失时报 non-retryable 错误、不 fallback 到
robots.device_id。
- - 单元测试 DP config loader:version、endpoint scheme/TLS、禁止 path/query/fragment、成功选择 device、缺失 device、空 apiKey、空 tags、重复 deviceId。
+ - 单元测试 asset_id resolver:当前 robot 绑定优先、解绑后不使用 stale metadata、无 workstation 时 metadata 遗留兜底、缺失时报 non-retryable 错误、不 fallback 到
robots.device_id。
+ - 单元测试 DP config loader:version、endpoint scheme/TLS、禁止 path/query/fragment、成功选择 device、缺失 device、空 apiKey、空 tags、空 tag key、重复 deviceId。
+ - 前端组件测试 / 手动验证:新建可选可清空;编辑态选择 / 清空后取消不产生副作用;保存时首次绑定直接提交;已绑定更换 / 解绑需要确认;加载、空态、搜索无结果和错误文案正确。
- 单元测试 raw tags builder:合并顺序、reserved device tag 注入、raw_file 使用 MinIO basename、相同 key 相同 value 幂等、相同 key 不同 value 报错、空 value 保留。
- 单元测试 SyncWorker 错误分类:non-retryable failed 写
next_retry_at=NULL,auto scan 跳过,manual sync 可重新尝试。
- 单元测试 uploader 持久化恢复:同 MCAP key 但
asset_id 不同不复用旧 state。
diff --git a/internal/api/handlers/robot.go b/internal/api/handlers/robot.go
index 94fe64e..5a4c166 100644
--- a/internal/api/handlers/robot.go
+++ b/internal/api/handlers/robot.go
@@ -26,17 +26,23 @@ import (
// RobotHandler handles robot related HTTP requests.
type RobotHandler struct {
- db *sqlx.DB
- recorderHub *services.RecorderHub
- transferHub *services.TransferHub
+ db *sqlx.DB
+ recorderHub *services.RecorderHub
+ transferHub *services.TransferHub
+ dpConfigPath string
}
// NewRobotHandler creates a new RobotHandler.
-func NewRobotHandler(db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub) *RobotHandler {
+func NewRobotHandler(db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub, dpConfigPath ...string) *RobotHandler {
+ configPath := ""
+ if len(dpConfigPath) > 0 {
+ configPath = strings.TrimSpace(dpConfigPath[0])
+ }
return &RobotHandler{
- db: db,
- recorderHub: recorderHub,
- transferHub: transferHub,
+ db: db,
+ recorderHub: recorderHub,
+ transferHub: transferHub,
+ dpConfigPath: configPath,
}
}
@@ -69,6 +75,21 @@ type RobotListResponse struct {
HasPrev bool `json:"hasPrev,omitempty"`
}
+// CloudAssetOption is one bindable cloud asset choice for robot management.
+type CloudAssetOption struct {
+ DeviceID string `json:"device_id"`
+}
+
+// CloudAssetOptionsResponse represents available cloud asset choices.
+type CloudAssetOptionsResponse struct {
+ Items []CloudAssetOption `json:"items"`
+}
+
+// BindCloudAssetRequest represents a robot cloud asset binding request.
+type BindCloudAssetRequest struct {
+ AssetID string `json:"asset_id"`
+}
+
// DeviceConnectionResponse is an in-memory connection snapshot keyed by Axon device_id (no database access).
type DeviceConnectionResponse struct {
DeviceID string `json:"device_id"`
@@ -106,9 +127,12 @@ type CreateRobotResponse struct {
func (h *RobotHandler) RegisterRoutes(apiV1 *gin.RouterGroup) {
apiV1.GET("/robots", h.ListRobots)
apiV1.POST("/robots", h.CreateRobot)
+ apiV1.GET("/robots/cloud-asset-options", h.ListCloudAssetOptions)
apiV1.GET("/devices/:device_id/connection", h.GetDeviceConnection)
apiV1.GET("/robots/:id", h.GetRobot)
apiV1.PUT("/robots/:id", h.UpdateRobot)
+ apiV1.PUT("/robots/:id/cloud-asset", h.BindCloudAsset)
+ apiV1.DELETE("/robots/:id/cloud-asset", h.UnbindCloudAsset)
apiV1.DELETE("/robots/:id", h.DeleteRobot)
}
@@ -159,6 +183,30 @@ func assetIDValue(ns sql.NullString) string {
return strings.TrimSpace(ns.String)
}
+func isDPConfigSystemError(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := err.Error()
+ return strings.Contains(msg, "KEYSTONE_SYNC_DP_CONFIG") ||
+ strings.Contains(msg, "read DP config") ||
+ strings.Contains(msg, "parse DP config") ||
+ strings.Contains(msg, "unsupported version") ||
+ strings.Contains(msg, "endpoints.") ||
+ strings.Contains(msg, "deviceId is empty") ||
+ strings.Contains(msg, "duplicate deviceId")
+}
+
+func (h *RobotHandler) respondCloudAssetValidationError(c *gin.Context, err error, action string) {
+ if isDPConfigSystemError(err) {
+ logger.Printf("[ROBOT] Failed to %s cloud asset using DP config: %v", action, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to validate cloud asset"})
+ return
+ }
+ logger.Printf("[ROBOT] Cloud asset is not bindable while %s: %v", action, err)
+ c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id is not bindable"})
+}
+
func (h *RobotHandler) assetIDInUse(assetID string, excludeRobotID int64) (bool, error) {
assetID = strings.TrimSpace(assetID)
if assetID == "" {
@@ -178,6 +226,25 @@ func (h *RobotHandler) assetIDInUse(assetID string, excludeRobotID int64) (bool,
return exists, nil
}
+func (h *RobotHandler) loadRobotRow(id int64) (robotRow, error) {
+ var r robotRow
+ err := h.db.Get(&r, `
+ SELECT
+ r.id,
+ r.robot_type_id,
+ r.device_id,
+ r.factory_id,
+ r.asset_id,
+ r.status,
+ r.metadata,
+ r.created_at,
+ r.updated_at
+ FROM robots r
+ WHERE r.id = ? AND r.deleted_at IS NULL
+ `, id)
+ return r, err
+}
+
func (h *RobotHandler) connectionState(deviceID string) (connected bool, connectedAt string) {
connected, connectedAt, _, _ = h.connectionStateDetailed(deviceID)
return connected, connectedAt
@@ -484,6 +551,94 @@ func (h *RobotHandler) ListRobots(c *gin.Context) {
})
}
+// ListCloudAssetOptions lists bindable cloud asset IDs from the DP config.
+//
+// @Summary List cloud asset options
+// @Description Lists cloud asset IDs that can be bound to a robot
+// @Tags robots
+// @Produce json
+// @Param robot_id query string false "Current robot ID to exclude from occupancy checks"
+// @Param q query string false "Case-insensitive device ID search"
+// @Success 200 {object} CloudAssetOptionsResponse
+// @Failure 400 {object} map[string]string
+// @Failure 500 {object} map[string]string
+// @Router /robots/cloud-asset-options [get]
+func (h *RobotHandler) ListCloudAssetOptions(c *gin.Context) {
+ excludeRobotID := int64(0)
+ robotIDRaw := strings.TrimSpace(c.Query("robot_id"))
+ if robotIDRaw != "" {
+ id, err := strconv.ParseInt(robotIDRaw, 10, 64)
+ if err != nil || id <= 0 {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot_id format"})
+ return
+ }
+ excludeRobotID = id
+ }
+ query := strings.ToLower(strings.TrimSpace(c.Query("q")))
+
+ currentAssetID := ""
+ if excludeRobotID > 0 {
+ current, err := h.loadRobotRow(excludeRobotID)
+ if err == sql.ErrNoRows {
+ c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"})
+ return
+ }
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to query robot for cloud asset options: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"})
+ return
+ }
+ currentAssetID = assetIDValue(current.AssetID)
+ }
+
+ profiles, err := services.ListDPDeviceProfiles(h.dpConfigPath)
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to list cloud asset options: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"})
+ return
+ }
+
+ var occupied []string
+ occupiedQuery := "SELECT asset_id FROM robots WHERE asset_id IS NOT NULL AND asset_id <> '' AND deleted_at IS NULL"
+ args := []interface{}{}
+ if excludeRobotID > 0 {
+ occupiedQuery += " AND id <> ?"
+ args = append(args, excludeRobotID)
+ }
+ if err := h.db.Select(&occupied, occupiedQuery, args...); err != nil {
+ logger.Printf("[ROBOT] Failed to query occupied cloud assets: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list cloud asset options"})
+ return
+ }
+ occupiedSet := make(map[string]struct{}, len(occupied))
+ for _, assetID := range occupied {
+ if value := strings.TrimSpace(assetID); value != "" {
+ occupiedSet[value] = struct{}{}
+ }
+ }
+
+ options := make([]CloudAssetOption, 0, len(profiles))
+ for _, profile := range profiles {
+ deviceID := strings.TrimSpace(profile.DeviceID)
+ assetID, err := normalizeAssetID(deviceID)
+ if err != nil || !assetID.Valid {
+ continue
+ }
+ if _, used := occupiedSet[assetID.String]; used {
+ continue
+ }
+ if currentAssetID != "" && assetID.String == currentAssetID {
+ continue
+ }
+ if query != "" && !strings.Contains(strings.ToLower(assetID.String), query) {
+ continue
+ }
+ options = append(options, CloudAssetOption{DeviceID: assetID.String})
+ }
+
+ c.JSON(http.StatusOK, CloudAssetOptionsResponse{Items: options})
+}
+
// CreateRobot handles robot creation requests.
//
// @Summary Create robot
@@ -527,6 +682,10 @@ func (h *RobotHandler) CreateRobot(c *gin.Context) {
return
}
if assetID.Valid {
+ if err := services.ValidateDPDeviceProfile(h.dpConfigPath, assetID.String); err != nil {
+ h.respondCloudAssetValidationError(c, err, "create robot with")
+ return
+ }
inUse, err := h.assetIDInUse(assetID.String, 0)
if err != nil {
logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err)
@@ -766,6 +925,11 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) {
return
}
+ if len(req.AssetID) > 0 {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id must be managed through cloud asset binding API"})
+ return
+ }
+
var current struct {
AssetID sql.NullString `db:"asset_id"`
}
@@ -822,47 +986,6 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) {
args = append(args, deviceID)
}
- if len(req.AssetID) > 0 {
- var rawAssetID string
- meta := bytes.TrimSpace(req.AssetID)
- if bytes.Equal(meta, []byte("null")) {
- rawAssetID = ""
- } else if err := json.Unmarshal(req.AssetID, &rawAssetID); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id must be a string or null"})
- return
- }
- assetID, err := normalizeAssetID(rawAssetID)
- if err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
- return
- }
- currentAssetID := assetIDValue(current.AssetID)
- if currentAssetID != "" {
- if !assetID.Valid {
- c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id cannot be cleared once set"})
- return
- }
- if assetID.String != currentAssetID {
- c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id cannot be changed once set"})
- return
- }
- }
- if assetID.Valid && assetID.String != currentAssetID {
- inUse, err := h.assetIDInUse(assetID.String, id)
- if err != nil {
- logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err)
- c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
- return
- }
- if inUse {
- c.JSON(http.StatusConflict, gin.H{"error": "asset_id is already assigned to another robot"})
- return
- }
- }
- updates = append(updates, "asset_id = ?")
- args = append(args, assetID)
- }
-
if req.FactoryID != nil {
if *req.FactoryID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "factory_id cannot be empty"})
@@ -1021,6 +1144,181 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) {
c.JSON(http.StatusOK, h.responseFromRow(r))
}
+// BindCloudAsset binds or changes a robot cloud asset ID.
+//
+// @Summary Bind robot cloud asset
+// @Description Binds or changes the Data Platform device ID used for future uploads
+// @Tags robots
+// @Accept json
+// @Produce json
+// @Param id path string true "Robot ID"
+// @Param body body BindCloudAssetRequest true "Cloud asset payload"
+// @Success 200 {object} RobotResponse
+// @Failure 400 {object} map[string]string
+// @Failure 404 {object} map[string]string
+// @Failure 409 {object} map[string]string
+// @Failure 500 {object} map[string]string
+// @Router /robots/{id}/cloud-asset [put]
+func (h *RobotHandler) BindCloudAsset(c *gin.Context) {
+ idStr := c.Param("id")
+ id, err := strconv.ParseInt(idStr, 10, 64)
+ if err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot id"})
+ return
+ }
+
+ var req BindCloudAssetRequest
+ if err := c.ShouldBindJSON(&req); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
+ return
+ }
+ assetID, err := normalizeAssetID(req.AssetID)
+ if err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+ return
+ }
+ if !assetID.Valid {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "asset_id is required"})
+ return
+ }
+
+ current, err := h.loadRobotRow(id)
+ if err == sql.ErrNoRows {
+ c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"})
+ return
+ }
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to query robot for cloud asset bind: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+
+ currentAssetID := assetIDValue(current.AssetID)
+ if assetID.String == currentAssetID {
+ c.JSON(http.StatusOK, h.responseFromRow(current))
+ return
+ }
+
+ if err := services.ValidateDPDeviceProfile(h.dpConfigPath, assetID.String); err != nil {
+ h.respondCloudAssetValidationError(c, err, "bind")
+ return
+ }
+ inUse, err := h.assetIDInUse(assetID.String, id)
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to check asset_id uniqueness: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+ if inUse {
+ c.JSON(http.StatusConflict, gin.H{"error": "asset_id is already assigned to another robot"})
+ return
+ }
+
+ now := time.Now().UTC()
+ result, err := h.db.Exec(
+ "UPDATE robots SET asset_id = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL",
+ assetID,
+ now,
+ id,
+ )
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to bind cloud asset: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+ rowsAffected, err := result.RowsAffected()
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to get rows affected for cloud asset bind: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+ if rowsAffected == 0 {
+ c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"})
+ return
+ }
+
+ updated, err := h.loadRobotRow(id)
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to fetch cloud asset bound robot: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get updated robot"})
+ return
+ }
+ if currentAssetID == "" {
+ logger.Printf("[ROBOT] Cloud asset bound: robot_id=%d asset_id=%s", id, assetID.String)
+ } else {
+ logger.Printf("[ROBOT] Cloud asset changed: robot_id=%d old_asset_id=%s new_asset_id=%s", id, currentAssetID, assetID.String)
+ }
+ c.JSON(http.StatusOK, h.responseFromRow(updated))
+}
+
+// UnbindCloudAsset clears a robot cloud asset ID.
+//
+// @Summary Unbind robot cloud asset
+// @Description Clears the Data Platform device ID used for future uploads
+// @Tags robots
+// @Produce json
+// @Param id path string true "Robot ID"
+// @Success 200 {object} RobotResponse
+// @Failure 400 {object} map[string]string
+// @Failure 404 {object} map[string]string
+// @Failure 500 {object} map[string]string
+// @Router /robots/{id}/cloud-asset [delete]
+func (h *RobotHandler) UnbindCloudAsset(c *gin.Context) {
+ idStr := c.Param("id")
+ id, err := strconv.ParseInt(idStr, 10, 64)
+ if err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot id"})
+ return
+ }
+
+ current, err := h.loadRobotRow(id)
+ if err == sql.ErrNoRows {
+ c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"})
+ return
+ }
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to query robot for cloud asset unbind: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+
+ currentAssetID := assetIDValue(current.AssetID)
+ if currentAssetID == "" {
+ c.JSON(http.StatusOK, h.responseFromRow(current))
+ return
+ }
+
+ result, err := h.db.Exec(
+ "UPDATE robots SET asset_id = NULL, updated_at = ? WHERE id = ? AND deleted_at IS NULL",
+ time.Now().UTC(),
+ id,
+ )
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to unbind cloud asset: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+ rowsAffected, err := result.RowsAffected()
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to get rows affected for cloud asset unbind: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update robot"})
+ return
+ }
+ if rowsAffected == 0 {
+ c.JSON(http.StatusNotFound, gin.H{"error": "robot not found"})
+ return
+ }
+
+ updated, err := h.loadRobotRow(id)
+ if err != nil {
+ logger.Printf("[ROBOT] Failed to fetch cloud asset unbound robot: %v", err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get updated robot"})
+ return
+ }
+ logger.Printf("[ROBOT] Cloud asset unbound: robot_id=%d old_asset_id=%s", id, currentAssetID)
+ c.JSON(http.StatusOK, h.responseFromRow(updated))
+}
+
// DeleteRobot handles robot deletion requests (soft delete).
//
// @Summary Delete robot
diff --git a/internal/api/handlers/robot_test.go b/internal/api/handlers/robot_test.go
index fa7c1e1..a262e4a 100644
--- a/internal/api/handlers/robot_test.go
+++ b/internal/api/handlers/robot_test.go
@@ -10,6 +10,8 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
+ "os"
+ "path/filepath"
"strings"
"testing"
"time"
@@ -265,12 +267,12 @@ func TestRobotHandlerListRobots_ConnectedFilterUsesHubIntersection(t *testing.T)
})
}
-func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) {
+func TestRobotHandlerAssetID_CreateValidatesDPProfileAndList(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
seedRobotLookups(t, db)
- r := newTestRobotRouter(t, db)
+ r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1")))
req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{
"robot_type_id": "10",
@@ -311,44 +313,56 @@ func TestRobotHandlerAssetID_CreateUpdateAndList(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
w = httptest.NewRecorder()
r.ServeHTTP(w, req)
- if w.Code != http.StatusOK {
- t.Fatalf("same-value update status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
+ if w.Code != http.StatusBadRequest {
+ t.Fatalf("ordinary update status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
}
}
-func TestRobotHandlerAssetID_ImmutableOnceSet(t *testing.T) {
+func TestRobotHandlerCloudAsset_BindChangeUnbind(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
seedRobotLookups(t, db)
- seedRobot(t, db, 1, "local-device-1", "asset-1", nil)
+ seedRobot(t, db, 1, "local-device-1", "", nil)
- r := newTestRobotRouter(t, db)
+ r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "asset-2")))
for _, tt := range []struct {
- name string
- body string
+ name string
+ method string
+ path string
+ body string
+ wantAsset string
}{
- {name: "change rejected", body: `{"asset_id":"asset-2"}`},
- {name: "clear rejected", body: `{"asset_id":""}`},
- {name: "blank clear rejected", body: `{"asset_id":" "}`},
+ {name: "bind", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":" asset-1 "}`, wantAsset: "asset-1"},
+ {name: "same value idempotent", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-1"}`, wantAsset: "asset-1"},
+ {name: "change", method: http.MethodPut, path: "/api/v1/robots/1/cloud-asset", body: `{"asset_id":"asset-2"}`, wantAsset: "asset-2"},
+ {name: "unbind", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""},
+ {name: "unbind idempotent", method: http.MethodDelete, path: "/api/v1/robots/1/cloud-asset", wantAsset: ""},
} {
t.Run(tt.name, func(t *testing.T) {
- req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(tt.body))
+ req := httptest.NewRequest(tt.method, tt.path, bytes.NewBufferString(tt.body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
- if w.Code != http.StatusBadRequest {
- t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
+ if w.Code != http.StatusOK {
+ t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
+ }
+ var resp RobotResponse
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("unmarshal response: %v body=%s", err, w.Body.String())
+ }
+ if resp.AssetID != tt.wantAsset {
+ t.Fatalf("asset_id=%q want=%q response=%#v", resp.AssetID, tt.wantAsset, resp)
}
})
}
- req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1", bytes.NewBufferString(`{"asset_id":null}`))
+ req := httptest.NewRequest(http.MethodPut, "/api/v1/robots/1/cloud-asset", bytes.NewBufferString(`{"asset_id":" "}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
- t.Fatalf("null clear status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
+ t.Fatalf("blank bind status=%d want=%d body=%s", w.Code, http.StatusBadRequest, w.Body.String())
}
}
@@ -360,7 +374,7 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) {
deletedAt := time.Now().UTC()
seedRobot(t, db, 2, "deleted-device", "deleted-asset", &deletedAt)
- r := newTestRobotRouter(t, db)
+ r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, robotDPConfigJSON("asset-1", "deleted-asset")))
req := httptest.NewRequest(http.MethodPost, "/api/v1/robots", bytes.NewBufferString(`{
"robot_type_id": "10",
@@ -389,6 +403,49 @@ func TestRobotHandlerAssetID_UniqueAmongActiveRobots(t *testing.T) {
}
}
+func TestRobotHandlerCloudAssetOptions(t *testing.T) {
+ db := newTestRobotHandlerDB(t)
+ defer db.Close()
+ seedRobotLookups(t, db)
+ seedRobot(t, db, 1, "local-device-1", "asset-1", nil)
+ seedRobot(t, db, 2, "local-device-2", "current-asset", nil)
+ deletedAt := time.Now().UTC()
+ seedRobot(t, db, 3, "deleted-device", "deleted-asset", &deletedAt)
+
+ body := `{
+ "version": 3,
+ "endpoints": {"auth": "auth:1", "gateway": "gateway:2"},
+ "devices": [
+ {"deviceId":"asset-1","apiKey":"key","tags":{"k":"v"}},
+ {"deviceId":"asset-2","apiKey":"key","tags":{"k":"v"}},
+ {"deviceId":"current-asset","apiKey":"key","tags":{"k":"v"}},
+ {"deviceId":"deleted-asset","apiKey":"key","tags":{"k":"v"}},
+ {"deviceId":"bad-api","apiKey":" ","tags":{"k":"v"}},
+ {"deviceId":"bad-tags","apiKey":"key","tags":{}}
+ ]
+ }`
+ r := newTestRobotRouterWithDPConfig(t, db, writeRobotDPConfigFixture(t, body))
+
+ req := httptest.NewRequest(http.MethodGet, "/api/v1/robots/cloud-asset-options?robot_id=2&q=ASSET", nil)
+ w := httptest.NewRecorder()
+ r.ServeHTTP(w, req)
+ if w.Code != http.StatusOK {
+ t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
+ }
+ var resp CloudAssetOptionsResponse
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("unmarshal options: %v body=%s", err, w.Body.String())
+ }
+ got := make([]string, 0, len(resp.Items))
+ for _, item := range resp.Items {
+ got = append(got, item.DeviceID)
+ }
+ want := []string{"asset-2", "deleted-asset"}
+ if strings.Join(got, ",") != strings.Join(want, ",") {
+ t.Fatalf("options=%v want=%v", got, want)
+ }
+}
+
func TestRobotHandlerAssetID_Validation(t *testing.T) {
db := newTestRobotHandlerDB(t)
defer db.Close()
@@ -428,6 +485,14 @@ func newTestRobotRouter(t *testing.T, db *sqlx.DB) *gin.Engine {
return newTestRobotRouterWithHubs(t, db, nil, nil)
}
+func newTestRobotRouterWithDPConfig(t *testing.T, db *sqlx.DB, dpConfigPath string) *gin.Engine {
+ t.Helper()
+ gin.SetMode(gin.TestMode)
+ r := gin.New()
+ NewRobotHandler(db, nil, nil, dpConfigPath).RegisterRoutes(r.Group("/api/v1"))
+ return r
+}
+
func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services.RecorderHub, transferHub *services.TransferHub) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
@@ -436,6 +501,30 @@ func newTestRobotRouterWithHubs(t *testing.T, db *sqlx.DB, recorderHub *services
return r
}
+func writeRobotDPConfigFixture(t *testing.T, body string) string {
+ t.Helper()
+ path := filepath.Join(t.TempDir(), "dp-config.json")
+ if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
+ t.Fatalf("write DP config fixture: %v", err)
+ }
+ return path
+}
+
+func robotDPConfigJSON(deviceIDs ...string) string {
+ var b strings.Builder
+ b.WriteString(`{"version":3,"endpoints":{"auth":"auth:1","gateway":"gateway:2"},"devices":[`)
+ for i, deviceID := range deviceIDs {
+ if i > 0 {
+ b.WriteString(",")
+ }
+ b.WriteString(`{"deviceId":"`)
+ b.WriteString(deviceID)
+ b.WriteString(`","apiKey":"key","tags":{"k":"v"}}`)
+ }
+ b.WriteString(`]}`)
+ return b.String()
+}
+
func connectRecorderForTest(t *testing.T, hub *services.RecorderHub, deviceID string, connectedAt time.Time) {
t.Helper()
conn := hub.NewRecorderConn(nil, deviceID, "127.0.0.1")
diff --git a/internal/server/server.go b/internal/server/server.go
index b4e96ba..5100689 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -143,7 +143,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client, syncWorker *servi
if db != nil {
batchHandler = handlers.NewBatchHandler(db, recorderHub, recorderRPCTimeout)
robotTypeHandler = handlers.NewRobotTypeHandler(db)
- robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub)
+ robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub, cfg.Sync.DPConfigPath)
deviceRegistrationHandler = handlers.NewDeviceRegistrationHandler(db)
factoryHandler = handlers.NewFactoryHandler(db)
dataCollectorHandler = handlers.NewDataCollectorHandler(db)
diff --git a/internal/services/dp_asset_resolver.go b/internal/services/dp_asset_resolver.go
index 78f3e29..05db684 100644
--- a/internal/services/dp_asset_resolver.go
+++ b/internal/services/dp_asset_resolver.go
@@ -27,8 +27,39 @@ func assetIDFromEpisodeMetadata(metadata sql.NullString) string {
}
func resolveAssetIDForEpisode(ctx context.Context, db *sqlx.DB, episodeID int64, metadata sql.NullString, workstationID sql.NullInt64) (string, error) {
- if assetID := assetIDFromEpisodeMetadata(metadata); assetID != "" {
- return assetID, nil
+ metadataAssetID := assetIDFromEpisodeMetadata(metadata)
+ if db != nil && workstationID.Valid && workstationID.Int64 > 0 {
+ var row struct {
+ RobotID sql.NullInt64 `db:"robot_id"`
+ ResolvedRobotID sql.NullInt64 `db:"resolved_robot_id"`
+ AssetID sql.NullString `db:"asset_id"`
+ }
+ err := db.GetContext(ctx, &row, `
+ SELECT ws.robot_id, r.id AS resolved_robot_id, r.asset_id
+ FROM workstations ws
+ LEFT JOIN robots r ON r.id = ws.robot_id
+ WHERE ws.id = ?
+ LIMIT 1
+ `, workstationID.Int64)
+ if err != nil && err != sql.ErrNoRows {
+ return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err)
+ }
+ if err == nil && row.RobotID.Valid && row.RobotID.Int64 > 0 {
+ if !row.ResolvedRobotID.Valid || row.ResolvedRobotID.Int64 <= 0 {
+ if metadataAssetID != "" {
+ return metadataAssetID, nil
+ }
+ return "", fmt.Errorf("episode %d workstation %d robot %d not found while resolving asset_id", episodeID, workstationID.Int64, row.RobotID.Int64)
+ }
+ assetID := strings.TrimSpace(row.AssetID.String)
+ if row.AssetID.Valid && assetID != "" {
+ return assetID, nil
+ }
+ return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64)
+ }
+ }
+ if metadataAssetID != "" {
+ return metadataAssetID, nil
}
if db == nil {
return "", fmt.Errorf("database is not available")
@@ -36,26 +67,5 @@ func resolveAssetIDForEpisode(ctx context.Context, db *sqlx.DB, episodeID int64,
if !workstationID.Valid || workstationID.Int64 <= 0 {
return "", fmt.Errorf("episode %d has no asset_id metadata and no workstation_id", episodeID)
}
-
- var row struct {
- AssetID sql.NullString `db:"asset_id"`
- }
- err := db.GetContext(ctx, &row, `
- SELECT r.asset_id
- FROM workstations ws
- LEFT JOIN robots r ON r.id = ws.robot_id
- WHERE ws.id = ?
- LIMIT 1
- `, workstationID.Int64)
- if err == sql.ErrNoRows {
- return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64)
- }
- if err != nil {
- return "", fmt.Errorf("resolve asset_id for episode %d workstation %d: %w", episodeID, workstationID.Int64, err)
- }
- assetID := strings.TrimSpace(row.AssetID.String)
- if !row.AssetID.Valid || assetID == "" {
- return "", fmt.Errorf("episode %d workstation %d has no robot asset_id", episodeID, workstationID.Int64)
- }
- return assetID, nil
+ return "", fmt.Errorf("episode %d workstation %d not found while resolving asset_id", episodeID, workstationID.Int64)
}
diff --git a/internal/services/dp_asset_resolver_test.go b/internal/services/dp_asset_resolver_test.go
index d000738..309298e 100644
--- a/internal/services/dp_asset_resolver_test.go
+++ b/internal/services/dp_asset_resolver_test.go
@@ -42,15 +42,73 @@ func newTestAssetResolverDB(t *testing.T) *sqlx.DB {
return db
}
-func TestResolveAssetIDForEpisode_MetadataWins(t *testing.T) {
+func TestResolveAssetIDForEpisode_CurrentRobotAssetWinsOverMetadata(t *testing.T) {
db := newTestAssetResolverDB(t)
- if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'fallback-asset')`); err != nil {
+ if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', 'current-asset')`); err != nil {
t.Fatalf("seed robot: %v", err)
}
if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
t.Fatalf("seed workstation: %v", err)
}
+ got, err := resolveAssetIDForEpisode(
+ context.Background(),
+ db,
+ 1,
+ sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true},
+ sql.NullInt64{Int64: 10, Valid: true},
+ )
+ if err != nil {
+ t.Fatalf("resolveAssetIDForEpisode() error = %v", err)
+ }
+ if got != "current-asset" {
+ t.Fatalf("asset_id=%q want current-asset", got)
+ }
+}
+
+func TestResolveAssetIDForEpisode_UnboundRobotDoesNotUseStaleMetadata(t *testing.T) {
+ db := newTestAssetResolverDB(t)
+ if _, err := db.Exec(`INSERT INTO robots (id, device_id, asset_id) VALUES (1, 'local-device', NULL)`); err != nil {
+ t.Fatalf("seed robot: %v", err)
+ }
+ if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
+ t.Fatalf("seed workstation: %v", err)
+ }
+
+ _, err := resolveAssetIDForEpisode(
+ context.Background(),
+ db,
+ 1,
+ sql.NullString{String: `{"asset_id":"stale-asset"}`, Valid: true},
+ sql.NullInt64{Int64: 10, Valid: true},
+ )
+ if err == nil || !strings.Contains(err.Error(), "asset_id") {
+ t.Fatalf("error=%v want asset_id missing error", err)
+ }
+}
+
+func TestResolveAssetIDForEpisode_MetadataFallbackWhenNoWorkstation(t *testing.T) {
+ got, err := resolveAssetIDForEpisode(
+ context.Background(),
+ nil,
+ 1,
+ sql.NullString{String: `{"asset_id":" snapshot-asset "}`, Valid: true},
+ sql.NullInt64{},
+ )
+ if err != nil {
+ t.Fatalf("resolveAssetIDForEpisode() error = %v", err)
+ }
+ if got != "snapshot-asset" {
+ t.Fatalf("asset_id=%q want snapshot-asset", got)
+ }
+}
+
+func TestResolveAssetIDForEpisode_MetadataFallbackWhenRobotMissing(t *testing.T) {
+ db := newTestAssetResolverDB(t)
+ if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 999)`); err != nil {
+ t.Fatalf("seed workstation: %v", err)
+ }
+
got, err := resolveAssetIDForEpisode(
context.Background(),
db,
diff --git a/internal/services/dp_config_loader.go b/internal/services/dp_config_loader.go
index f85f773..733a228 100644
--- a/internal/services/dp_config_loader.go
+++ b/internal/services/dp_config_loader.go
@@ -10,6 +10,7 @@ import (
"net"
"net/url"
"os"
+ "sort"
"strings"
)
@@ -48,66 +49,134 @@ type DPDeviceUploadConfig struct {
Profile DPDeviceProfile
}
-func loadDPDeviceUploadConfig(configPath string, assetID string) (*DPDeviceUploadConfig, error) {
+func loadDPConfigFile(configPath string) (*DPConfigFile, DPResolvedEndpoint, DPResolvedEndpoint, error) {
configPath = strings.TrimSpace(configPath)
- assetID = strings.TrimSpace(assetID)
if configPath == "" {
- return nil, fmt.Errorf("KEYSTONE_SYNC_DP_CONFIG is required")
- }
- if assetID == "" {
- return nil, fmt.Errorf("asset_id is required")
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("KEYSTONE_SYNC_DP_CONFIG is required")
}
data, err := os.ReadFile(configPath) //nolint:gosec // operator-controlled config path
if err != nil {
- return nil, fmt.Errorf("read DP config %s: %w", configPath, err)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("read DP config %s: %w", configPath, err)
}
var cfg DPConfigFile
if err := json.Unmarshal(data, &cfg); err != nil {
- return nil, fmt.Errorf("parse DP config %s: %w", configPath, err)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("parse DP config %s: %w", configPath, err)
}
if cfg.Version != nil && *cfg.Version != 3 {
- return nil, fmt.Errorf("DP config %s has unsupported version %d", configPath, *cfg.Version)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s has unsupported version %d", configPath, *cfg.Version)
}
authEndpoint, err := parseDPResolvedEndpoint(cfg.Endpoints.Auth)
if err != nil {
- return nil, fmt.Errorf("invalid endpoints.auth in DP config %s: %w", configPath, err)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("invalid endpoints.auth in DP config %s: %w", configPath, err)
}
gatewayEndpoint, err := parseDPResolvedEndpoint(cfg.Endpoints.Gateway)
if err != nil {
- return nil, fmt.Errorf("invalid endpoints.gateway in DP config %s: %w", configPath, err)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("invalid endpoints.gateway in DP config %s: %w", configPath, err)
}
devices := make(map[string]DPDeviceProfile, len(cfg.Devices))
for idx, device := range cfg.Devices {
deviceID := strings.TrimSpace(device.DeviceID)
if deviceID == "" {
- return nil, fmt.Errorf("DP config %s devices[%d].deviceId is empty", configPath, idx)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s devices[%d].deviceId is empty", configPath, idx)
}
if _, exists := devices[deviceID]; exists {
- return nil, fmt.Errorf("DP config %s has duplicate deviceId %q", configPath, deviceID)
+ return nil, DPResolvedEndpoint{}, DPResolvedEndpoint{}, fmt.Errorf("DP config %s has duplicate deviceId %q", configPath, deviceID)
}
device.DeviceID = deviceID
devices[deviceID] = device
+ cfg.Devices[idx] = device
}
- profile, ok := devices[assetID]
- if !ok {
- return nil, fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID)
- }
+ return &cfg, authEndpoint, gatewayEndpoint, nil
+}
+
+func requireUploadReadyDPDeviceProfile(configPath string, assetID string, profile DPDeviceProfile) (DPDeviceProfile, error) {
profile.APIKey = strings.TrimSpace(profile.APIKey)
if profile.APIKey == "" {
- return nil, fmt.Errorf("DP config %s device %q apiKey is empty", configPath, assetID)
+ return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q apiKey is empty", configPath, assetID)
}
if len(profile.Tags) == 0 {
- return nil, fmt.Errorf("DP config %s device %q tags must be non-empty", configPath, assetID)
+ return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q tags must be non-empty", configPath, assetID)
}
for key := range profile.Tags {
if key == "" {
- return nil, fmt.Errorf("DP config %s device %q has an empty tag key", configPath, assetID)
+ return DPDeviceProfile{}, fmt.Errorf("DP config %s device %q has an empty tag key", configPath, assetID)
+ }
+ }
+ return profile, nil
+}
+
+func findDPDeviceProfile(cfg *DPConfigFile, assetID string) (DPDeviceProfile, bool) {
+ for _, device := range cfg.Devices {
+ if device.DeviceID == assetID {
+ return device, true
+ }
+ }
+ return DPDeviceProfile{}, false
+}
+
+// ListDPDeviceProfiles returns upload-ready profiles from the data-platform config.
+func ListDPDeviceProfiles(configPath string) ([]DPDeviceProfile, error) {
+ cfg, _, _, err := loadDPConfigFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+
+ profiles := make([]DPDeviceProfile, 0, len(cfg.Devices))
+ for _, device := range cfg.Devices {
+ profile, err := requireUploadReadyDPDeviceProfile(configPath, device.DeviceID, device)
+ if err != nil {
+ continue
}
+ profiles = append(profiles, profile)
+ }
+ sort.Slice(profiles, func(i, j int) bool {
+ return profiles[i].DeviceID < profiles[j].DeviceID
+ })
+ return profiles, nil
+}
+
+// ValidateDPDeviceProfile checks that one asset ID has an upload-ready device profile.
+func ValidateDPDeviceProfile(configPath string, assetID string) error {
+ configPath = strings.TrimSpace(configPath)
+ assetID = strings.TrimSpace(assetID)
+ if assetID == "" {
+ return fmt.Errorf("asset_id is required")
+ }
+ cfg, _, _, err := loadDPConfigFile(configPath)
+ if err != nil {
+ return err
+ }
+ profile, ok := findDPDeviceProfile(cfg, assetID)
+ if !ok {
+ return fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID)
+ }
+ _, err = requireUploadReadyDPDeviceProfile(configPath, assetID, profile)
+ return err
+}
+
+func loadDPDeviceUploadConfig(configPath string, assetID string) (*DPDeviceUploadConfig, error) {
+ configPath = strings.TrimSpace(configPath)
+ assetID = strings.TrimSpace(assetID)
+ if assetID == "" {
+ return nil, fmt.Errorf("asset_id is required")
+ }
+
+ cfg, authEndpoint, gatewayEndpoint, err := loadDPConfigFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ profile, ok := findDPDeviceProfile(cfg, assetID)
+ if !ok {
+ return nil, fmt.Errorf("DP config %s has no device profile for asset_id %q", configPath, assetID)
+ }
+ profile, err = requireUploadReadyDPDeviceProfile(configPath, assetID, profile)
+ if err != nil {
+ return nil, err
}
return &DPDeviceUploadConfig{
diff --git a/internal/services/dp_config_loader_test.go b/internal/services/dp_config_loader_test.go
index deff985..ff5b7a7 100644
--- a/internal/services/dp_config_loader_test.go
+++ b/internal/services/dp_config_loader_test.go
@@ -174,3 +174,37 @@ func TestLoadDPDeviceUploadConfigRejectsContractErrors(t *testing.T) {
})
}
}
+
+func TestListDPDeviceProfilesFiltersProfileErrors(t *testing.T) {
+ body := `{
+ "version":3,
+ "endpoints":{"auth":"auth:1","gateway":"gateway:2"},
+ "devices":[
+ {"deviceId":"z-ready","apiKey":"key","tags":{"k":"v"}},
+ {"deviceId":"bad-api","apiKey":" ","tags":{"k":"v"}},
+ {"deviceId":"bad-tags","apiKey":"key","tags":{}},
+ {"deviceId":"bad-tag-key","apiKey":"key","tags":{"":"v"}},
+ {"deviceId":"a-ready","apiKey":"key","tags":{"k":"v"}}
+ ]
+ }`
+ profiles, err := ListDPDeviceProfiles(writeDPConfigFixture(t, body))
+ if err != nil {
+ t.Fatalf("ListDPDeviceProfiles() error = %v", err)
+ }
+ got := []string{}
+ for _, profile := range profiles {
+ got = append(got, profile.DeviceID)
+ }
+ want := []string{"a-ready", "z-ready"}
+ if strings.Join(got, ",") != strings.Join(want, ",") {
+ t.Fatalf("profiles=%v want=%v", got, want)
+ }
+}
+
+func TestListDPDeviceProfilesRejectsConfigStructureErrors(t *testing.T) {
+ body := `{"version":3,"endpoints":{"auth":"auth:1","gateway":"gateway:2"},"devices":[{"deviceId":"asset-1","apiKey":"key","tags":{"k":"v"}},{"deviceId":" asset-1 ","apiKey":"key","tags":{"k":"v"}}]}`
+ _, err := ListDPDeviceProfiles(writeDPConfigFixture(t, body))
+ if err == nil || !strings.Contains(err.Error(), "duplicate deviceId") {
+ t.Fatalf("error=%v want duplicate deviceId", err)
+ }
+}