diff --git a/docs/designs/task-recorder-upload-guard-design.md b/docs/designs/task-recorder-upload-guard-design.md new file mode 100644 index 0000000..75328d3 --- /dev/null +++ b/docs/designs/task-recorder-upload-guard-design.md @@ -0,0 +1,311 @@ + + +# Task Recorder Upload Guard Design + +**Status:** proposed + +**Scope:** Keystone recorder RPC APIs, recording callbacks, Transfer reconciliation, and Synapse operator guardrails. + +## 1. Problem + +The observed failure for `task_20260612_052143_068_76_9a4d20e3` is not a task +database status regression. Keystone correctly kept the DB task in +`uploading` and skipped `uploading -> in_progress` after the second `begin`. + +The unsafe part is earlier: Keystone still sent recorder-side `config` and +`begin` RPCs for the same `task_id` while the DB task was already `uploading`. +That allowed the recorder to start a second local recording using the same task +name while axon_transfer was uploading the original MCAP file. + +The important log shape is: + +```text +10:03:41 finish callback for task_..._76 +10:03:41 DB status in_progress -> uploading +10:03:43 transfer upload_started for task_..._76 +10:03:45 recorder config applied for task_..._76 again +10:03:56 recorder ready -> recording for task_..._76 again +10:03:56 DB begin transition skipped because current_status=uploading +10:06:18 second finish callback for task_..._76 +10:06:18 upload_request sent again +10:06:18 transfer reports upload_not_found +10:07:41 original upload completes and is ACKed +``` + +The target fix is therefore to stop illegal recorder commands before they leave +Keystone, and to make recording finish callbacks idempotent once upload has +already started or completed. + +## 2. Current Behavior + +- `POST /recorder/:device_id/config` checks recorder connectivity, transfer + connectivity, fresh recorder state, and recorder idle state. It does not + require the DB task to be `pending`. +- `GET /tasks/:id/config` validates task metadata but does not require the task + to be `pending`. +- `POST /recorder/:device_id/begin` sends the RPC first, then tries to advance + DB status from `pending` or `ready` to `in_progress`. If the task is already + `uploading`, the DB update is skipped, but the recorder has already begun. +- `POST /callbacks/finish` calls `markOwnedTaskUploading`, which currently + accepts `pending`, `ready`, `in_progress`, and `uploading`. A duplicate finish + callback for an `uploading` task therefore triggers another `upload_request`. +- axon_transfer does not resend recorder finish callbacks. Recorder finish is a + recorder-side HTTP callback. Transfer only reports upload lifecycle events and + status snapshots over WebSocket. + +## 3. Invariants + +- A task may be configured on a recorder only while the authoritative DB task + status is `pending`. +- A task may be begun on a recorder only while the authoritative DB status is + `ready`, with a narrow `pending` compatibility case for a recorder that is + already ready for the same task. +- `uploading`, `completed`, `failed`, and `cancelled` are never valid sources + for recorder `config` or `begin`. +- A finish callback for a task that is already `uploading` or `completed` is + idempotent and must not send another `upload_request`. +- Transfer recovery must be driven by transfer `connected/status` reconciliation + and persisted upload state, not by hoping for another recorder finish + callback. +- `uploaded_wait_ack` must use the same verified ACK path as + `upload_complete`; Keystone must not blindly ACK a task just because its ID + appears in `waiting_ack_task_ids`. + +## 4. API State Rules + +### `GET /tasks/:id/config` + +This is a UX and prevention layer. It should reject tasks whose DB status is not +`pending`. + +| DB status | Result | +| --- | --- | +| `pending` | Return task config. | +| `ready` | `409 task_not_configurable`; refresh UI state. | +| `in_progress` | `409 task_not_configurable`; recording has started. | +| `uploading` | `409 task_not_configurable`; upload owns the task. | +| `completed` | `409 task_not_configurable`; terminal. | +| `failed` | `409 task_not_configurable`; terminal unless a future explicit retry flow exists. | +| `cancelled` | `409 task_not_configurable`; terminal. | + +This endpoint is not the safety boundary. Operators or automation can bypass it +and call recorder RPC APIs directly, so the recorder RPC handlers must enforce +the same rules before sending WebSocket messages. + +### `POST /recorder/:device_id/config` + +This is a backend hard gate before the `config` RPC is sent. + +Required checks: + +- request contains `task_config.task_id`; +- task exists, is not deleted, belongs to `device_id`, and is `pending`; +- recorder connection exists and has a synced, fresh state snapshot; +- recorder state is `idle`; +- transfer connection exists when the existing config path requires it. + +Rejected statuses should return `409` with a stable code such as +`task_not_configurable`. No recorder RPC should be sent on rejection. + +After a successful recorder RPC, Keystone may still perform the existing +`pending -> ready` transition. If the update affects zero rows, Keystone should +log the current status and return success for the RPC result only if the +pre-RPC gate already passed; zero rows after the RPC should be rare and worth +logging. + +### `POST /recorder/:device_id/begin` + +This is also a backend hard gate before the `begin` RPC is sent. + +Allowed cases: + +| DB status | Recorder state | Recorder task_id | Result | +| --- | --- | --- | --- | +| `ready` | `ready` | same task | Send `begin`; then mark `in_progress`. | +| `pending` | `ready` | same task | Send `begin`; then mark `in_progress`. | + +The `pending` compatibility case exists for transient Keystone/recorder state +skew where the recorder has already accepted config and reports ready, but the +DB row has not reached `ready` or was rolled back to `pending` after a +disconnect. + +Rejected cases: + +| DB status | Result | +| --- | --- | +| `pending` with recorder not ready for the same task | `409 task_not_beginable`. | +| `in_progress` | `409 task_not_beginable`; already recording. | +| `uploading` | `409 task_not_beginable`; upload owns the task. | +| `completed` | `409 task_not_beginable`; terminal. | +| `failed` | `409 task_not_beginable`; terminal unless a future explicit retry flow exists. | +| `cancelled` | `409 task_not_beginable`; terminal. | + +The key ordering requirement is: validate DB ownership/status and recorder +cached/refreshed state before sending `begin`. A skipped DB transition after a +successful RPC is too late to protect the recorder. + +## 5. Finish Callback Semantics + +`POST /callbacks/finish` should be idempotent for already-uploading or +already-completed tasks. + +| Current DB status | HTTP result | DB transition | Send `upload_request` | +| --- | --- | --- | --- | +| `pending` | `200` | `pending -> uploading` | Yes. | +| `ready` | `200` | `ready -> uploading` | Yes. | +| `in_progress` | `200` | `in_progress -> uploading` | Yes. | +| `uploading` | `200` | No-op | No. | +| `completed` | `200` | No-op | No. | +| `failed` | `409` | No-op | No. | +| `cancelled` | `409` | No-op | No. | +| task not found or not owned by device | `409` | No-op | No. | + +This design deliberately does not try to distinguish a duplicate callback from +an illegal second same-name recording. The current callback payload has no +session ID, recording generation, or MCAP object identity that can prove which +physical recording produced the callback. The safe rule is status-based +idempotency plus stricter prevention at `config` and `begin`. + +Implementation note: the upload-request branch should be reached only when the +finish handler actually changed the task into `uploading` from +`pending/ready/in_progress`. If the current status is already `uploading` or +`completed`, return `200` with `upload_request_sent=false`. + +## 6. Transfer Reconciliation + +Recorder callbacks and transfer recovery solve different problems: + +- Recorder finish callback tells Keystone that local recording ended and upload + should be requested. +- Transfer `connected/status` tells Keystone what axon_transfer currently knows + about upload state. +- Transfer should not invent or replay recorder finish callbacks. + +### `upload_complete` and `uploaded_wait_ack` + +`upload_complete` already runs the verified ACK flow: + +1. verify expected objects exist in S3/MinIO; +2. update Keystone episode/task metadata; +3. send `upload_ack` to axon_transfer. + +When transfer reports a task in `uploaded_wait_ack`, Keystone may only ACK it by +reusing that same verified ACK flow. It must have enough data to identify and +verify the uploaded object, such as the matching record in `uploads[]` with +`task_id`, status, object key/S3 key, file size, and checksum where available. + +If the status snapshot only contains `waiting_ack_task_ids` and lacks a matching +`uploads[]` record, Keystone should not ACK. It may send `status_query` to ask +for a fuller snapshot. + +### Re-sending `upload_request` + +Keystone may re-send `upload_request` automatically only for a narrow recovery +case: + +- DB task status is `uploading`; +- transfer is connected; +- transfer status/`uploads[]` does not show the task as `pending`, `active`, + `retry-wait`, or `uploaded_wait_ack`; +- Keystone has a previous `error_message` that clearly means the original + `upload_request` was not sent, failed to send, or timed out. + +Keystone should not auto retry when the previous transfer response was +`upload_not_found`. That error means the transfer scanner did not find a local +MCAP for the task, so blindly resending can loop and obscure the real cause. + +For this round, use process-local in-flight and cooldown maps to prevent +reconciliation storms. Do not add a DB schema migration only for retry +bookkeeping unless later requirements need cross-process persistence. + +### Status Snapshot Handling + +Keystone should parse and cache enough of transfer `connected/status` to make +the reconciliation decisions above: + +- counts: `pending_count`, `active_count`, `uploading_count`, + `retry_wait_count`, `waiting_ack_count`, `completed_count`, `failed_count`; +- `waiting_ack_task_ids`; +- `uploads[]` records, including at least `task_id`, `status`, `s3_key` or + `object_key`, `file_size_bytes`, and checksum fields when present. + +## 7. Synapse Guardrails + +Synapse operator pages are an auxiliary guard, not the source of truth. + +Expected UI behavior: + +- refresh task state immediately before prepare/config actions; +- request task config only for `pending` tasks; +- disable or hide prepare/config controls for `ready`, `in_progress`, + `uploading`, `completed`, `failed`, and `cancelled`; +- on `task_not_configurable` or `task_not_beginable`, show a concise message + and refresh the task list/device state; +- keep auto-prepare logic constrained to `pending` tasks. + +These guardrails reduce accidental operator actions, but backend hard gates are +still required for correctness. + +## 8. Test Plan + +### Keystone recorder RPC tests + +- `config` sends no RPC and returns `409 task_not_configurable` for + `ready/in_progress/uploading/completed/failed/cancelled`. +- `config` sends RPC for owned `pending` task when recorder is idle and transfer + is connected. +- `begin` sends RPC for owned `ready` task when recorder state is ready for the + same `task_id`. +- `begin` sends RPC for owned `pending` task only when recorder state is ready + for the same `task_id`. +- `begin` sends no RPC for `uploading/completed/failed/cancelled/in_progress`. + +### Keystone callback tests + +- finish callback transitions `pending`, `ready`, and `in_progress` to + `uploading` and sends exactly one `upload_request`. +- finish callback for `uploading` returns `200`, does not change DB state, and + does not send `upload_request`. +- finish callback for `completed` returns `200`, does not change DB state, and + does not send `upload_request`. +- finish callback for `failed` and `cancelled` returns `409` and sends no + `upload_request`. + +### Transfer reconciliation tests + +- `uploaded_wait_ack` with a complete `uploads[]` record uses the same verified + ACK flow as `upload_complete`. +- `waiting_ack_task_ids` without a matching upload record does not send ACK and + may trigger `status_query`. +- DB `uploading` plus a send-failure `error_message` and no active transfer + upload can re-send `upload_request` once per cooldown window. +- `upload_not_found` does not auto re-send `upload_request`. + +### Synapse checks + +- operator prepare/config buttons are disabled for non-`pending` statuses; +- auto-prepare skips non-`pending` tasks; +- backend `409` codes refresh local task state and show a friendly message. + +## 9. Non-goals + +- Do not introduce a recorder session ID or recording generation in this round. +- Do not change MCAP naming rules in this round. +- Do not add a DB schema migration solely for reconciliation cooldown state. +- Do not make axon_transfer resend recorder finish callbacks. +- Do not treat `upload_not_found` as an automatic retryable condition. + +## 10. Future Work + +- Add a recorder session ID or recording generation to `config`, `begin`, start + callback, finish callback, sidecar metadata, and upload records. That would + allow Keystone to distinguish duplicate callbacks from a second physical + recording with the same task ID. +- Persist structured upload request state instead of encoding retry decisions + in `tasks.error_message`. +- Add a transfer-side startup flush for `uploaded_wait_ack` records so Keystone + can reconcile completed uploads immediately after reconnect. diff --git a/internal/api/handlers/axon_rpc.go b/internal/api/handlers/axon_rpc.go index 92a4bd5..8cc50de 100644 --- a/internal/api/handlers/axon_rpc.go +++ b/internal/api/handlers/axon_rpc.go @@ -252,6 +252,10 @@ func (h *RecorderHandler) Config(c *gin.Context) { return } + if !h.requireTaskConfigurable(c, taskID) { + return + } + if !h.callRPC(c, "config", params) { return } @@ -259,6 +263,37 @@ func (h *RecorderHandler) Config(c *gin.Context) { advanceTaskPendingToReady(h.db, c.Param("device_id"), taskID, "config") } +func (h *RecorderHandler) requireTaskConfigurable(c *gin.Context, taskID string) bool { + if h == nil || h.db == nil { + return true + } + taskID = strings.TrimSpace(taskID) + if taskID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": "task_id_required", + "error": "task_config.task_id is required", + }) + return false + } + + deviceID := strings.TrimSpace(c.Param("device_id")) + status, ok, err := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, taskID) + if err != nil { + logger.Printf("%s failed to check task configurability: err=%v", recorderTaskLogPrefix(deviceID, taskID), err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check task status"}) + return false + } + if !ok || status != "pending" { + c.JSON(http.StatusConflict, gin.H{ + "code": "task_not_configurable", + "error": "task is not configurable", + "current_status": taskStatusLogValue(status, "not_found"), + }) + return false + } + return true +} + // Begin sends begin recording RPC to the recorder. // // @Summary Begin recording @@ -286,6 +321,10 @@ func (h *RecorderHandler) Begin(c *gin.Context) { } } + if !h.requireTaskBeginable(c, taskID) { + return + } + if !h.callRPC(c, "begin", params) { return } @@ -305,6 +344,83 @@ func (h *RecorderHandler) Begin(c *gin.Context) { } } +func (h *RecorderHandler) requireTaskBeginable(c *gin.Context, taskID string) bool { + if h == nil || h.db == nil { + return true + } + taskID = strings.TrimSpace(taskID) + if taskID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": "task_id_required", + "error": "task_id is required", + }) + return false + } + + deviceID := strings.TrimSpace(c.Param("device_id")) + status, ok, err := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, taskID) + if err != nil { + logger.Printf("%s failed to check task beginability: err=%v", recorderTaskLogPrefix(deviceID, taskID), err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check task status"}) + return false + } + if !ok || (status != "pending" && status != "ready") { + c.JSON(http.StatusConflict, gin.H{ + "code": "task_not_beginable", + "error": "task is not beginable", + "current_status": taskStatusLogValue(status, "not_found"), + }) + return false + } + + if h.hub == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "recorder hub is not configured"}) + return false + } + rc := h.hub.Get(deviceID) + if rc == nil { + c.JSON(http.StatusNotFound, gin.H{"error": services.ErrRecorderNotConnected.Error()}) + return false + } + if !rc.IsStateSynced() { + c.JSON(http.StatusConflict, gin.H{ + "code": "recorder_state_syncing", + "error": "recorder state is syncing; retry after initial state snapshot", + }) + return false + } + + state := rc.GetState() + if recorderStateAge(state) > defaultRecorderFreshMaxAge { + refreshed, _, err := h.refreshRecorderState(c.Request.Context(), deviceID, rc, -1) + if err != nil { + statusCode := http.StatusConflict + if errors.Is(err, services.ErrRecorderRPCTimeout) { + statusCode = http.StatusGatewayTimeout + } + out := h.recorderStateResponse(deviceID, rc, false) + out["code"] = "recorder_state_refresh_failed" + out["error"] = err.Error() + c.JSON(statusCode, out) + return false + } + state = refreshed + } + + current := strings.ToLower(strings.TrimSpace(state.CurrentState)) + if current != "ready" || strings.TrimSpace(state.TaskID) != taskID { + c.JSON(http.StatusConflict, gin.H{ + "code": "task_not_beginable", + "error": "recorder is not ready for task", + "current_status": status, + "recorder_state": state.CurrentState, + "recorder_task_id": state.TaskID, + }) + return false + } + return true +} + // Finish sends finish recording RPC to the recorder. // // @Summary Finish recording diff --git a/internal/api/handlers/recorder_axon_interaction_test.go b/internal/api/handlers/recorder_axon_interaction_test.go index b9617a7..b629476 100644 --- a/internal/api/handlers/recorder_axon_interaction_test.go +++ b/internal/api/handlers/recorder_axon_interaction_test.go @@ -190,6 +190,37 @@ func TestRecorderConfigRejectsAdditionalBusyStates(t *testing.T) { } } +func TestRecorderConfigRejectsUploadingTaskBeforeRPC(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-config-uploading", "uploading") + + hub := services.NewRecorderHub() + rpcCalled := make(chan services.RPCRequest, 1) + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + rpcCalled <- req + return services.RPCResponse{Success: true} + }) + handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) + _ = handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "idle"}, "state_update") + + router := newRecorderInteractionRouter(handler) + w := recorderInteractionPost(t, router, "/recorder/robot-001/config", `{"task_config":{"task_id":"task-config-uploading"}}`) + + if w.Code != http.StatusConflict { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusConflict, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "task_not_configurable") { + t.Fatalf("response body %q does not include task_not_configurable", w.Body.String()) + } + select { + case req := <-rpcCalled: + t.Fatalf("unexpected config RPC sent for uploading task: %#v", req) + default: + } + assertTaskStateRecoveryStatus(t, db, "task-config-uploading", "uploading") +} + func TestRecorderConfigTimeoutAndDisconnectedKeepTaskPending(t *testing.T) { t.Run("timeout", func(t *testing.T) { db := newTaskStateRecoveryDB(t) @@ -238,9 +269,10 @@ func TestRecorderBeginSuccessTimeoutAndDisconnectedTaskState(t *testing.T) { seedTaskStateRecoveryTask(t, db, "task-begin-success", "pending") hub := services.NewRecorderHub() - attachRecorderRPCResponder(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { return services.RPCResponse{Success: true} }) + rc.UpdateState(services.RecorderState{CurrentState: "ready", TaskID: "task-begin-success"}) handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) router := newRecorderInteractionRouter(handler) @@ -259,7 +291,8 @@ func TestRecorderBeginSuccessTimeoutAndDisconnectedTaskState(t *testing.T) { seedTaskStateRecoveryTask(t, db, "task-begin-timeout", "pending") hub := services.NewRecorderHub() - _, requests := attachRecorderRPCObserverWithConn(t, hub, "robot-001") + rc, requests := attachRecorderRPCObserverWithConn(t, hub, "robot-001") + rc.UpdateState(services.RecorderState{CurrentState: "ready", TaskID: "task-begin-timeout"}) handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) router := newRecorderInteractionRouter(handler) @@ -290,38 +323,87 @@ func TestRecorderBeginSuccessTimeoutAndDisconnectedTaskState(t *testing.T) { }) } +func TestRecorderBeginRejectsUploadingTaskBeforeRPC(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-begin-uploading", "uploading") + + hub := services.NewRecorderHub() + rpcCalled := make(chan services.RPCRequest, 1) + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + rpcCalled <- req + return services.RPCResponse{Success: true} + }) + handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) + _ = handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "ready", TaskID: "task-begin-uploading"}, "state_update") + + router := newRecorderInteractionRouter(handler) + w := recorderInteractionPost(t, router, "/recorder/robot-001/begin", `{"task_id":"task-begin-uploading"}`) + + if w.Code != http.StatusConflict { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusConflict, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "task_not_beginable") { + t.Fatalf("response body %q does not include task_not_beginable", w.Body.String()) + } + select { + case req := <-rpcCalled: + t.Fatalf("unexpected begin RPC sent for uploading task: %#v", req) + default: + } + assertTaskStateRecoveryStatus(t, db, "task-begin-uploading", "uploading") +} + func TestRecorderBeginSuccessAdvancesReadyAndLeavesInProgress(t *testing.T) { - for _, tt := range []struct { - name string - initial string - wantStatus string - }{ - {name: "ready advances to in_progress", initial: "ready", wantStatus: "in_progress"}, - {name: "in_progress stays in_progress", initial: "in_progress", wantStatus: "in_progress"}, - } { - t.Run(tt.name, func(t *testing.T) { - db := newTaskStateRecoveryDB(t) - defer db.Close() - seedTaskStateRecoveryTask(t, db, "task-begin-idempotent", tt.initial) + t.Run("ready advances to in_progress", func(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-begin-idempotent", "ready") - hub := services.NewRecorderHub() - attachRecorderRPCResponder(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { - return services.RPCResponse{Success: true} - }) - handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) - router := newRecorderInteractionRouter(handler) + hub := services.NewRecorderHub() + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + return services.RPCResponse{Success: true} + }) + rc.UpdateState(services.RecorderState{CurrentState: "ready", TaskID: "task-begin-idempotent"}) + handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) + router := newRecorderInteractionRouter(handler) - w := recorderInteractionPost(t, router, "/recorder/robot-001/begin", `{"task_id":"task-begin-idempotent"}`) + w := recorderInteractionPost(t, router, "/recorder/robot-001/begin", `{"task_id":"task-begin-idempotent"}`) - if w.Code != http.StatusOK { - t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) - } - assertTaskStateRecoveryStatus(t, db, "task-begin-idempotent", tt.wantStatus) - if tt.initial == "ready" { - assertTaskStateRecoveryTimestampSet(t, db, "task-begin-idempotent", "started_at") - } + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + } + assertTaskStateRecoveryStatus(t, db, "task-begin-idempotent", "in_progress") + assertTaskStateRecoveryTimestampSet(t, db, "task-begin-idempotent", "started_at") + }) + + t.Run("in_progress is rejected before rpc", func(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-begin-idempotent", "in_progress") + + hub := services.NewRecorderHub() + rpcCalled := make(chan services.RPCRequest, 1) + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + rpcCalled <- req + return services.RPCResponse{Success: true} }) - } + rc.UpdateState(services.RecorderState{CurrentState: "ready", TaskID: "task-begin-idempotent"}) + handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) + router := newRecorderInteractionRouter(handler) + + w := recorderInteractionPost(t, router, "/recorder/robot-001/begin", `{"task_id":"task-begin-idempotent"}`) + + if w.Code != http.StatusConflict { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusConflict, w.Body.String()) + } + select { + case req := <-rpcCalled: + t.Fatalf("unexpected begin RPC sent for in_progress task: %#v", req) + default: + } + assertTaskStateRecoveryStatus(t, db, "task-begin-idempotent", "in_progress") + }) } func TestRecorderForwardOnlyActionsDoNotMutateTaskState(t *testing.T) { @@ -1240,6 +1322,10 @@ func TestRecorderWebSocketRPCActionProtocol(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + if tc.wantAction == "begin" { + axon.sendStateUpdate(t, "ready", "task-protocol") + waitForRecorderCachedState(t, hub, "robot-001", true, "ready", "task-protocol") + } resultC := recorderInteractionRequestAsync(router, tc.method, tc.path, tc.body) req := axon.receiveRPC(t, tc.wantAction) if tc.check != nil { diff --git a/internal/api/handlers/task.go b/internal/api/handlers/task.go index 584a3bc..2759648 100644 --- a/internal/api/handlers/task.go +++ b/internal/api/handlers/task.go @@ -873,7 +873,44 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { logger.Printf("%s received finish callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID)) if h.db != nil { - previousStatus, _, _ := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, callback.TaskID) + previousStatus, owned, err := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, callback.TaskID) + if err != nil { + logger.Printf("%s failed to query task status after finish callback: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task status"}) + return + } + if !owned { + logger.Printf("%s finish callback rejected: task is not owned by device", recorderTaskLogPrefix(deviceID, callback.TaskID)) + c.JSON(http.StatusConflict, gin.H{ + "error_msg": "task is not owned by device or is not uploadable", + }) + return + } + switch previousStatus { + case "uploading", "completed": + logger.Printf("%s finish callback idempotent: current_status=%s", recorderTaskLogPrefix(deviceID, callback.TaskID), previousStatus) + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "Recording finish callback already handled", + "task_status": previousStatus, + "upload_request_sent": false, + }) + return + case "failed", "cancelled": + logger.Printf("%s finish callback rejected: current_status=%s", recorderTaskLogPrefix(deviceID, callback.TaskID), previousStatus) + c.JSON(http.StatusConflict, gin.H{ + "error_msg": "task is not uploadable", + }) + return + case "pending", "ready", "in_progress": + default: + logger.Printf("%s finish callback rejected: current_status=%s", recorderTaskLogPrefix(deviceID, callback.TaskID), previousStatus) + c.JSON(http.StatusConflict, gin.H{ + "error_msg": "task is not uploadable", + }) + return + } + res, err := markOwnedTaskUploading(c.Request.Context(), h.db, deviceID, callback.TaskID) if err != nil { logger.Printf("%s failed to mark task uploading after finish callback: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) @@ -882,6 +919,22 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { } else if n, _ := res.RowsAffected(); n > 0 { logger.Printf("%s task status updated: %s -> uploading reason=finish_callback", recorderTaskLogPrefix(deviceID, callback.TaskID), taskStatusLogValue(previousStatus, "unknown")) } else { + currentStatus, _, statusErr := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, callback.TaskID) + if statusErr != nil { + logger.Printf("%s failed to recheck task status after finish callback noop: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), statusErr) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task status"}) + return + } + if currentStatus == "uploading" || currentStatus == "completed" { + logger.Printf("%s finish callback idempotent after noop: current_status=%s", recorderTaskLogPrefix(deviceID, callback.TaskID), currentStatus) + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "Recording finish callback already handled", + "task_status": currentStatus, + "upload_request_sent": false, + }) + return + } logger.Printf("%s task uploading transition skipped after finish callback", recorderTaskLogPrefix(deviceID, callback.TaskID)) c.JSON(http.StatusConflict, gin.H{ "error_msg": "task is not owned by device or is not uploadable", @@ -975,6 +1028,30 @@ func (h *TaskHandler) GetTaskConfig(c *gin.Context) { return } + var currentStatus string + if err := h.db.Get(¤tStatus, ` + SELECT status + FROM tasks + WHERE id = ? AND deleted_at IS NULL + LIMIT 1 + `, id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + idStr}) + return + } + logger.Printf("[TASK] Failed to query task status for config: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task"}) + return + } + if strings.TrimSpace(currentStatus) != "pending" { + c.JSON(http.StatusConflict, gin.H{ + "code": "task_not_configurable", + "error_msg": "Task is not configurable", + "current_status": strings.TrimSpace(currentStatus), + }) + return + } + type taskConfigRow struct { TaskID string `db:"task_id"` WorkstationID sql.NullInt64 `db:"workstation_id"` diff --git a/internal/api/handlers/task_state_recovery_test.go b/internal/api/handlers/task_state_recovery_test.go index d57e017..5d51131 100644 --- a/internal/api/handlers/task_state_recovery_test.go +++ b/internal/api/handlers/task_state_recovery_test.go @@ -313,9 +313,10 @@ func TestRecorderBeginDoesNotAdvanceTaskWhenRPCResponseUnsuccessful(t *testing.T seedTaskStateRecoveryTask(t, db, "task-begin-false", "pending") hub := services.NewRecorderHub() - attachRecorderRPCResponder(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { + rc := attachRecorderRPCResponderWithConn(t, hub, "robot-001", func(req services.RPCRequest) services.RPCResponse { return services.RPCResponse{Success: false, Message: "device rejected begin"} }) + rc.UpdateState(services.RecorderState{CurrentState: "ready", TaskID: "task-begin-false"}) handler := NewRecorderHandler(hub, &config.RecorderConfig{ResponseTimeout: 1}, db) gin.SetMode(gin.TestMode) @@ -527,6 +528,135 @@ func TestRecordingFinishDisconnectedTransferRecordsUploadRequestError(t *testing } } +func TestRecordingFinishUploadingTaskIsIdempotentWithoutUploadRequest(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-finish-uploading", "uploading") + + hub := &recordingFinishTransferHub{ + conn: &services.TransferConn{DeviceID: "robot-001"}, + } + handler := &TaskHandler{db: db, hub: hub} + + gin.SetMode(gin.TestMode) + router := gin.New() + handler.RegisterCallbackRoutes(router.Group("/callbacks")) + + body, err := json.Marshal(RecordingFinishCallback{ + TaskID: "task-finish-uploading", + DeviceID: "robot-001", + Status: "finished", + FinishedAt: time.Now().UTC().Format(time.RFC3339), + OutputPath: "/data/task-finish-uploading.mcap", + }) + if err != nil { + t.Fatalf("marshal callback: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/callbacks/finish", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + } + if hub.sendDeviceID != "" { + t.Fatalf("Send device=%q want empty for idempotent uploading finish", hub.sendDeviceID) + } + assertTaskStateRecoveryStatus(t, db, "task-finish-uploading", "uploading") + if !strings.Contains(w.Body.String(), `"upload_request_sent":false`) { + t.Fatalf("response body %q does not report unsent upload request", w.Body.String()) + } +} + +func TestRecordingFinishTerminalStatusHandling(t *testing.T) { + tests := []struct { + name string + status string + wantStatus int + }{ + {name: "completed is idempotent", status: "completed", wantStatus: http.StatusOK}, + {name: "failed is rejected", status: "failed", wantStatus: http.StatusConflict}, + {name: "cancelled is rejected", status: "cancelled", wantStatus: http.StatusConflict}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-finish-terminal", tt.status) + + hub := &recordingFinishTransferHub{ + conn: &services.TransferConn{DeviceID: "robot-001"}, + } + handler := &TaskHandler{db: db, hub: hub} + + gin.SetMode(gin.TestMode) + router := gin.New() + handler.RegisterCallbackRoutes(router.Group("/callbacks")) + + body, err := json.Marshal(RecordingFinishCallback{ + TaskID: "task-finish-terminal", + DeviceID: "robot-001", + Status: "finished", + FinishedAt: time.Now().UTC().Format(time.RFC3339), + OutputPath: "/data/task-finish-terminal.mcap", + }) + if err != nil { + t.Fatalf("marshal callback: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/callbacks/finish", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != tt.wantStatus { + t.Fatalf("status=%d want=%d body=%s", w.Code, tt.wantStatus, w.Body.String()) + } + if hub.sendDeviceID != "" { + t.Fatalf("Send device=%q want empty for terminal finish status=%s", hub.sendDeviceID, tt.status) + } + assertTaskStateRecoveryStatus(t, db, "task-finish-terminal", tt.status) + }) + } +} + +func TestGetTaskConfigRejectsUploadingTask(t *testing.T) { + db, err := sqlx.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("open sqlite db: %v", err) + } + defer db.Close() + if _, err := db.Exec(`CREATE TABLE tasks ( + id INTEGER PRIMARY KEY, + task_id TEXT NOT NULL, + status TEXT NOT NULL, + deleted_at TIMESTAMP NULL + )`); err != nil { + t.Fatalf("create tasks schema: %v", err) + } + if _, err := db.Exec(`INSERT INTO tasks (id, task_id, status) VALUES (1, 'task-config-uploading', 'uploading')`); err != nil { + t.Fatalf("seed task: %v", err) + } + + handler := &TaskHandler{db: db} + gin.SetMode(gin.TestMode) + router := gin.New() + router.GET("/tasks/:id/config", handler.GetTaskConfig) + + w := httptest.NewRecorder() + router.ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/tasks/1/config", nil)) + + if w.Code != http.StatusConflict { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusConflict, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "task_not_configurable") { + t.Fatalf("response body %q does not include task_not_configurable", w.Body.String()) + } +} + type recordingFinishTransferHub struct { conn *services.TransferConn getDeviceID string diff --git a/internal/api/handlers/task_uploading.go b/internal/api/handlers/task_uploading.go index 61ccff7..4b40532 100644 --- a/internal/api/handlers/task_uploading.go +++ b/internal/api/handlers/task_uploading.go @@ -63,7 +63,7 @@ func markOwnedTaskUploading(ctx context.Context, exec taskStateExecutor, deviceI updated_at = ?, error_message = NULL WHERE task_id = ? - AND status IN ('pending', 'ready', 'in_progress', 'uploading') + AND status IN ('pending', 'ready', 'in_progress') AND deleted_at IS NULL AND EXISTS ( SELECT 1 diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go index 6350cda..76c667b 100644 --- a/internal/api/handlers/transfer.go +++ b/internal/api/handlers/transfer.go @@ -50,6 +50,8 @@ type TransferHandler struct { recorderRPCTimeout time.Duration stateBroker *services.DeviceStateBroker qaEnqueuer episodeQAEnqueuer + reconcileMu sync.Mutex + reconcileAttempts map[string]time.Time } // NewTransferHandler creates a new TransferHandler. @@ -331,16 +333,24 @@ func (h *TransferHandler) onConnected(dc *services.TransferConn, msg map[string] return } s := services.DeviceStatus{ - Version: stringVal(data, "version"), - PendingCount: intVal(data, "pending_count"), - UploadingCount: intVal(data, "uploading_count"), - WaitingACKCount: intVal(data, "waiting_ack_count"), - FailedCount: intVal(data, "failed_count"), + Version: stringVal(data, "version"), + PendingCount: intVal(data, "pending_count"), + ActiveCount: intVal(data, "active_count"), + UploadingCount: intVal(data, "uploading_count"), + RetryWaitCount: intVal(data, "retry_wait_count"), + WaitingACKCount: intVal(data, "waiting_ack_count"), + WaitingACKTaskIDs: stringSliceVal(data, "waiting_ack_task_ids"), + CompletedCount: intVal(data, "completed_count"), + FailedCount: intVal(data, "failed_count"), + PendingBytes: int64Val(data, "pending_bytes"), + BytesPerSec: int64Val(data, "bytes_per_sec"), + Uploads: transferUploadsVal(data, "uploads"), } dc.UpdateStatus(s) // #nosec G706 -- Set aside for now logger.Printf("%s connected: version=%s pending=%d uploading=%d failed=%d", transferLogPrefix(dc.DeviceID), s.Version, s.PendingCount, s.UploadingCount, s.FailedCount) + h.reconcileUploadRequestsFromStatus(dc) } // onUploadStarted handles "upload_started" message @@ -1026,27 +1036,129 @@ func (h *TransferHandler) onStatus(dc *services.TransferConn, msg map[string]int return } - // Parse waiting_ack_task_ids - var waitingIDs []string - if raw, ok := data["waiting_ack_task_ids"].([]interface{}); ok { - for _, v := range raw { - if s, ok := v.(string); ok { - waitingIDs = append(waitingIDs, s) - } - } - } - s := services.DeviceStatus{ PendingCount: intVal(data, "pending_count"), + ActiveCount: intVal(data, "active_count"), UploadingCount: intVal(data, "uploading_count"), + RetryWaitCount: intVal(data, "retry_wait_count"), WaitingACKCount: intVal(data, "waiting_ack_count"), - WaitingACKTaskIDs: waitingIDs, + WaitingACKTaskIDs: stringSliceVal(data, "waiting_ack_task_ids"), CompletedCount: intVal(data, "completed_count"), FailedCount: intVal(data, "failed_count"), PendingBytes: int64Val(data, "pending_bytes"), BytesPerSec: int64Val(data, "bytes_per_sec"), + Uploads: transferUploadsVal(data, "uploads"), } dc.UpdateStatus(s) + h.reconcileUploadRequestsFromStatus(dc) +} + +const transferReconcileCooldown = 30 * time.Second + +type uploadRequestReconcileTask struct { + TaskID string `db:"task_id"` + ErrorMessage string `db:"error_message"` +} + +func (h *TransferHandler) reconcileUploadRequestsFromStatus(dc *services.TransferConn) { + if h == nil || h.db == nil || h.hub == nil || dc == nil { + return + } + + status := dc.GetStatus() + active := activeTransferUploadTasks(status.Uploads) + var rows []uploadRequestReconcileTask + if err := h.db.SelectContext(context.Background(), &rows, ` + SELECT t.task_id, COALESCE(t.error_message, '') AS error_message + FROM tasks t + JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE t.status = 'uploading' + AND t.deleted_at IS NULL + AND r.device_id = ? + AND t.error_message IS NOT NULL + AND TRIM(t.error_message) <> '' + `, dc.DeviceID); err != nil { + logger.Printf("%s upload_request reconciliation query failed: %v", transferLogPrefix(dc.DeviceID), err) + return + } + + for _, row := range rows { + taskID := strings.TrimSpace(row.TaskID) + if taskID == "" { + continue + } + if !shouldAutoRequeueUploadRequest(row.ErrorMessage) { + continue + } + if _, ok := active[taskID]; ok { + continue + } + if h.skipRecentTransferReconcile(dc.DeviceID, taskID) { + continue + } + + msg := map[string]interface{}{ + "type": "upload_request", + "task_id": taskID, + "priority": 1, + } + writeTimeout := transferWriteTimeout(h.cfg) + if err := h.hub.SendToConnWithTimeout(context.Background(), dc, msg, writeTimeout); err != nil { + logTransferSendFailure(dc.DeviceID, "upload_request", writeTimeout, err) + continue + } + if _, err := clearOwnedUploadingTaskError(context.Background(), h.db, dc.DeviceID, taskID); err != nil { + logger.Printf("%s failed to clear reconciled upload_request error: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) + } + logger.Printf("%s reconciled upload_request after transfer status", transferTaskLogPrefix(dc.DeviceID, taskID)) + } +} + +func activeTransferUploadTasks(uploads []services.Upload) map[string]struct{} { + active := make(map[string]struct{}, len(uploads)) + for _, upload := range uploads { + taskID := strings.TrimSpace(upload.TaskID) + if taskID == "" { + continue + } + switch strings.ToLower(strings.TrimSpace(upload.Status)) { + case "pending", "active", "retry-wait", "retry_wait", "uploaded_wait_ack": + active[taskID] = struct{}{} + } + } + return active +} + +func shouldAutoRequeueUploadRequest(message string) bool { + msg := strings.ToLower(strings.TrimSpace(message)) + if msg == "" { + return false + } + if strings.Contains(msg, "upload_not_found") || + strings.Contains(msg, "upload file not found") || + strings.Contains(msg, "no mcap file") { + return false + } + return strings.Contains(msg, "upload_request not sent") || + strings.Contains(msg, "upload_request failed") || + strings.Contains(msg, "upload_request timed out") || + strings.Contains(msg, "transfer write timeout") +} + +func (h *TransferHandler) skipRecentTransferReconcile(deviceID, taskID string) bool { + key := strings.TrimSpace(deviceID) + "\x00" + strings.TrimSpace(taskID) + now := time.Now() + h.reconcileMu.Lock() + defer h.reconcileMu.Unlock() + if h.reconcileAttempts == nil { + h.reconcileAttempts = make(map[string]time.Time) + } + if last, ok := h.reconcileAttempts[key]; ok && now.Sub(last) < transferReconcileCooldown { + return true + } + h.reconcileAttempts[key] = now + return false } // ListDevices returns all currently connected devices. @@ -1210,7 +1322,62 @@ func extractIP(remoteAddr string) string { func stringVal(m map[string]interface{}, key string) string { v, _ := m[key].(string) - return v + return strings.TrimSpace(v) +} + +func stringSliceVal(m map[string]interface{}, key string) []string { + raw, ok := m[key].([]interface{}) + if !ok { + return nil + } + values := make([]string, 0, len(raw)) + for _, v := range raw { + s, ok := v.(string) + if !ok { + continue + } + s = strings.TrimSpace(s) + if s != "" { + values = append(values, s) + } + } + return values +} + +func transferUploadsVal(m map[string]interface{}, key string) []services.Upload { + raw, ok := m[key].([]interface{}) + if !ok { + return nil + } + uploads := make([]services.Upload, 0, len(raw)) + for _, item := range raw { + record, ok := item.(map[string]interface{}) + if !ok { + continue + } + taskID := stringVal(record, "task_id") + if taskID == "" { + continue + } + uploads = append(uploads, services.Upload{ + TaskID: taskID, + Status: stringVal(record, "status"), + S3Key: stringVal(record, "s3_key"), + ObjectKey: stringVal(record, "object_key"), + FileSizeBytes: int64Val(record, "file_size_bytes"), + ChecksumSHA256: stringVal(record, "checksum_sha256"), + BytesUploaded: int64Val(record, "bytes_uploaded"), + UploadMode: stringVal(record, "upload_mode"), + RetryCount: intVal(record, "retry_count"), + NextRetryAt: stringVal(record, "next_retry_at"), + LastError: stringVal(record, "last_error"), + CreatedAt: stringVal(record, "created_at"), + UpdatedAt: stringVal(record, "updated_at"), + CompletedAt: stringVal(record, "completed_at"), + DeleteLastError: stringVal(record, "delete_last_error"), + }) + } + return uploads } func intVal(m map[string]interface{}, key string) int { diff --git a/internal/api/handlers/transfer_status_test.go b/internal/api/handlers/transfer_status_test.go new file mode 100644 index 0000000..5518252 --- /dev/null +++ b/internal/api/handlers/transfer_status_test.go @@ -0,0 +1,182 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +package handlers + +import ( + "context" + "testing" + "time" + + "archebase.com/keystone-edge/internal/config" + "archebase.com/keystone-edge/internal/services" + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" +) + +func TestTransferStatusCachesUploadRecords(t *testing.T) { + hub := services.NewTransferHub(10) + dc := hub.NewTransferConn(nil, "robot-001", "127.0.0.1") + handler := NewTransferHandler(hub, &config.TransferConfig{}, nil, nil, "", "", nil, 0) + + handler.onStatus(dc, map[string]interface{}{ + "type": "status", + "data": map[string]interface{}{ + "waiting_ack_task_ids": []interface{}{"task-uploaded"}, + "uploads": []interface{}{ + map[string]interface{}{ + "task_id": "task-uploaded", + "status": "uploaded_wait_ack", + "s3_key": "factory/robot/task-uploaded.mcap", + "object_key": "factory/robot/task-uploaded.mcap", + "file_size_bytes": float64(1234), + "checksum_sha256": "abc123", + "bytes_uploaded": float64(1234), + "upload_mode": "mcap_json", + "retry_count": float64(2), + "next_retry_at": "2026-06-16T00:00:00Z", + "last_error": "previous failure", + "created_at": "2026-06-15T00:00:00Z", + "updated_at": "2026-06-16T00:00:00Z", + "completed_at": "2026-06-16T00:01:00Z", + "delete_last_error": "cleanup pending", + }, + }, + }, + }) + + status := dc.GetStatus() + if len(status.Uploads) != 1 { + t.Fatalf("uploads len=%d want=1: %#v", len(status.Uploads), status.Uploads) + } + got := status.Uploads[0] + if got.TaskID != "task-uploaded" || got.Status != "uploaded_wait_ack" { + t.Fatalf("upload identity/status=%#v", got) + } + if got.S3Key != "factory/robot/task-uploaded.mcap" || got.ObjectKey != "factory/robot/task-uploaded.mcap" { + t.Fatalf("upload object keys=%#v", got) + } + if got.FileSizeBytes != 1234 || got.ChecksumSHA256 != "abc123" || got.BytesUploaded != 1234 { + t.Fatalf("upload file metadata=%#v", got) + } + if got.RetryCount != 2 || got.NextRetryAt == "" || got.LastError == "" || got.DeleteLastError == "" { + t.Fatalf("upload retry metadata=%#v", got) + } +} + +func TestTransferStatusRequeuesUploadRequestAfterPreviousSendFailure(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-requeue", "uploading") + if _, err := db.Exec( + `UPDATE tasks SET error_message = ? WHERE task_id = ?`, + "transfer disconnected; upload_request not sent", + "task-requeue", + ); err != nil { + t.Fatalf("seed upload_request error: %v", err) + } + + hub := services.NewTransferHub(10) + serverConn, clientConn := newRecorderHandlerTestWebSocketPair(t) + dc := hub.NewTransferConn(serverConn, "robot-001", "127.0.0.1") + if !hub.Connect("robot-001", dc) { + t.Fatalf("connect transfer failed") + } + handler := NewTransferHandler(hub, &config.TransferConfig{WriteTimeout: 1}, db, nil, "", "", nil, 0) + + handler.onStatus(dc, map[string]interface{}{ + "type": "status", + "data": map[string]interface{}{ + "uploads": []interface{}{}, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + var msg map[string]interface{} + if err := wsjson.Read(ctx, clientConn, &msg); err != nil { + t.Fatalf("read requeued upload_request: %v", err) + } + if got := stringVal(msg, "type"); got != "upload_request" { + t.Fatalf("message type=%q want upload_request: %#v", got, msg) + } + if got := stringVal(msg, "task_id"); got != "task-requeue" { + t.Fatalf("task_id=%q want task-requeue: %#v", got, msg) + } +} + +func TestTransferStatusDoesNotRequeueUploadNotFound(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-not-found", "uploading") + if _, err := db.Exec( + `UPDATE tasks SET error_message = ? WHERE task_id = ?`, + "No MCAP file matching task-not-found in /data", + "task-not-found", + ); err != nil { + t.Fatalf("seed upload_not_found error: %v", err) + } + + hub := services.NewTransferHub(10) + serverConn, clientConn := newRecorderHandlerTestWebSocketPair(t) + dc := hub.NewTransferConn(serverConn, "robot-001", "127.0.0.1") + if !hub.Connect("robot-001", dc) { + t.Fatalf("connect transfer failed") + } + handler := NewTransferHandler(hub, &config.TransferConfig{WriteTimeout: 1}, db, nil, "", "", nil, 0) + + handler.onStatus(dc, map[string]interface{}{ + "type": "status", + "data": map[string]interface{}{ + "uploads": []interface{}{}, + }, + }) + + assertNoTransferMessage(t, clientConn) +} + +func TestTransferStatusDoesNotRequeueWhenUploadAlreadyActive(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-active", "uploading") + if _, err := db.Exec( + `UPDATE tasks SET error_message = ? WHERE task_id = ?`, + "upload_request failed: transfer write timeout", + "task-active", + ); err != nil { + t.Fatalf("seed upload_request error: %v", err) + } + + hub := services.NewTransferHub(10) + serverConn, clientConn := newRecorderHandlerTestWebSocketPair(t) + dc := hub.NewTransferConn(serverConn, "robot-001", "127.0.0.1") + if !hub.Connect("robot-001", dc) { + t.Fatalf("connect transfer failed") + } + handler := NewTransferHandler(hub, &config.TransferConfig{WriteTimeout: 1}, db, nil, "", "", nil, 0) + + handler.onStatus(dc, map[string]interface{}{ + "type": "status", + "data": map[string]interface{}{ + "uploads": []interface{}{ + map[string]interface{}{ + "task_id": "task-active", + "status": "active", + }, + }, + }, + }) + + assertNoTransferMessage(t, clientConn) +} + +func assertNoTransferMessage(t *testing.T, conn *websocket.Conn) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + var msg map[string]interface{} + if err := wsjson.Read(ctx, conn, &msg); err == nil { + t.Fatalf("unexpected transfer message: %#v", msg) + } +} diff --git a/internal/services/transfer_hub.go b/internal/services/transfer_hub.go index 1a9378d..f2f9bdf 100644 --- a/internal/services/transfer_hub.go +++ b/internal/services/transfer_hub.go @@ -37,16 +37,38 @@ type DeviceEvent struct { type DeviceStatus struct { Version string `json:"version"` PendingCount int `json:"pending_count"` + ActiveCount int `json:"active_count"` UploadingCount int `json:"uploading_count"` + RetryWaitCount int `json:"retry_wait_count"` WaitingACKCount int `json:"waiting_ack_count"` WaitingACKTaskIDs []string `json:"waiting_ack_task_ids"` CompletedCount int `json:"completed_count"` FailedCount int `json:"failed_count"` PendingBytes int64 `json:"pending_bytes"` BytesPerSec int64 `json:"bytes_per_sec"` + Uploads []Upload `json:"uploads"` UpdatedAt time.Time `json:"updated_at"` } +// Upload is one axon_transfer upload_state record reported in a status snapshot. +type Upload struct { + TaskID string `json:"task_id"` + Status string `json:"status"` + S3Key string `json:"s3_key"` + ObjectKey string `json:"object_key"` + FileSizeBytes int64 `json:"file_size_bytes"` + ChecksumSHA256 string `json:"checksum_sha256"` + BytesUploaded int64 `json:"bytes_uploaded"` + UploadMode string `json:"upload_mode"` + RetryCount int `json:"retry_count"` + NextRetryAt string `json:"next_retry_at"` + LastError string `json:"last_error"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + CompletedAt string `json:"completed_at"` + DeleteLastError string `json:"delete_last_error"` +} + // ringBuffer is a fixed-size circular buffer for DeviceEvent type ringBuffer struct { buf []DeviceEvent