diff --git a/docs/designs/single-episode-cloud-sync-progress.zh.html b/docs/designs/single-episode-cloud-sync-progress.zh.html new file mode 100644 index 0000000..236ffc9 --- /dev/null +++ b/docs/designs/single-episode-cloud-sync-progress.zh.html @@ -0,0 +1,868 @@ + + + + + + + 单个 Episode 云同步进度设计 + + + +
+
+
+

Cloud Sync Runtime Progress

+

单个 Episode 云同步进度设计

+

+ 本文档定义 Synapse 在单个 episode 云同步时如何显示真实上传进度,以及 Keystone 如何在不落库的前提下提供运行时百分比。 +

+ +
+ +
+ +
+

1. 背景与范围

+

+ Synapse 数据明细页已经支持对单个 episode 触发云同步。当前交互只会在提交成功后刷新列表,用户能看到 + pendingin_progresscompletedfailed 等状态, + 但看不到正在上传时的百分比进度。 +

+
+
SynapseDataDetails 单条同步
+
KeystoneSyncWorker 调度
+
MinIO读取 episode MCAP
+
OSSmultipart upload
+
GatewayCompleteUpload 确认
+
+
+
+ 目标 +

在单个 episode 进入 in_progress 后,行内显示真实上传百分比和字节数。

+
+
+ 数据来源 +

进度来自 OSS 分片上传成功后的 uploaded_bytes / total_bytes

+
+
+ 状态边界 +

不把云端后处理、索引或 cloud_processed 算入本进度。

+
+
+
+ +
+

2. 非目标

+ +
+ +
+

3. 进度语义

+

进度只在 status = in_progress 时展示。其他状态只显示状态和现有提示。

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
状态UI 展示进度语义
not_started未同步无进度
pending已入队 / 排队中无进度
in_progress 且有内存进度百分比、已上传/总大小、小进度条OSS 分片上传成功进度
in_progress 但无内存进度同步中进度未知
completed已同步不显示百分比,完成态代表 100%
failed失败 + 错误摘要不显示失败前百分比
+ +
+
+

百分比规则

+
    +
  • 后端计算 percent,前端不重复计算。
  • +
  • percent = floor(uploaded_bytes * 100 / total_bytes)
  • +
  • in_progress 状态下 percent 最大为 99
  • +
  • 只有 completed 才代表完整 100%。
  • +
  • total_bytes <= 0 时不返回 progress
  • +
+
+
+

行内示例

+
+ 同步中 +
37% · 120.0MB / 320.0MB
+
+
+
+
+
+ +
+

4. 后端设计

+

4.1 Uploader Progress Callback

+

+ internal/cloud 保持为通用上传库,不持有 episode 数字 ID,也不依赖 Keystone API 或数据库。 + cloud.UploadRequest 增加可选 progress callback。 +

+
type UploadProgressFunc func(uploadedBytes int64, totalBytes int64)
+
+type UploadRequest struct {
+    EpisodeID   string
+    McapKey     string
+    AssetID     string
+    RawTags     map[string]string
+    ClientHints map[string]string
+
+    Progress UploadProgressFunc
+}
+

+ Uploader 在每个 OSS multipart part 上传成功后调用回调。回调不能发生在读取 MinIO range 后, + 避免把本地读取误认为云端同步成功。 +

+
offset += int64(n)
+if req.Progress != nil {
+    req.Progress(offset, fileSize)
+}
+ +

4.2 SyncWorker 内存进度

+

SyncWorker 维护进程内 progress map,key 使用数据库 numeric episodes.id

+
type SyncProgressSnapshot struct {
+    UploadedBytes int64
+    TotalBytes    int64
+    UpdatedAt     time.Time
+}
+
+progressMu        sync.RWMutex
+progressByEpisode map[int64]SyncProgressSnapshot
+ + +

4.3 SyncWorker 调用 Uploader

+
return uploader.Upload(ctx, cloud.UploadRequest{
+    EpisodeID: ep.EpisodeUUID,
+    McapKey:   mcapKey,
+    AssetID:   assetID,
+    RawTags:   rawTags,
+    Progress: func(uploadedBytes int64, totalBytes int64) {
+        w.setEpisodeProgress(ep.ID, uploadedBytes, totalBytes)
+    },
+})
+
+ 如果 Keystone 重启,内存 progress 会丢失。数据库可能仍有 in_progress,API 此时返回状态但没有 + progress,前端降级显示“同步中”。 +
+
+ +
+

5. API 设计

+

5.1 单条状态接口扩展

+
GET /api/v1/sync/episodes/:id/status
+

扩展返回 cloud_syncedcloud_synced_atcloud_processed 和可选 progress

+
{
+  "id": 42,
+  "episode_id": 123,
+  "episode_public_id": "EP-20260615-001",
+  "status": "in_progress",
+  "attempt_count": 1,
+  "started_at": "2026-06-15T10:00:00Z",
+  "completed_at": null,
+  "error_message": null,
+  "cloud_synced": false,
+  "cloud_synced_at": null,
+  "cloud_processed": false,
+  "progress": {
+    "uploaded_bytes": 125829120,
+    "total_bytes": 335544320,
+    "percent": 37,
+    "updated_at": "2026-06-15T10:00:12Z"
+      }
+}
+ + +

5.2 新增批量状态接口

+
GET /api/v1/sync/episode-statuses?ids=1,2,3
+

+ 该路径不放在 /sync/episodes/status,是为了避开现有 + /sync/episodes/:id/status/sync/episodes/:id/logs 的动态路由冲突。 +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
规则行为
ids 为空返回 400
非正整数或格式错误返回 400
超过 50 个 ID返回 400
部分 ID 不存在或已删除返回 200,缺失项写入 errors
syncWorker == nil 或 worker 未运行返回 DB 状态,不返回运行时 progress
+
{
+  "items": [
+    {
+      "id": 42,
+      "episode_id": 1,
+      "episode_public_id": "EP-20260615-001",
+      "status": "in_progress",
+      "attempt_count": 1,
+      "cloud_synced": false,
+      "cloud_synced_at": null,
+      "cloud_processed": false,
+      "progress": {
+        "uploaded_bytes": 125829120,
+        "total_bytes": 335544320,
+        "percent": 37,
+        "updated_at": "2026-06-15T10:00:12Z"
+      }
+    }
+  ],
+  "errors": [
+    {
+      "episode_id": 999,
+      "error": "episode not found"
+    }
+  ]
+}
+
+ 批量接口应使用一次 SQL 查询当前请求 ID 的 episode 云字段和 latest sync log,再按请求 ID 顺序组装响应并合并内存 progress。 +
+
+ +
+

6. 前端设计

+

6.1 API Client

+

synapse/src/api/sync.js 增加批量状态方法。

+
getStatuses: (ids = []) => api.get('/sync/episode-statuses', { ids: ids.join(',') })
+ +

6.2 当前页轮询

+

使用一个全局 interval,不使用每行 timer。只追踪当前页可见的 pendingin_progress 行。

+
const activeSyncEpisodeIds = ref(new Set())
+const syncProgressFailures = ref(new Map())
+let syncProgressTimer = null
+
+
+ 启动点 +

同步提交成功、loadEpisodes() 成功、页面进入时发现活跃行。

+
+
+ 停止点 +

active set 为空、页面卸载、换页筛选、某 ID 连续 3 次失败。

+
+
+ 轮询频率 +

每 1000ms 批量请求一次当前页活跃行状态。

+
+
+
+ 同一轮询器不应并发发起多个状态请求。若实现允许请求重叠,必须使用 request sequence 忽略旧响应,避免旧 + in_progress 覆盖新终态或导致百分比回退。 +
+ +

6.3 行内 Patch

+

返回后只 patch 当前行,不整页刷新。

+
item.sync_status = status.status
+item.latest_sync_log = status
+item.sync_progress = status.progress || null
+item.cloud_synced = status.cloud_synced
+item.cloud_synced_at = status.cloud_synced_at
+item.cloud_processed = status.cloud_processed
+ +

6.4 失败处理

+ +
+ +
+

7. 关键风险与接受的取舍

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
取舍影响接受原因
进度只放 Keystone 内存服务重启后进度丢失,只能显示“同步中”。避免 DB 热写和 schema 变更;状态终态仍由 sync_logs 持久化。
假设单 Keystone 进程多副本部署下,请求打到非 worker 实例会拿不到 progress。当前方案面向单实例边缘部署;多实例需要 sticky routing 或共享 progress store。
按 OSS part 成功更新单分片小文件可能从“同步中”直接变成“已同步”,看不到中间百分比。进度语义更准确,不把 MinIO 读取误报为云端上传。
终态后立即删除 progress失败后不保留失败前百分比。UI 只在 in_progress 展示百分比,终态以 DB 状态和错误为准。
终态 DB 写入失败不在本方案修复可能遗留 in_progress 且无 progress。属于 stale sync 状态恢复问题,后续单独处理;本方案只做运行时展示。
+
+ +
+

8. 数据模型

+

不新增 migration。继续使用现有持久化字段保存最终状态和历史。

+ +
+ 完成时 markSyncCompleted 仍把最终 bytes_transferred = result.FileSize 写入 + sync_logs。中间 uploaded_bytes 不写 DB。 +
+
+ +
+

9. 错误和降级

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
场景后端行为前端行为
worker 未运行查询接口返回 DB 状态,无 progress显示状态,无百分比
Keystone 重启内存 progress 丢失in_progress 显示“同步中”
上传失败sync_logs.status = failed显示失败和错误,不显示失败前百分比
批量状态部分 ID 不存在200 + errors忽略不存在行或记录 warning
轮询连续失败不影响同步任务停止该 ID 轮询,提示手动刷新
+
+ +
+

10. 测试计划

+
+
+

后端

+
    +
  • internal/cloud:多个 part 成功时 callback 收到正确进度。
  • +
  • internal/cloud:part 上传失败时不推进失败 part 的进度。
  • +
  • internal/services:progress map set/get、完成或失败后立即删除。
  • +
  • internal/api/handlers:单条和批量 status 合并 progress。
  • +
  • internal/api/handlers:missing ID、非法 ids、超过 50 个 ids。
  • +
+
cd keystone
+go test ./internal/cloud ./internal/services ./internal/api/handlers
+
+
+

前端

+
    +
  • syncApi 增加批量 status 方法。
  • +
  • DataDetails.vue 当前页 active rows 轮询。
  • +
  • 行内百分比、字节数和小进度条展示。
  • +
  • 终态只 patch 行,不整页刷新。
  • +
+
cd synapse
+npm run build
+
+
+
+ +
+

11. 实施顺序

+
    +
  1. 后端 cloud.UploadRequest 增加 progress callback,并在 OSS part 成功后触发。
  2. +
  3. 后端 SyncWorker 增加内存 progress map 和生命周期方法。
  4. +
  5. 后端扩展单条 status response,新增批量 status endpoint。
  6. +
  7. 后端补单元测试。
  8. +
  9. 前端 syncApi 增加批量状态方法。
  10. +
  11. 前端 DataDetails.vue 增加当前页 active rows 轮询和行内 progress patch。
  12. +
  13. 前端增加百分比、字节数和小进度条展示。
  14. +
  15. 运行后端相关测试和前端 build。
  16. +
+
+ + +
+ + diff --git a/internal/api/handlers/sync.go b/internal/api/handlers/sync.go index 12f9955..7aa8a83 100644 --- a/internal/api/handlers/sync.go +++ b/internal/api/handlers/sync.go @@ -5,12 +5,14 @@ package handlers import ( + "context" "database/sql" "errors" "fmt" "net/http" "strconv" "strings" + "time" "archebase.com/keystone-edge/internal/logger" "archebase.com/keystone-edge/internal/services" @@ -36,6 +38,7 @@ func (h *SyncHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { apiV1.POST("/sync/episodes/:id", h.TriggerEpisodeSync) apiV1.GET("/sync/episodes", h.ListSyncJobs) apiV1.GET("/sync/episodes/summary", h.ListEpisodeSyncSummaries) + apiV1.GET("/sync/episode-statuses", h.ListSyncStatuses) apiV1.GET("/sync/episodes/:id/logs", h.ListEpisodeSyncLogs) apiV1.GET("/sync/episodes/:id/status", h.GetSyncStatus) apiV1.GET("/sync/config", h.GetSyncConfig) @@ -176,6 +179,26 @@ type syncEpisodeSummaryRow struct { CompletedAt sql.NullTime `db:"completed_at"` } +type syncStatusDBRow struct { + EpisodeID int64 `db:"episode_id"` + EpisodePublicID sql.NullString `db:"episode_public_id"` + CloudSynced bool `db:"cloud_synced"` + CloudProcessed bool `db:"cloud_processed"` + CloudSyncedAt sql.NullTime `db:"cloud_synced_at"` + SyncLogID sql.NullInt64 `db:"sync_log_id"` + SourceFactoryID sql.NullString `db:"source_factory_id"` + SourcePath sql.NullString `db:"source_path"` + DestinationPath sql.NullString `db:"destination_path"` + Status sql.NullString `db:"status"` + BytesTransferred sql.NullInt64 `db:"bytes_transferred"` + DurationSec sql.NullInt64 `db:"duration_sec"` + ErrorMessage sql.NullString `db:"error_message"` + AttemptCount sql.NullInt64 `db:"attempt_count"` + NextRetryAt sql.NullTime `db:"next_retry_at"` + StartedAt sql.NullTime `db:"started_at"` + CompletedAt sql.NullTime `db:"completed_at"` +} + // SyncJobResponse represents a sync job in the API response. type SyncJobResponse struct { ID int64 `json:"id"` @@ -193,6 +216,35 @@ type SyncJobResponse struct { CompletedAt *string `json:"completed_at,omitempty"` } +// SyncProgressResponse represents runtime-only upload progress for an in-progress sync. +type SyncProgressResponse struct { + UploadedBytes int64 `json:"uploaded_bytes"` + TotalBytes int64 `json:"total_bytes"` + Percent int `json:"percent"` + UpdatedAt string `json:"updated_at"` +} + +// EpisodeSyncStatusResponse represents one episode's latest sync status plus runtime progress. +type EpisodeSyncStatusResponse struct { + SyncJobResponse + CloudSynced bool `json:"cloud_synced"` + CloudSyncedAt *string `json:"cloud_synced_at,omitempty"` + CloudProcessed bool `json:"cloud_processed"` + Progress *SyncProgressResponse `json:"progress,omitempty"` +} + +// EpisodeSyncStatusError describes one missing or invalid episode in a batch status request. +type EpisodeSyncStatusError struct { + EpisodeID int64 `json:"episode_id"` + Error string `json:"error"` +} + +// EpisodeSyncStatusListResponse represents batched episode sync status results. +type EpisodeSyncStatusListResponse struct { + Items []EpisodeSyncStatusResponse `json:"items"` + Errors []EpisodeSyncStatusError `json:"errors,omitempty"` +} + // SyncEpisodeSummaryResponse represents an episode-centered sync summary. type SyncEpisodeSummaryResponse struct { ID int64 `json:"id"` @@ -629,6 +681,44 @@ func (h *SyncHandler) ListEpisodeSyncLogs(c *gin.Context) { }) } +// ListSyncStatuses returns sync statuses for multiple episodes. +// +// @Summary List episode sync statuses +// @Description Returns latest sync status for multiple episode numeric IDs, including runtime progress when available +// @Tags sync +// @Produce json +// @Param ids query string true "Comma-separated episode numeric IDs (max 50)" +// @Success 200 {object} EpisodeSyncStatusListResponse +// @Failure 400 {object} map[string]string +// @Router /sync/episode-statuses [get] +func (h *SyncHandler) ListSyncStatuses(c *gin.Context) { + ids, err := parseSyncStatusIDs(c.Query("ids")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + statuses, err := h.loadEpisodeSyncStatuses(c.Request.Context(), ids) + if err != nil { + logger.Printf("[SYNC] Failed to query batch sync statuses: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync statuses"}) + return + } + + items := make([]EpisodeSyncStatusResponse, 0, len(ids)) + errs := make([]EpisodeSyncStatusError, 0) + for _, id := range ids { + status, ok := statuses[id] + if !ok { + errs = append(errs, EpisodeSyncStatusError{EpisodeID: id, Error: "episode not found"}) + continue + } + items = append(items, status) + } + + c.JSON(http.StatusOK, EpisodeSyncStatusListResponse{Items: items, Errors: errs}) +} + // GetSyncStatus returns the sync status for a specific episode. // // @Summary Get episode sync status @@ -636,7 +726,7 @@ func (h *SyncHandler) ListEpisodeSyncLogs(c *gin.Context) { // @Tags sync // @Produce json // @Param id path int true "Episode ID" -// @Success 200 {object} SyncJobResponse +// @Success 200 {object} EpisodeSyncStatusResponse // @Failure 400 {object} map[string]string // @Failure 404 {object} map[string]string "Episode not found" // @Router /sync/episodes/{id}/status [get] @@ -648,31 +738,71 @@ func (h *SyncHandler) GetSyncStatus(c *gin.Context) { return } - var episode struct { - ID int64 `db:"id"` - PublicID sql.NullString `db:"episode_id"` - } - err = h.db.Get(&episode, ` - SELECT id, episode_id - FROM episodes - WHERE id = ? AND deleted_at IS NULL - `, episodeID) - if err == sql.ErrNoRows { - c.JSON(http.StatusNotFound, gin.H{"error": "episode not found"}) - return - } + statuses, err := h.loadEpisodeSyncStatuses(c.Request.Context(), []int64{episodeID}) if err != nil { logger.Printf("[SYNC] Failed to query episode %d for sync status: %v", episodeID, err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync status"}) return } + status, ok := statuses[episodeID] + if !ok { + c.JSON(http.StatusNotFound, gin.H{"error": "episode not found"}) + return + } + + c.JSON(http.StatusOK, status) +} + +const maxBatchSyncStatusIDs = 50 + +func parseSyncStatusIDs(raw string) ([]int64, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return nil, fmt.Errorf("ids is required") + } + parts := strings.Split(raw, ",") + ids := make([]int64, 0, len(parts)) + seen := make(map[int64]struct{}, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + return nil, fmt.Errorf("ids must be comma-separated positive integers") + } + id, err := strconv.ParseInt(part, 10, 64) + if err != nil || id <= 0 { + return nil, fmt.Errorf("ids must be comma-separated positive integers") + } + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + ids = append(ids, id) + } + if len(ids) == 0 { + return nil, fmt.Errorf("ids is required") + } + if len(ids) > maxBatchSyncStatusIDs { + return nil, fmt.Errorf("ids must contain at most %d values", maxBatchSyncStatusIDs) + } + return ids, nil +} + +func (h *SyncHandler) loadEpisodeSyncStatuses(ctx context.Context, episodeIDs []int64) (map[int64]EpisodeSyncStatusResponse, error) { + out := make(map[int64]EpisodeSyncStatusResponse, len(episodeIDs)) + if len(episodeIDs) == 0 { + return out, nil + } - var row syncLogRow - err = h.db.Get(&row, ` + placeholders, args := int64Placeholders(episodeIDs) + queryArgs := append(append([]interface{}{}, args...), args...) + query := ` SELECT - sl.id, - sl.episode_id, + e.id AS episode_id, e.episode_id AS episode_public_id, + COALESCE(e.cloud_synced, FALSE) AS cloud_synced, + COALESCE(e.cloud_processed, FALSE) AS cloud_processed, + e.cloud_synced_at, + sl.id AS sync_log_id, sl.source_factory_id, sl.source_path, sl.destination_path, @@ -684,29 +814,100 @@ func (h *SyncHandler) GetSyncStatus(c *gin.Context) { sl.next_retry_at, sl.started_at, sl.completed_at - FROM sync_logs sl - LEFT JOIN episodes e ON e.id = sl.episode_id AND e.deleted_at IS NULL - WHERE sl.episode_id = ? - ORDER BY sl.id DESC - LIMIT 1 - `, episodeID) - if err == sql.ErrNoRows { - c.JSON(http.StatusOK, SyncJobResponse{ + FROM episodes e + LEFT JOIN ( + SELECT sl_latest.* + FROM sync_logs sl_latest + INNER JOIN ( + SELECT episode_id, MAX(id) AS latest_id + FROM sync_logs + WHERE episode_id IN (` + placeholders + `) + GROUP BY episode_id + ) latest ON latest.episode_id = sl_latest.episode_id AND latest.latest_id = sl_latest.id + ) sl ON sl.episode_id = e.id + WHERE e.id IN (` + placeholders + `) + AND e.deleted_at IS NULL + ` + + var rows []syncStatusDBRow + // #nosec G201 -- placeholders are generated for integer IDs and values are parameterized. + if err := h.db.SelectContext(ctx, &rows, query, queryArgs...); err != nil { + return nil, err + } + for _, row := range rows { + resp := syncStatusResponseFromRow(row) + h.attachSyncProgress(&resp) + out[row.EpisodeID] = resp + } + return out, nil +} + +func syncStatusResponseFromRow(row syncStatusDBRow) EpisodeSyncStatusResponse { + resp := EpisodeSyncStatusResponse{ + SyncJobResponse: SyncJobResponse{ ID: 0, - EpisodeID: episode.ID, - EpisodePublicID: nullableString(episode.PublicID), + EpisodeID: row.EpisodeID, + EpisodePublicID: nullableString(row.EpisodePublicID), Status: "not_started", AttemptCount: 0, + }, + CloudSynced: row.CloudSynced, + CloudSyncedAt: nullableTime(row.CloudSyncedAt), + CloudProcessed: row.CloudProcessed, + } + if row.SyncLogID.Valid { + attemptCount := 0 + if row.AttemptCount.Valid { + attemptCount = int(row.AttemptCount.Int64) + } + resp.SyncJobResponse = syncJobResponseFromRow(syncLogRow{ + ID: row.SyncLogID.Int64, + EpisodeID: row.EpisodeID, + EpisodePublicID: row.EpisodePublicID, + SourceFactoryID: row.SourceFactoryID, + SourcePath: row.SourcePath, + DestinationPath: row.DestinationPath, + Status: row.Status.String, + BytesTransferred: row.BytesTransferred, + DurationSec: row.DurationSec, + ErrorMessage: row.ErrorMessage, + AttemptCount: attemptCount, + NextRetryAt: row.NextRetryAt, + StartedAt: row.StartedAt, + CompletedAt: row.CompletedAt, }) + } + return resp +} + +func (h *SyncHandler) attachSyncProgress(resp *EpisodeSyncStatusResponse) { + if h == nil || h.syncWorker == nil || resp == nil || resp.Status != "in_progress" { return } - if err != nil { - logger.Printf("[SYNC] Failed to query sync status for episode %d: %v", episodeID, err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync status"}) + progress, ok := h.syncWorker.GetEpisodeProgress(resp.EpisodeID) + if !ok || progress.TotalBytes <= 0 { return } - - c.JSON(http.StatusOK, syncJobResponseFromRow(row)) + uploaded := progress.UploadedBytes + if uploaded < 0 { + uploaded = 0 + } + if uploaded > progress.TotalBytes { + uploaded = progress.TotalBytes + } + percent := int(float64(uploaded) * 100 / float64(progress.TotalBytes)) + if percent < 0 { + percent = 0 + } + if percent > 99 { + percent = 99 + } + resp.Progress = &SyncProgressResponse{ + UploadedBytes: uploaded, + TotalBytes: progress.TotalBytes, + Percent: percent, + UpdatedAt: progress.UpdatedAt.UTC().Format(time.RFC3339), + } } // GetSyncConfig returns the current sync configuration (sanitized). diff --git a/internal/api/handlers/sync_test.go b/internal/api/handlers/sync_test.go index 477b436..57d4d80 100644 --- a/internal/api/handlers/sync_test.go +++ b/internal/api/handlers/sync_test.go @@ -214,6 +214,88 @@ func TestGetSyncStatusReturnsNotFoundWhenEpisodeDoesNotExist(t *testing.T) { } } +func TestListSyncStatusesReturnsItemsInRequestOrderAndErrors(t *testing.T) { + gin.SetMode(gin.TestMode) + db := setupSyncHandlerTestDB(t) + + syncedAt := time.Date(2026, 5, 9, 11, 0, 0, 0, time.UTC) + if _, err := db.Exec(` + INSERT INTO episodes (id, episode_id, cloud_synced, cloud_processed, cloud_synced_at, deleted_at) + VALUES + (1, 'episode-a', FALSE, FALSE, NULL, NULL), + (2, 'episode-b', TRUE, FALSE, ?, NULL) + `, syncedAt); err != nil { + t.Fatalf("insert episodes: %v", err) + } + if _, err := db.Exec(` + INSERT INTO sync_logs (id, episode_id, status, attempt_count, started_at, completed_at, bytes_transferred) + VALUES + (10, 1, 'pending', 0, ?, NULL, NULL), + (11, 2, 'completed', 1, ?, ?, 4096) + `, syncedAt.Add(-time.Hour), syncedAt.Add(-2*time.Hour), syncedAt); err != nil { + t.Fatalf("insert sync logs: %v", err) + } + + router := gin.New() + handler := NewSyncHandler(db, nil) + handler.RegisterRoutes(router.Group("/api/v1")) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/sync/episode-statuses?ids=2,999,1", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + + var got EpisodeSyncStatusListResponse + if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil { + t.Fatalf("decode response: %v", err) + } + if len(got.Items) != 2 { + t.Fatalf("items len = %d, want 2", len(got.Items)) + } + if got.Items[0].EpisodeID != 2 || got.Items[0].Status != "completed" { + t.Fatalf("first item = %+v, want episode 2 completed", got.Items[0]) + } + if !got.Items[0].CloudSynced || got.Items[0].CloudSyncedAt == nil { + t.Fatalf("first item cloud fields = synced %t at %v, want synced with timestamp", got.Items[0].CloudSynced, got.Items[0].CloudSyncedAt) + } + if got.Items[1].EpisodeID != 1 || got.Items[1].Status != "pending" { + t.Fatalf("second item = %+v, want episode 1 pending", got.Items[1]) + } + if len(got.Errors) != 1 || got.Errors[0].EpisodeID != 999 || got.Errors[0].Error != "episode not found" { + t.Fatalf("errors = %+v, want missing episode 999", got.Errors) + } +} + +func TestListSyncStatusesRejectsInvalidIDs(t *testing.T) { + gin.SetMode(gin.TestMode) + db := setupSyncHandlerTestDB(t) + router := gin.New() + handler := NewSyncHandler(db, nil) + handler.RegisterRoutes(router.Group("/api/v1")) + + tests := []string{ + "/api/v1/sync/episode-statuses", + "/api/v1/sync/episode-statuses?ids=", + "/api/v1/sync/episode-statuses?ids=1,,2", + "/api/v1/sync/episode-statuses?ids=0", + "/api/v1/sync/episode-statuses?ids=abc", + } + for _, path := range tests { + t.Run(path, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, path, nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + }) + } +} + func setupSyncHandlerTestDB(t *testing.T) *sqlx.DB { t.Helper() @@ -227,6 +309,9 @@ func setupSyncHandlerTestDB(t *testing.T) *sqlx.DB { `CREATE TABLE episodes ( id INTEGER PRIMARY KEY, episode_id TEXT, + cloud_synced BOOLEAN DEFAULT FALSE, + cloud_processed BOOLEAN DEFAULT FALSE, + cloud_synced_at TIMESTAMP NULL, deleted_at TIMESTAMP NULL )`, `CREATE TABLE sync_logs ( diff --git a/internal/cloud/uploader.go b/internal/cloud/uploader.go index 8d746da..0a518f7 100644 --- a/internal/cloud/uploader.go +++ b/internal/cloud/uploader.go @@ -37,6 +37,9 @@ type UploaderConfig struct { MaxRestartCount uint32 } +// UploadProgressFunc is called after bytes are uploaded successfully. +type UploadProgressFunc func(uploadedBytes int64, totalBytes int64) + // UploadRequest describes an episode upload from MinIO to cloud. type UploadRequest struct { // EpisodeID is the unique episode identifier used as client hint. @@ -49,6 +52,8 @@ type UploadRequest struct { RawTags map[string]string // ClientHints are passed to CreateLogicalUpload for server-side routing. ClientHints map[string]string + // Progress is called after each OSS multipart part is uploaded successfully. + Progress UploadProgressFunc } // UploadResult describes the outcome of a successful cloud upload. @@ -603,7 +608,7 @@ func (u *Uploader) uploadParts(ctx context.Context, req UploadRequest, session * // connection is not left idle during OSS part uploads. A single streaming // response would risk idle connection timeout (~20-25s on MinIO or network // intermediaries) when upload speed is slow. - session, parts, partMD5s, err := u.streamMultipartParts(ctx, req.EpisodeID, session, multipartUploadID, fileSize, fixedPartSizeBytes, u.minioRangeReader(req.McapKey)) + session, parts, partMD5s, err := u.streamMultipartParts(ctx, req.EpisodeID, session, multipartUploadID, fileSize, fixedPartSizeBytes, u.minioRangeReader(req.McapKey), req.Progress) if err != nil { u.abortMultipartUpload(session, multipartUploadID) return nil, "", nil, nil, err @@ -625,7 +630,7 @@ func (u *Uploader) uploadParts(ctx context.Context, req UploadRequest, session * return session, multipartUploadID, parts, partMD5s, nil } -func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, session *UploadSession, multipartUploadID string, fileSize int64, partSizeBytes int64, newPartStream partStreamFactory) (*UploadSession, []UploadedPart, [][16]byte, error) { +func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, session *UploadSession, multipartUploadID string, fileSize int64, partSizeBytes int64, newPartStream partStreamFactory, progressFns ...UploadProgressFunc) (*UploadSession, []UploadedPart, [][16]byte, error) { partSizeBytes = normalizedPartSizeBytes(partSizeBytes) session.PartSizeBytes = partSizeBytes partSize := int(partSizeBytes) @@ -633,6 +638,11 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s return session, nil, nil, fmt.Errorf("invalid part_size_bytes %d", partSizeBytes) } + var progress UploadProgressFunc + if len(progressFns) > 0 { + progress = progressFns[0] + } + buf := make([]byte, partSize) var parts []UploadedPart var partMD5s [][16]byte @@ -697,6 +707,9 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s offset += int64(n) partNumber++ + if progress != nil { + progress(offset, fileSize) + } logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d", episodeID, len(parts), offset, fileSize) diff --git a/internal/cloud/uploader_test.go b/internal/cloud/uploader_test.go index 99e3432..0296081 100644 --- a/internal/cloud/uploader_test.go +++ b/internal/cloud/uploader_test.go @@ -692,6 +692,101 @@ func TestStreamMultipartParts_UploadsExpectedPartBoundaries(t *testing.T) { } } +func TestStreamMultipartParts_ReportsProgressAfterSuccessfulParts(t *testing.T) { + oss := &fakeOSS{ + uploadPartFn: func(_ context.Context, _ *UploadSession, _ string, _ int, _ []byte) (string, error) { + return "etag", nil + }, + } + u := newDecideResumeUploader("", &fakeGateway{}, oss) + session := makeSession("logical-progress", "upload-progress") + session.PartSizeBytes = 4 + + payload := []byte("abcdefghijkl") + factory := func(_ context.Context, offset, length int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(payload[offset : offset+length])), nil + } + + var progress []struct { + uploaded int64 + total int64 + } + _, parts, _, err := u.streamMultipartParts( + context.Background(), + "episode-progress", + session, + "multipart-progress", + int64(len(payload)), + session.PartSizeBytes, + factory, + func(uploadedBytes int64, totalBytes int64) { + progress = append(progress, struct { + uploaded int64 + total int64 + }{uploaded: uploadedBytes, total: totalBytes}) + }, + ) + if err != nil { + t.Fatalf("streamMultipartParts() error = %v", err) + } + if len(parts) != 3 { + t.Fatalf("uploaded part count = %d, want 3", len(parts)) + } + want := []struct { + uploaded int64 + total int64 + }{{4, 12}, {8, 12}, {12, 12}} + if len(progress) != len(want) { + t.Fatalf("progress count = %d, want %d (%v)", len(progress), len(want), progress) + } + for i := range want { + if progress[i] != want[i] { + t.Fatalf("progress[%d] = %+v, want %+v", i, progress[i], want[i]) + } + } +} + +func TestStreamMultipartParts_DoesNotReportFailedPartProgress(t *testing.T) { + var uploadPartCalls int + oss := &fakeOSS{ + uploadPartFn: func(_ context.Context, _ *UploadSession, _ string, _ int, _ []byte) (string, error) { + uploadPartCalls++ + if uploadPartCalls == 2 { + return "", errors.New("upload failed") + } + return "etag", nil + }, + } + u := newDecideResumeUploader("", &fakeGateway{}, oss) + session := makeSession("logical-progress-fail", "upload-progress-fail") + session.PartSizeBytes = 4 + + payload := []byte("abcdefghijkl") + factory := func(_ context.Context, offset, length int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(payload[offset : offset+length])), nil + } + + var progress []int64 + _, _, _, err := u.streamMultipartParts( + context.Background(), + "episode-progress-fail", + session, + "multipart-progress-fail", + int64(len(payload)), + session.PartSizeBytes, + factory, + func(uploadedBytes int64, _ int64) { + progress = append(progress, uploadedBytes) + }, + ) + if err == nil { + t.Fatal("expected upload error, got nil") + } + if len(progress) != 1 || progress[0] != 4 { + t.Fatalf("progress = %v, want only first successful part", progress) + } +} + func TestStreamMultipartParts_EarlyEOFStopsInsteadOfUploadingEmptyParts(t *testing.T) { var uploadedPartNumbers []int oss := &fakeOSS{ diff --git a/internal/services/sync_worker.go b/internal/services/sync_worker.go index 373b25a..6eaa50c 100644 --- a/internal/services/sync_worker.go +++ b/internal/services/sync_worker.go @@ -56,6 +56,13 @@ type syncEpisodeUploadRow struct { OrganizationID sql.NullInt64 `db:"organization_id"` } +// SyncProgressSnapshot is the latest in-memory progress for an active episode sync. +type SyncProgressSnapshot struct { + UploadedBytes int64 + TotalBytes int64 + UpdatedAt time.Time +} + // SyncWorker is a background goroutine that processes queued cloud sync work // and optionally discovers approved episodes for automatic cloud upload. type SyncWorker struct { @@ -70,6 +77,9 @@ type SyncWorker struct { enqueuedEpisode map[int64]struct{} stopDone chan struct{} + progressMu sync.RWMutex + progressByEpisode map[int64]SyncProgressSnapshot + running atomic.Bool stopping atomic.Bool wg sync.WaitGroup @@ -105,15 +115,58 @@ var ( // NewSyncWorker creates a new sync worker. Call Start() to begin background processing. func NewSyncWorker(db *sqlx.DB, uploader *cloud.Uploader, minioClient *s3.Client, minioBucket string, cfg SyncWorkerConfig, syncCfg *config.SyncConfig) *SyncWorker { return &SyncWorker{ - db: db, - uploader: uploader, - minioClient: minioClient, - minioBucket: minioBucket, - cfg: cfg, - syncCfg: syncCfg, - enqueueCh: make(chan syncEnqueueRequest, 100), - enqueuedEpisode: make(map[int64]struct{}), + db: db, + uploader: uploader, + minioClient: minioClient, + minioBucket: minioBucket, + cfg: cfg, + syncCfg: syncCfg, + enqueueCh: make(chan syncEnqueueRequest, 100), + enqueuedEpisode: make(map[int64]struct{}), + progressByEpisode: make(map[int64]SyncProgressSnapshot), + } +} + +func (w *SyncWorker) setEpisodeProgress(episodeID int64, uploadedBytes int64, totalBytes int64) { + if w == nil { + return + } + if uploadedBytes < 0 { + uploadedBytes = 0 + } + if totalBytes < 0 { + totalBytes = 0 + } + w.progressMu.Lock() + defer w.progressMu.Unlock() + if w.progressByEpisode == nil { + w.progressByEpisode = make(map[int64]SyncProgressSnapshot) + } + w.progressByEpisode[episodeID] = SyncProgressSnapshot{ + UploadedBytes: uploadedBytes, + TotalBytes: totalBytes, + UpdatedAt: time.Now().UTC(), + } +} + +func (w *SyncWorker) finishEpisodeProgress(episodeID int64) { + if w == nil { + return + } + w.progressMu.Lock() + defer w.progressMu.Unlock() + delete(w.progressByEpisode, episodeID) +} + +// GetEpisodeProgress returns the current in-memory upload progress for an episode. +func (w *SyncWorker) GetEpisodeProgress(episodeID int64) (SyncProgressSnapshot, bool) { + if w == nil { + return SyncProgressSnapshot{}, false } + w.progressMu.RLock() + defer w.progressMu.RUnlock() + progress, ok := w.progressByEpisode[episodeID] + return progress, ok } // Start begins the background sync worker loop. @@ -891,12 +944,14 @@ func (w *SyncWorker) processEpisodeWithMode(ctx context.Context, episodeID int64 if err != nil { duration := int64(time.Since(startTime).Seconds()) w.markSyncFailed(ctx, syncLogID, episodeID, duration, err, attemptCount) + w.finishEpisodeProgress(episodeID) return } // Success: update episode and sync_log duration := int64(time.Since(startTime).Seconds()) w.markSyncCompleted(ctx, syncLogID, episodeID, result, duration) + w.finishEpisodeProgress(episodeID) } func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUploadRow) (*cloud.UploadResult, error) { @@ -951,6 +1006,9 @@ func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUplo McapKey: mcapKey, AssetID: assetID, RawTags: rawTags, + Progress: func(uploadedBytes int64, totalBytes int64) { + w.setEpisodeProgress(ep.ID, uploadedBytes, totalBytes) + }, }) } diff --git a/internal/services/sync_worker_test.go b/internal/services/sync_worker_test.go index d5d26d6..d4472a2 100644 --- a/internal/services/sync_worker_test.go +++ b/internal/services/sync_worker_test.go @@ -109,6 +109,40 @@ func TestEnqueueEpisode_AllowsReenqueueAfterProcessing(t *testing.T) { } } +func TestSyncWorkerEpisodeProgressSetGetAndFinish(t *testing.T) { + w := NewSyncWorker(nil, nil, nil, "", SyncWorkerConfig{}, nil) + + if _, ok := w.GetEpisodeProgress(42); ok { + t.Fatal("GetEpisodeProgress before set ok = true, want false") + } + + w.setEpisodeProgress(42, 12, 100) + progress, ok := w.GetEpisodeProgress(42) + if !ok { + t.Fatal("GetEpisodeProgress after set ok = false, want true") + } + if progress.UploadedBytes != 12 || progress.TotalBytes != 100 { + t.Fatalf("progress = %+v, want uploaded=12 total=100", progress) + } + if progress.UpdatedAt.IsZero() { + t.Fatal("progress UpdatedAt is zero") + } + + w.setEpisodeProgress(42, 150, 200) + progress, ok = w.GetEpisodeProgress(42) + if !ok { + t.Fatal("GetEpisodeProgress after overwrite ok = false, want true") + } + if progress.UploadedBytes != 150 || progress.TotalBytes != 200 { + t.Fatalf("overwritten progress = %+v, want uploaded=150 total=200", progress) + } + + w.finishEpisodeProgress(42) + if _, ok := w.GetEpisodeProgress(42); ok { + t.Fatal("GetEpisodeProgress after finish ok = true, want false") + } +} + func TestFindPendingEpisodes_ExcludesExhaustedFailuresFromPollingOnly(t *testing.T) { db := newTestSyncWorkerDB(t) w := &SyncWorker{db: db, cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}}