diff --git a/docs/designs/single-episode-cloud-sync-progress.zh.html b/docs/designs/single-episode-cloud-sync-progress.zh.html
new file mode 100644
index 0000000..236ffc9
--- /dev/null
+++ b/docs/designs/single-episode-cloud-sync-progress.zh.html
@@ -0,0 +1,868 @@
+
+
+
+
+
+
+
+
+ 1. 背景与范围
+
+ Synapse 数据明细页已经支持对单个 episode 触发云同步。当前交互只会在提交成功后刷新列表,用户能看到
+ pending、in_progress、completed、failed 等状态,
+ 但看不到正在上传时的百分比进度。
+
+
+
Synapse:DataDetails 单条同步
+
Keystone:SyncWorker 调度
+
MinIO:读取 episode MCAP
+
OSS:multipart upload
+
Gateway:CompleteUpload 确认
+
+
+
+
目标
+
在单个 episode 进入 in_progress 后,行内显示真实上传百分比和字节数。
+
+
+
数据来源
+
进度来自 OSS 分片上传成功后的 uploaded_bytes / total_bytes。
+
+
+
状态边界
+
不把云端后处理、索引或 cloud_processed 算入本进度。
+
+
+
+
+
+ 2. 非目标
+
+ - 不新增数据库字段,不写入中间进度到 sync_logs。
+ - 不实现 SSE 或 WebSocket 推送。
+ - 不展示 bulk sync 整体进度。
+ - 不在同步历史抽屉中显示运行时百分比。
+ - 不修复服务重启后遗留的 stale in_progress 状态。
+ - 不改变录制完成后的 Axon Transfer 上传链路。
+
+
+
+
+ 3. 进度语义
+ 进度只在 status = in_progress 时展示。其他状态只显示状态和现有提示。
+
+
+
+ | 状态 | UI 展示 | 进度语义 |
+ | --- | --- | --- |
+ | not_started | 未同步 | 无进度 |
+ | pending | 已入队 / 排队中 | 无进度 |
+ | in_progress 且有内存进度 | 百分比、已上传/总大小、小进度条 | OSS 分片上传成功进度 |
+ | in_progress 但无内存进度 | 同步中 | 进度未知 |
+ | completed | 已同步 | 不显示百分比,完成态代表 100% |
+ | failed | 失败 + 错误摘要 | 不显示失败前百分比 |
+
+
+
+
+
+
百分比规则
+
+ - 后端计算 percent,前端不重复计算。
+ - percent = floor(uploaded_bytes * 100 / total_bytes)。
+ - in_progress 状态下 percent 最大为 99。
+ - 只有 completed 才代表完整 100%。
+ - total_bytes <= 0 时不返回 progress。
+
+
+
+
行内示例
+
+
同步中
+
37% · 120.0MB / 320.0MB
+
+
+
+
+
+
+
+ 4. 后端设计
+ 4.1 Uploader Progress Callback
+
+ internal/cloud 保持为通用上传库,不持有 episode 数字 ID,也不依赖 Keystone API 或数据库。
+ cloud.UploadRequest 增加可选 progress callback。
+
+ type UploadProgressFunc func(uploadedBytes int64, totalBytes int64)
+
+type UploadRequest struct {
+ EpisodeID string
+ McapKey string
+ AssetID string
+ RawTags map[string]string
+ ClientHints map[string]string
+
+ Progress UploadProgressFunc
+}
+
+ Uploader 在每个 OSS multipart part 上传成功后调用回调。回调不能在仅完成 MinIO range 读取、尚未上传到 OSS 时触发,
+ 避免把本地读取误认为云端同步成功。
+
+ offset += int64(n)
+if req.Progress != nil {
+ req.Progress(offset, fileSize)
+}
+
+ 4.2 SyncWorker 内存进度
+ SyncWorker 维护进程内 progress map,key 使用数据库 numeric episodes.id。
+ type SyncProgressSnapshot struct {
+ UploadedBytes int64
+ TotalBytes int64
+ UpdatedAt time.Time
+}
+
+progressMu sync.RWMutex
+progressByEpisode map[int64]SyncProgressSnapshot
+
+ - setEpisodeProgress 写入或覆盖快照,UpdatedAt = now。
+ - markSyncCompleted 和 markSyncFailed 后调用 finishEpisodeProgress。
+ - finishEpisodeProgress 直接删除该 episode 的 progress 快照。
+ - API 只在 DB 状态仍是 in_progress 时尝试合并内存 progress。
+
+
+ 4.3 SyncWorker 调用 Uploader
+ return uploader.Upload(ctx, cloud.UploadRequest{
+ EpisodeID: ep.EpisodeUUID,
+ McapKey: mcapKey,
+ AssetID: assetID,
+ RawTags: rawTags,
+ Progress: func(uploadedBytes int64, totalBytes int64) {
+ w.setEpisodeProgress(ep.ID, uploadedBytes, totalBytes)
+ },
+})
+
+ 如果 Keystone 重启,内存 progress 会丢失。数据库可能仍有 in_progress,API 此时返回状态但没有
+ progress,前端降级显示“同步中”。
+
+
+
+
+ 5. API 设计
+ 5.1 单条状态接口扩展
+ GET /api/v1/sync/episodes/:id/status
+ 扩展返回 cloud_synced、cloud_synced_at、cloud_processed 和可选 progress。
+ {
+ "id": 42,
+ "episode_id": 123,
+ "episode_public_id": "EP-20260615-001",
+ "status": "in_progress",
+ "attempt_count": 1,
+ "started_at": "2026-06-15T10:00:00Z",
+ "completed_at": null,
+ "error_message": null,
+ "cloud_synced": false,
+ "cloud_synced_at": null,
+ "cloud_processed": false,
+ "progress": {
+ "uploaded_bytes": 125829120,
+ "total_bytes": 335544320,
+ "percent": 37,
+ "updated_at": "2026-06-15T10:00:12Z"
+ }
+}
+
+ - id 保持现有含义:latest sync_logs.id。
+ - 保留现有 episode_id 语义:它是 numeric episode ID。
+ - episode_public_id 表示业务 episode 字符串 ID。
+ - progress 只在 status = in_progress 且内存快照有效时返回。
+
+
+ 5.2 新增批量状态接口
+ GET /api/v1/sync/episode-statuses?ids=1,2,3
+
+ 该路径不放在 /sync/episodes/status,是为了避开现有
+ /sync/episodes/:id/status 和 /sync/episodes/:id/logs 的动态路由冲突。
+
+
+
+
+ | 规则 |
+ 行为 |
+
+
+
+
+ ids 为空 |
+ 返回 400 |
+
+
+ | 非正整数或格式错误 |
+ 返回 400 |
+
+
+ | 超过 50 个 ID |
+ 返回 400 |
+
+
+ | 部分 ID 不存在或已删除 |
+ 返回 200,缺失项写入 errors |
+
+
+ syncWorker == nil 或 worker 未运行 |
+ 返回 DB 状态,不返回运行时 progress |
+
+
+
+ {
+ "items": [
+ {
+ "id": 42,
+ "episode_id": 1,
+ "episode_public_id": "EP-20260615-001",
+ "status": "in_progress",
+ "attempt_count": 1,
+ "cloud_synced": false,
+ "cloud_synced_at": null,
+ "cloud_processed": false,
+ "progress": {
+ "uploaded_bytes": 125829120,
+ "total_bytes": 335544320,
+ "percent": 37,
+ "updated_at": "2026-06-15T10:00:12Z"
+ }
+ }
+ ],
+ "errors": [
+ {
+ "episode_id": 999,
+ "error": "episode not found"
+ }
+ ]
+}
+
+ 批量接口应使用一次 SQL 查询当前请求 ID 的 episode 云字段和 latest sync log,再按请求 ID 顺序组装响应并合并内存 progress。
+
+
+
+
+ 6. 前端设计
+ 6.1 API Client
+ synapse/src/api/sync.js 增加批量状态方法。
+ getStatuses: (ids = []) => api.get('/sync/episode-statuses', { ids: ids.join(',') })
+
+ 6.2 当前页轮询
+ 使用一个全局 interval,不使用每行 timer。只追踪当前页可见的 pending 和 in_progress 行。
+ const activeSyncEpisodeIds = ref(new Set())
+const syncProgressFailures = ref(new Map())
+let syncProgressTimer = null
+
+
+
启动点
+
同步提交成功、loadEpisodes() 成功、页面进入时发现活跃行。
+
+
+
停止点
+
active set 为空、页面卸载、换页或筛选条件变化时停止整个轮询;某 ID 连续 3 次失败时仅停止该 ID 的轮询。
+
+
+
轮询频率
+
每 1000ms 批量请求一次当前页活跃行状态。
+
+
+
+ 同一轮询器不应并发发起多个状态请求。若实现允许请求重叠,必须使用 request sequence 忽略旧响应,避免旧
+ in_progress 覆盖新终态或导致百分比回退。
+
+
+ 6.3 行内 Patch
+ 返回后只 patch 当前行,不整页刷新。
+ item.sync_status = status.status
+item.latest_sync_log = status
+item.sync_progress = status.progress || null
+item.cloud_synced = status.cloud_synced
+item.cloud_synced_at = status.cloud_synced_at
+item.cloud_processed = status.cloud_processed
+
+ 6.4 失败处理
+
+ - 单次轮询失败只记录 console.warn。
+ - 连续 3 次失败后停止该 ID 的轮询。
+ - 不把 episode 标为同步失败,真正失败只能以后端 sync_logs.status = failed 为准。
+ - 行内 hint 可显示“进度刷新失败,请手动刷新”。
+
+
+
+
+ 7. 关键风险与接受的取舍
+
+
+
+ | 取舍 |
+ 影响 |
+ 接受原因 |
+
+
+
+
+ | 进度只放 Keystone 内存 |
+ 服务重启后进度丢失,只能显示“同步中”。 |
+ 避免 DB 热写和 schema 变更;状态终态仍由 sync_logs 持久化。 |
+
+
+ | 假设单 Keystone 进程 |
+ 多副本部署下,请求打到非 worker 实例会拿不到 progress。 |
+ 当前方案面向单实例边缘部署;多实例需要 sticky routing 或共享 progress store。 |
+
+
+ | 按 OSS part 成功更新 |
+ 单分片小文件可能从“同步中”直接变成“已同步”,看不到中间百分比。 |
+ 进度语义更准确,不把 MinIO 读取误报为云端上传。 |
+
+
+ | 终态后立即删除 progress |
+ 失败后不保留失败前百分比。 |
+ UI 只在 in_progress 展示百分比,终态以 DB 状态和错误为准。 |
+
+
+ | 终态 DB 写入失败不在本方案修复 |
+ 可能遗留 in_progress 且无 progress。 |
+ 属于 stale sync 状态恢复问题,后续单独处理;本方案只做运行时展示。 |
+
+
+
+
+
+
+ 8. 数据模型
+ 不新增 migration。继续使用现有持久化字段保存最终状态和历史。
+
+ sync_logs.status
+ sync_logs.bytes_transferred
+ sync_logs.duration_sec
+ sync_logs.error_message
+ sync_logs.started_at
+ sync_logs.completed_at
+ episodes.cloud_synced
+ episodes.cloud_synced_at
+ episodes.cloud_processed
+
+
+ 完成时 markSyncCompleted 仍把最终 bytes_transferred = result.FileSize 写入
+ sync_logs。中间 uploaded_bytes 不写 DB。
+
+
+
+
+ 9. 错误和降级
+
+
+
+ | 场景 |
+ 后端行为 |
+ 前端行为 |
+
+
+
+
+ | worker 未运行 |
+ 查询接口返回 DB 状态,无 progress |
+ 显示状态,无百分比 |
+
+
+ | Keystone 重启 |
+ 内存 progress 丢失 |
+ in_progress 显示“同步中” |
+
+
+ | 上传失败 |
+ sync_logs.status = failed |
+ 显示失败和错误,不显示失败前百分比 |
+
+
+ | 批量状态部分 ID 不存在 |
+ 200 + errors |
+ 忽略不存在行或记录 warning |
+
+
+ | 轮询连续失败 |
+ 不影响同步任务 |
+ 停止该 ID 轮询,提示手动刷新 |
+
+
+
+
+
+
+ 10. 测试计划
+
+
+
后端
+
+ internal/cloud:多个 part 成功时 callback 收到正确进度。
+ internal/cloud:part 上传失败时不推进失败 part 的进度。
+ internal/services:progress map set/get、完成或失败后立即删除。
+ internal/api/handlers:单条和批量 status 合并 progress。
+ internal/api/handlers:missing ID、非法 ids、超过 50 个 ids。
+
+
cd keystone
+go test ./internal/cloud ./internal/services ./internal/api/handlers
+
+
+
前端
+
+ syncApi 增加批量 status 方法。
+ DataDetails.vue 当前页 active rows 轮询。
+ - 行内百分比、字节数和小进度条展示。
+ - 终态只 patch 行,不整页刷新。
+
+
cd synapse
+npm run build
+
+
+
+
+
+ 11. 实施顺序
+
+ - 后端
cloud.UploadRequest 增加 progress callback,并在 OSS part 成功后触发。
+ - 后端
SyncWorker 增加内存 progress map 和生命周期方法。
+ - 后端扩展单条 status response,新增批量 status endpoint。
+ - 后端补单元测试。
+ - 前端
syncApi 增加批量状态方法。
+ - 前端
DataDetails.vue 增加当前页 active rows 轮询和行内 progress patch。
+ - 前端增加百分比、字节数和小进度条展示。
+ - 运行后端相关测试和前端 build。
+
+
+
+
+
+
+
diff --git a/internal/api/handlers/sync.go b/internal/api/handlers/sync.go
index 12f9955..7aa8a83 100644
--- a/internal/api/handlers/sync.go
+++ b/internal/api/handlers/sync.go
@@ -5,12 +5,14 @@
package handlers
import (
+ "context"
"database/sql"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
+ "time"
"archebase.com/keystone-edge/internal/logger"
"archebase.com/keystone-edge/internal/services"
@@ -36,6 +38,7 @@ func (h *SyncHandler) RegisterRoutes(apiV1 *gin.RouterGroup) {
apiV1.POST("/sync/episodes/:id", h.TriggerEpisodeSync)
apiV1.GET("/sync/episodes", h.ListSyncJobs)
apiV1.GET("/sync/episodes/summary", h.ListEpisodeSyncSummaries)
+ apiV1.GET("/sync/episode-statuses", h.ListSyncStatuses)
apiV1.GET("/sync/episodes/:id/logs", h.ListEpisodeSyncLogs)
apiV1.GET("/sync/episodes/:id/status", h.GetSyncStatus)
apiV1.GET("/sync/config", h.GetSyncConfig)
@@ -176,6 +179,26 @@ type syncEpisodeSummaryRow struct {
CompletedAt sql.NullTime `db:"completed_at"`
}
+// syncStatusDBRow is the scan target for one row of the query built in
+// loadEpisodeSyncStatuses: an episode joined with its latest sync_logs row.
+// The sync-log columns use sql.Null* types because that query LEFT JOINs
+// sync_logs, so an episode without any sync log still produces a row.
+type syncStatusDBRow struct {
+ EpisodeID int64 `db:"episode_id"`
+ EpisodePublicID sql.NullString `db:"episode_public_id"`
+ CloudSynced bool `db:"cloud_synced"`
+ CloudProcessed bool `db:"cloud_processed"`
+ CloudSyncedAt sql.NullTime `db:"cloud_synced_at"`
+ // SyncLogID is NULL when the episode has no sync_logs row.
+ SyncLogID sql.NullInt64 `db:"sync_log_id"`
+ SourceFactoryID sql.NullString `db:"source_factory_id"`
+ SourcePath sql.NullString `db:"source_path"`
+ DestinationPath sql.NullString `db:"destination_path"`
+ Status sql.NullString `db:"status"`
+ BytesTransferred sql.NullInt64 `db:"bytes_transferred"`
+ DurationSec sql.NullInt64 `db:"duration_sec"`
+ ErrorMessage sql.NullString `db:"error_message"`
+ AttemptCount sql.NullInt64 `db:"attempt_count"`
+ NextRetryAt sql.NullTime `db:"next_retry_at"`
+ StartedAt sql.NullTime `db:"started_at"`
+ CompletedAt sql.NullTime `db:"completed_at"`
+}
+
// SyncJobResponse represents a sync job in the API response.
type SyncJobResponse struct {
ID int64 `json:"id"`
@@ -193,6 +216,35 @@ type SyncJobResponse struct {
CompletedAt *string `json:"completed_at,omitempty"`
}
+// SyncProgressResponse represents runtime-only upload progress for an in-progress sync.
+// It is populated by attachSyncProgress from the sync worker's in-memory snapshot.
+type SyncProgressResponse struct {
+ // UploadedBytes is the uploaded byte count, clamped to the range [0, TotalBytes].
+ UploadedBytes int64 `json:"uploaded_bytes"`
+ // TotalBytes is the total upload size; progress is only attached when it is positive.
+ TotalBytes int64 `json:"total_bytes"`
+ // Percent is the truncated whole-number percentage, capped at 99.
+ Percent int `json:"percent"`
+ // UpdatedAt is the snapshot time in UTC, formatted as RFC 3339.
+ UpdatedAt string `json:"updated_at"`
+}
+
+// EpisodeSyncStatusResponse represents one episode's latest sync status plus runtime progress.
+// It embeds SyncJobResponse, so the sync-job fields are serialized at the top level.
+type EpisodeSyncStatusResponse struct {
+ SyncJobResponse
+ CloudSynced bool `json:"cloud_synced"`
+ // CloudSyncedAt is omitted from the JSON when nil.
+ CloudSyncedAt *string `json:"cloud_synced_at,omitempty"`
+ CloudProcessed bool `json:"cloud_processed"`
+ // Progress is omitted from the JSON when nil.
+ Progress *SyncProgressResponse `json:"progress,omitempty"`
+}
+
+// EpisodeSyncStatusError describes one missing or invalid episode in a batch status request.
+type EpisodeSyncStatusError struct {
+ // EpisodeID is the requested numeric episode ID that could not be resolved.
+ EpisodeID int64 `json:"episode_id"`
+ // Error is a short human-readable reason, e.g. "episode not found".
+ Error string `json:"error"`
+}
+
+// EpisodeSyncStatusListResponse represents batched episode sync status results.
+type EpisodeSyncStatusListResponse struct {
+ // Items holds the statuses that were found.
+ Items []EpisodeSyncStatusResponse `json:"items"`
+ // Errors holds per-ID failures; the key is omitted from the JSON when the slice is empty.
+ Errors []EpisodeSyncStatusError `json:"errors,omitempty"`
+}
+
// SyncEpisodeSummaryResponse represents an episode-centered sync summary.
type SyncEpisodeSummaryResponse struct {
ID int64 `json:"id"`
@@ -629,6 +681,44 @@ func (h *SyncHandler) ListEpisodeSyncLogs(c *gin.Context) {
})
}
+// ListSyncStatuses returns sync statuses for multiple episodes.
+//
+// Found episodes are emitted in the order their IDs were requested; requested
+// IDs with no matching status are reported in the errors list instead.
+//
+// @Summary List episode sync statuses
+// @Description Returns latest sync status for multiple episode numeric IDs, including runtime progress when available
+// @Tags sync
+// @Produce json
+// @Param ids query string true "Comma-separated episode numeric IDs (max 50)"
+// @Success 200 {object} EpisodeSyncStatusListResponse
+// @Failure 400 {object} map[string]string
+// @Router /sync/episode-statuses [get]
+func (h *SyncHandler) ListSyncStatuses(c *gin.Context) {
+	episodeIDs, parseErr := parseSyncStatusIDs(c.Query("ids"))
+	if parseErr != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": parseErr.Error()})
+		return
+	}
+
+	byEpisode, loadErr := h.loadEpisodeSyncStatuses(c.Request.Context(), episodeIDs)
+	if loadErr != nil {
+		logger.Printf("[SYNC] Failed to query batch sync statuses: %v", loadErr)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync statuses"})
+		return
+	}
+
+	// Both slices are allocated non-nil so an empty items list encodes as [].
+	payload := EpisodeSyncStatusListResponse{
+		Items:  make([]EpisodeSyncStatusResponse, 0, len(episodeIDs)),
+		Errors: make([]EpisodeSyncStatusError, 0),
+	}
+	for _, episodeID := range episodeIDs {
+		if found, present := byEpisode[episodeID]; present {
+			payload.Items = append(payload.Items, found)
+			continue
+		}
+		payload.Errors = append(payload.Errors, EpisodeSyncStatusError{EpisodeID: episodeID, Error: "episode not found"})
+	}
+
+	c.JSON(http.StatusOK, payload)
+}
+
// GetSyncStatus returns the sync status for a specific episode.
//
// @Summary Get episode sync status
@@ -636,7 +726,7 @@ func (h *SyncHandler) ListEpisodeSyncLogs(c *gin.Context) {
// @Tags sync
// @Produce json
// @Param id path int true "Episode ID"
-// @Success 200 {object} SyncJobResponse
+// @Success 200 {object} EpisodeSyncStatusResponse
// @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string "Episode not found"
// @Router /sync/episodes/{id}/status [get]
@@ -648,31 +738,71 @@ func (h *SyncHandler) GetSyncStatus(c *gin.Context) {
return
}
- var episode struct {
- ID int64 `db:"id"`
- PublicID sql.NullString `db:"episode_id"`
- }
- err = h.db.Get(&episode, `
- SELECT id, episode_id
- FROM episodes
- WHERE id = ? AND deleted_at IS NULL
- `, episodeID)
- if err == sql.ErrNoRows {
- c.JSON(http.StatusNotFound, gin.H{"error": "episode not found"})
- return
- }
+ statuses, err := h.loadEpisodeSyncStatuses(c.Request.Context(), []int64{episodeID})
if err != nil {
logger.Printf("[SYNC] Failed to query episode %d for sync status: %v", episodeID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync status"})
return
}
+ status, ok := statuses[episodeID]
+ if !ok {
+ c.JSON(http.StatusNotFound, gin.H{"error": "episode not found"})
+ return
+ }
+
+ c.JSON(http.StatusOK, status)
+}
+
+// maxBatchSyncStatusIDs is the largest number of distinct episode IDs accepted
+// by one batch status request.
+const maxBatchSyncStatusIDs = 50
+
+// parseSyncStatusIDs parses the comma-separated "ids" query value into a list
+// of distinct positive episode IDs, preserving first-occurrence order.
+//
+// It returns an error when raw is blank, when any element is empty, not a
+// base-10 integer, or not positive, and when more than maxBatchSyncStatusIDs
+// distinct IDs remain after de-duplication. Whitespace around raw and around
+// each element is ignored.
+func parseSyncStatusIDs(raw string) ([]int64, error) {
+	raw = strings.TrimSpace(raw)
+	if raw == "" {
+		return nil, errors.New("ids is required")
+	}
+
+	parts := strings.Split(raw, ",")
+	ids := make([]int64, 0, len(parts))
+	seen := make(map[int64]struct{}, len(parts))
+	for _, part := range parts {
+		// ParseInt rejects the empty string, so "1,,2" fails here without a
+		// separate emptiness check.
+		id, err := strconv.ParseInt(strings.TrimSpace(part), 10, 64)
+		if err != nil || id <= 0 {
+			return nil, errors.New("ids must be comma-separated positive integers")
+		}
+		if _, dup := seen[id]; dup {
+			continue
+		}
+		seen[id] = struct{}{}
+		ids = append(ids, id)
+	}
+
+	// A non-blank raw always yields at least one part, and the first valid part
+	// is always appended, so ids is never empty at this point.
+	if len(ids) > maxBatchSyncStatusIDs {
+		return nil, fmt.Errorf("ids must contain at most %d values", maxBatchSyncStatusIDs)
+	}
+	return ids, nil
+}
+
+func (h *SyncHandler) loadEpisodeSyncStatuses(ctx context.Context, episodeIDs []int64) (map[int64]EpisodeSyncStatusResponse, error) {
+ out := make(map[int64]EpisodeSyncStatusResponse, len(episodeIDs))
+ if len(episodeIDs) == 0 {
+ return out, nil
+ }
- var row syncLogRow
- err = h.db.Get(&row, `
+ placeholders, args := int64Placeholders(episodeIDs)
+ queryArgs := append(append([]interface{}{}, args...), args...)
+ query := `
SELECT
- sl.id,
- sl.episode_id,
+ e.id AS episode_id,
e.episode_id AS episode_public_id,
+ COALESCE(e.cloud_synced, FALSE) AS cloud_synced,
+ COALESCE(e.cloud_processed, FALSE) AS cloud_processed,
+ e.cloud_synced_at,
+ sl.id AS sync_log_id,
sl.source_factory_id,
sl.source_path,
sl.destination_path,
@@ -684,29 +814,100 @@ func (h *SyncHandler) GetSyncStatus(c *gin.Context) {
sl.next_retry_at,
sl.started_at,
sl.completed_at
- FROM sync_logs sl
- LEFT JOIN episodes e ON e.id = sl.episode_id AND e.deleted_at IS NULL
- WHERE sl.episode_id = ?
- ORDER BY sl.id DESC
- LIMIT 1
- `, episodeID)
- if err == sql.ErrNoRows {
- c.JSON(http.StatusOK, SyncJobResponse{
+ FROM episodes e
+ LEFT JOIN (
+ SELECT sl_latest.*
+ FROM sync_logs sl_latest
+ INNER JOIN (
+ SELECT episode_id, MAX(id) AS latest_id
+ FROM sync_logs
+ WHERE episode_id IN (` + placeholders + `)
+ GROUP BY episode_id
+ ) latest ON latest.episode_id = sl_latest.episode_id AND latest.latest_id = sl_latest.id
+ ) sl ON sl.episode_id = e.id
+ WHERE e.id IN (` + placeholders + `)
+ AND e.deleted_at IS NULL
+ `
+
+ var rows []syncStatusDBRow
+ // #nosec G201 -- placeholders are generated for integer IDs and values are parameterized.
+ if err := h.db.SelectContext(ctx, &rows, query, queryArgs...); err != nil {
+ return nil, err
+ }
+ for _, row := range rows {
+ resp := syncStatusResponseFromRow(row)
+ h.attachSyncProgress(&resp)
+ out[row.EpisodeID] = resp
+ }
+ return out, nil
+}
+
+// syncStatusResponseFromRow converts one joined episode/sync-log row into an
+// API response. The result starts as a "not_started" placeholder (ID 0,
+// attempt count 0) carrying the episode's cloud fields; when the row has a
+// sync log, the embedded SyncJobResponse is replaced with one built from it.
+func syncStatusResponseFromRow(row syncStatusDBRow) EpisodeSyncStatusResponse {
+ resp := EpisodeSyncStatusResponse{
+ SyncJobResponse: SyncJobResponse{
ID: 0,
- EpisodeID: episode.ID,
- EpisodePublicID: nullableString(episode.PublicID),
+ EpisodeID: row.EpisodeID,
+ EpisodePublicID: nullableString(row.EpisodePublicID),
Status: "not_started",
AttemptCount: 0,
+ },
+ CloudSynced: row.CloudSynced,
+ CloudSyncedAt: nullableTime(row.CloudSyncedAt),
+ CloudProcessed: row.CloudProcessed,
+ }
+ // A valid SyncLogID means the LEFT JOIN matched a sync_logs row.
+ if row.SyncLogID.Valid {
+ // A NULL attempt_count is reported as 0.
+ attemptCount := 0
+ if row.AttemptCount.Valid {
+ attemptCount = int(row.AttemptCount.Int64)
+ }
+ resp.SyncJobResponse = syncJobResponseFromRow(syncLogRow{
+ ID: row.SyncLogID.Int64,
+ EpisodeID: row.EpisodeID,
+ EpisodePublicID: row.EpisodePublicID,
+ SourceFactoryID: row.SourceFactoryID,
+ SourcePath: row.SourcePath,
+ DestinationPath: row.DestinationPath,
+ Status: row.Status.String,
+ BytesTransferred: row.BytesTransferred,
+ DurationSec: row.DurationSec,
+ ErrorMessage: row.ErrorMessage,
+ AttemptCount: attemptCount,
+ NextRetryAt: row.NextRetryAt,
+ StartedAt: row.StartedAt,
+ CompletedAt: row.CompletedAt,
})
+ }
+ return resp
+}
+
+// attachSyncProgress sets resp.Progress from the sync worker's in-memory
+// snapshot. It does nothing when the handler, worker, or resp is nil, when
+// resp.Status is not "in_progress", when the worker has no snapshot for the
+// episode, or when the snapshot's TotalBytes is not positive.
+func (h *SyncHandler) attachSyncProgress(resp *EpisodeSyncStatusResponse) {
+ if h == nil || h.syncWorker == nil || resp == nil || resp.Status != "in_progress" {
return
}
- if err != nil {
- logger.Printf("[SYNC] Failed to query sync status for episode %d: %v", episodeID, err)
- c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get sync status"})
+ progress, ok := h.syncWorker.GetEpisodeProgress(resp.EpisodeID)
+ if !ok || progress.TotalBytes <= 0 {
return
}
-
- c.JSON(http.StatusOK, syncJobResponseFromRow(row))
+ // Clamp the uploaded byte count into [0, TotalBytes] before reporting it.
+ uploaded := progress.UploadedBytes
+ if uploaded < 0 {
+ uploaded = 0
+ }
+ if uploaded > progress.TotalBytes {
+ uploaded = progress.TotalBytes
+ }
+ // The int conversion truncates the fractional part of the percentage.
+ percent := int(float64(uploaded) * 100 / float64(progress.TotalBytes))
+ if percent < 0 {
+ percent = 0
+ }
+ // Cap at 99 so an in-progress sync never reports 100.
+ if percent > 99 {
+ percent = 99
+ }
+ resp.Progress = &SyncProgressResponse{
+ UploadedBytes: uploaded,
+ TotalBytes: progress.TotalBytes,
+ Percent: percent,
+ UpdatedAt: progress.UpdatedAt.UTC().Format(time.RFC3339),
+ }
}
// GetSyncConfig returns the current sync configuration (sanitized).
diff --git a/internal/api/handlers/sync_test.go b/internal/api/handlers/sync_test.go
index 477b436..57d4d80 100644
--- a/internal/api/handlers/sync_test.go
+++ b/internal/api/handlers/sync_test.go
@@ -214,6 +214,88 @@ func TestGetSyncStatusReturnsNotFoundWhenEpisodeDoesNotExist(t *testing.T) {
}
}
+// TestListSyncStatusesReturnsItemsInRequestOrderAndErrors seeds two episodes
+// with one sync log each, requests ids=2,999,1, and checks that the found
+// items come back in request order (2 then 1) while the unknown ID 999 is
+// reported in the errors list.
+func TestListSyncStatusesReturnsItemsInRequestOrderAndErrors(t *testing.T) {
+ gin.SetMode(gin.TestMode)
+ db := setupSyncHandlerTestDB(t)
+
+ syncedAt := time.Date(2026, 5, 9, 11, 0, 0, 0, time.UTC)
+ if _, err := db.Exec(`
+ INSERT INTO episodes (id, episode_id, cloud_synced, cloud_processed, cloud_synced_at, deleted_at)
+ VALUES
+ (1, 'episode-a', FALSE, FALSE, NULL, NULL),
+ (2, 'episode-b', TRUE, FALSE, ?, NULL)
+ `, syncedAt); err != nil {
+ t.Fatalf("insert episodes: %v", err)
+ }
+ if _, err := db.Exec(`
+ INSERT INTO sync_logs (id, episode_id, status, attempt_count, started_at, completed_at, bytes_transferred)
+ VALUES
+ (10, 1, 'pending', 0, ?, NULL, NULL),
+ (11, 2, 'completed', 1, ?, ?, 4096)
+ `, syncedAt.Add(-time.Hour), syncedAt.Add(-2*time.Hour), syncedAt); err != nil {
+ t.Fatalf("insert sync logs: %v", err)
+ }
+
+ // The handler is built with a nil sync worker.
+ router := gin.New()
+ handler := NewSyncHandler(db, nil)
+ handler.RegisterRoutes(router.Group("/api/v1"))
+
+ req := httptest.NewRequest(http.MethodGet, "/api/v1/sync/episode-statuses?ids=2,999,1", nil)
+ rec := httptest.NewRecorder()
+ router.ServeHTTP(rec, req)
+
+ if rec.Code != http.StatusOK {
+ t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String())
+ }
+
+ var got EpisodeSyncStatusListResponse
+ if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil {
+ t.Fatalf("decode response: %v", err)
+ }
+ if len(got.Items) != 2 {
+ t.Fatalf("items len = %d, want 2", len(got.Items))
+ }
+ if got.Items[0].EpisodeID != 2 || got.Items[0].Status != "completed" {
+ t.Fatalf("first item = %+v, want episode 2 completed", got.Items[0])
+ }
+ if !got.Items[0].CloudSynced || got.Items[0].CloudSyncedAt == nil {
+ t.Fatalf("first item cloud fields = synced %t at %v, want synced with timestamp", got.Items[0].CloudSynced, got.Items[0].CloudSyncedAt)
+ }
+ if got.Items[1].EpisodeID != 1 || got.Items[1].Status != "pending" {
+ t.Fatalf("second item = %+v, want episode 1 pending", got.Items[1])
+ }
+ if len(got.Errors) != 1 || got.Errors[0].EpisodeID != 999 || got.Errors[0].Error != "episode not found" {
+ t.Fatalf("errors = %+v, want missing episode 999", got.Errors)
+ }
+}
+
+// TestListSyncStatusesRejectsInvalidIDs checks that the batch status endpoint
+// answers 400 for a missing or empty ids parameter, an empty list element,
+// non-positive or non-numeric IDs, and more than 50 distinct IDs.
+func TestListSyncStatusesRejectsInvalidIDs(t *testing.T) {
+	gin.SetMode(gin.TestMode)
+	db := setupSyncHandlerTestDB(t)
+	router := gin.New()
+	handler := NewSyncHandler(db, nil)
+	handler.RegisterRoutes(router.Group("/api/v1"))
+
+	tests := []string{
+		"/api/v1/sync/episode-statuses",
+		"/api/v1/sync/episode-statuses?ids=",
+		"/api/v1/sync/episode-statuses?ids=1,,2",
+		"/api/v1/sync/episode-statuses?ids=0",
+		"/api/v1/sync/episode-statuses?ids=-1",
+		"/api/v1/sync/episode-statuses?ids=abc",
+		// 51 distinct IDs: one more than the batch limit of 50.
+		"/api/v1/sync/episode-statuses?ids=1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51",
+	}
+	for _, path := range tests {
+		t.Run(path, func(t *testing.T) {
+			req := httptest.NewRequest(http.MethodGet, path, nil)
+			rec := httptest.NewRecorder()
+			router.ServeHTTP(rec, req)
+
+			if rec.Code != http.StatusBadRequest {
+				t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String())
+			}
+		})
+	}
+}
+
func setupSyncHandlerTestDB(t *testing.T) *sqlx.DB {
t.Helper()
@@ -227,6 +309,9 @@ func setupSyncHandlerTestDB(t *testing.T) *sqlx.DB {
`CREATE TABLE episodes (
id INTEGER PRIMARY KEY,
episode_id TEXT,
+ cloud_synced BOOLEAN DEFAULT FALSE,
+ cloud_processed BOOLEAN DEFAULT FALSE,
+ cloud_synced_at TIMESTAMP NULL,
deleted_at TIMESTAMP NULL
)`,
`CREATE TABLE sync_logs (
diff --git a/internal/cloud/uploader.go b/internal/cloud/uploader.go
index 8d746da..0a518f7 100644
--- a/internal/cloud/uploader.go
+++ b/internal/cloud/uploader.go
@@ -37,6 +37,9 @@ type UploaderConfig struct {
MaxRestartCount uint32
}
+// UploadProgressFunc is called after bytes are uploaded successfully.
+// streamMultipartParts invokes it with the cumulative byte offset reached so
+// far as uploadedBytes and the full file size as totalBytes.
+type UploadProgressFunc func(uploadedBytes int64, totalBytes int64)
+
// UploadRequest describes an episode upload from MinIO to cloud.
type UploadRequest struct {
// EpisodeID is the unique episode identifier used as client hint.
@@ -49,6 +52,8 @@ type UploadRequest struct {
RawTags map[string]string
// ClientHints are passed to CreateLogicalUpload for server-side routing.
ClientHints map[string]string
+ // Progress is called after each OSS multipart part is uploaded successfully.
+ Progress UploadProgressFunc
}
// UploadResult describes the outcome of a successful cloud upload.
@@ -603,7 +608,7 @@ func (u *Uploader) uploadParts(ctx context.Context, req UploadRequest, session *
// connection is not left idle during OSS part uploads. A single streaming
// response would risk idle connection timeout (~20-25s on MinIO or network
// intermediaries) when upload speed is slow.
- session, parts, partMD5s, err := u.streamMultipartParts(ctx, req.EpisodeID, session, multipartUploadID, fileSize, fixedPartSizeBytes, u.minioRangeReader(req.McapKey))
+ session, parts, partMD5s, err := u.streamMultipartParts(ctx, req.EpisodeID, session, multipartUploadID, fileSize, fixedPartSizeBytes, u.minioRangeReader(req.McapKey), req.Progress)
if err != nil {
u.abortMultipartUpload(session, multipartUploadID)
return nil, "", nil, nil, err
@@ -625,7 +630,7 @@ func (u *Uploader) uploadParts(ctx context.Context, req UploadRequest, session *
return session, multipartUploadID, parts, partMD5s, nil
}
-func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, session *UploadSession, multipartUploadID string, fileSize int64, partSizeBytes int64, newPartStream partStreamFactory) (*UploadSession, []UploadedPart, [][16]byte, error) {
+func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, session *UploadSession, multipartUploadID string, fileSize int64, partSizeBytes int64, newPartStream partStreamFactory, progressFns ...UploadProgressFunc) (*UploadSession, []UploadedPart, [][16]byte, error) {
partSizeBytes = normalizedPartSizeBytes(partSizeBytes)
session.PartSizeBytes = partSizeBytes
partSize := int(partSizeBytes)
@@ -633,6 +638,11 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s
return session, nil, nil, fmt.Errorf("invalid part_size_bytes %d", partSizeBytes)
}
+ var progress UploadProgressFunc
+ if len(progressFns) > 0 {
+ progress = progressFns[0]
+ }
+
buf := make([]byte, partSize)
var parts []UploadedPart
var partMD5s [][16]byte
@@ -697,6 +707,9 @@ func (u *Uploader) streamMultipartParts(ctx context.Context, episodeID string, s
offset += int64(n)
partNumber++
+ if progress != nil {
+ progress(offset, fileSize)
+ }
logger.Printf("[CLOUD-UPLOAD] Progress: episode=%s parts=%d offset=%d/%d",
episodeID, len(parts), offset, fileSize)
diff --git a/internal/cloud/uploader_test.go b/internal/cloud/uploader_test.go
index 99e3432..0296081 100644
--- a/internal/cloud/uploader_test.go
+++ b/internal/cloud/uploader_test.go
@@ -692,6 +692,101 @@ func TestStreamMultipartParts_UploadsExpectedPartBoundaries(t *testing.T) {
}
}
+// TestStreamMultipartParts_ReportsProgressAfterSuccessfulParts uploads a
+// 12-byte payload in 4-byte parts through a fake OSS that always succeeds and
+// expects one progress callback per part with the cumulative byte count.
+func TestStreamMultipartParts_ReportsProgressAfterSuccessfulParts(t *testing.T) {
+	type progressEvent struct {
+		uploaded int64
+		total    int64
+	}
+
+	oss := &fakeOSS{
+		uploadPartFn: func(_ context.Context, _ *UploadSession, _ string, _ int, _ []byte) (string, error) {
+			return "etag", nil
+		},
+	}
+	u := newDecideResumeUploader("", &fakeGateway{}, oss)
+	session := makeSession("logical-progress", "upload-progress")
+	session.PartSizeBytes = 4
+
+	payload := []byte("abcdefghijkl")
+	readRange := func(_ context.Context, offset, length int64) (io.ReadCloser, error) {
+		return io.NopCloser(bytes.NewReader(payload[offset : offset+length])), nil
+	}
+
+	var progress []progressEvent
+	record := func(uploadedBytes int64, totalBytes int64) {
+		progress = append(progress, progressEvent{uploaded: uploadedBytes, total: totalBytes})
+	}
+
+	_, parts, _, err := u.streamMultipartParts(
+		context.Background(),
+		"episode-progress",
+		session,
+		"multipart-progress",
+		int64(len(payload)),
+		session.PartSizeBytes,
+		readRange,
+		record,
+	)
+	if err != nil {
+		t.Fatalf("streamMultipartParts() error = %v", err)
+	}
+	if len(parts) != 3 {
+		t.Fatalf("uploaded part count = %d, want 3", len(parts))
+	}
+
+	want := []progressEvent{{4, 12}, {8, 12}, {12, 12}}
+	if len(progress) != len(want) {
+		t.Fatalf("progress count = %d, want %d (%v)", len(progress), len(want), progress)
+	}
+	for i, expected := range want {
+		if progress[i] != expected {
+			t.Fatalf("progress[%d] = %+v, want %+v", i, progress[i], expected)
+		}
+	}
+}
+
+// TestStreamMultipartParts_DoesNotReportFailedPartProgress makes the second
+// part upload fail and expects exactly one progress callback, for the first
+// part that succeeded.
+func TestStreamMultipartParts_DoesNotReportFailedPartProgress(t *testing.T) {
+	calls := 0
+	oss := &fakeOSS{
+		uploadPartFn: func(_ context.Context, _ *UploadSession, _ string, _ int, _ []byte) (string, error) {
+			calls++
+			if calls != 2 {
+				return "etag", nil
+			}
+			return "", errors.New("upload failed")
+		},
+	}
+	u := newDecideResumeUploader("", &fakeGateway{}, oss)
+	session := makeSession("logical-progress-fail", "upload-progress-fail")
+	session.PartSizeBytes = 4
+
+	payload := []byte("abcdefghijkl")
+	readRange := func(_ context.Context, offset, length int64) (io.ReadCloser, error) {
+		return io.NopCloser(bytes.NewReader(payload[offset : offset+length])), nil
+	}
+
+	var reported []int64
+	record := func(uploadedBytes int64, _ int64) {
+		reported = append(reported, uploadedBytes)
+	}
+
+	_, _, _, err := u.streamMultipartParts(
+		context.Background(),
+		"episode-progress-fail",
+		session,
+		"multipart-progress-fail",
+		int64(len(payload)),
+		session.PartSizeBytes,
+		readRange,
+		record,
+	)
+	if err == nil {
+		t.Fatal("expected upload error, got nil")
+	}
+	if len(reported) != 1 || reported[0] != 4 {
+		t.Fatalf("progress = %v, want only first successful part", reported)
+	}
+}
+
func TestStreamMultipartParts_EarlyEOFStopsInsteadOfUploadingEmptyParts(t *testing.T) {
var uploadedPartNumbers []int
oss := &fakeOSS{
diff --git a/internal/services/sync_worker.go b/internal/services/sync_worker.go
index 373b25a..6eaa50c 100644
--- a/internal/services/sync_worker.go
+++ b/internal/services/sync_worker.go
@@ -56,6 +56,13 @@ type syncEpisodeUploadRow struct {
OrganizationID sql.NullInt64 `db:"organization_id"`
}
+// SyncProgressSnapshot is the latest in-memory progress for an active episode sync.
+type SyncProgressSnapshot struct {
+ // UploadedBytes is the uploaded byte count; setEpisodeProgress stores negatives as 0.
+ UploadedBytes int64
+ // TotalBytes is the total byte count; setEpisodeProgress stores negatives as 0.
+ TotalBytes int64
+ // UpdatedAt is the UTC time at which setEpisodeProgress wrote this snapshot.
+ UpdatedAt time.Time
+}
+
// SyncWorker is a background goroutine that processes queued cloud sync work
// and optionally discovers approved episodes for automatic cloud upload.
type SyncWorker struct {
@@ -70,6 +77,9 @@ type SyncWorker struct {
enqueuedEpisode map[int64]struct{}
stopDone chan struct{}
+ progressMu sync.RWMutex
+ progressByEpisode map[int64]SyncProgressSnapshot
+
running atomic.Bool
stopping atomic.Bool
wg sync.WaitGroup
@@ -105,15 +115,58 @@ var (
// NewSyncWorker creates a new sync worker. Call Start() to begin background processing.
+// The returned worker starts with an empty, non-nil progressByEpisode map.
func NewSyncWorker(db *sqlx.DB, uploader *cloud.Uploader, minioClient *s3.Client, minioBucket string, cfg SyncWorkerConfig, syncCfg *config.SyncConfig) *SyncWorker {
return &SyncWorker{
- db: db,
- uploader: uploader,
- minioClient: minioClient,
- minioBucket: minioBucket,
- cfg: cfg,
- syncCfg: syncCfg,
- enqueueCh: make(chan syncEnqueueRequest, 100),
- enqueuedEpisode: make(map[int64]struct{}),
+ db: db,
+ uploader: uploader,
+ minioClient: minioClient,
+ minioBucket: minioBucket,
+ cfg: cfg,
+ syncCfg: syncCfg,
+ enqueueCh: make(chan syncEnqueueRequest, 100),
+ enqueuedEpisode: make(map[int64]struct{}),
+ progressByEpisode: make(map[int64]SyncProgressSnapshot),
+ }
+}
+
+// setEpisodeProgress stores (or overwrites) the progress snapshot for
+// episodeID, stamping it with the current UTC time. Negative byte counts are
+// stored as zero. It is a no-op on a nil receiver and lazily allocates the map
+// when the worker was built without one.
+func (w *SyncWorker) setEpisodeProgress(episodeID int64, uploadedBytes int64, totalBytes int64) {
+	if w == nil {
+		return
+	}
+	nonNegative := func(v int64) int64 {
+		if v < 0 {
+			return 0
+		}
+		return v
+	}
+	uploaded, total := nonNegative(uploadedBytes), nonNegative(totalBytes)
+
+	w.progressMu.Lock()
+	defer w.progressMu.Unlock()
+	if w.progressByEpisode == nil {
+		w.progressByEpisode = make(map[int64]SyncProgressSnapshot)
+	}
+	w.progressByEpisode[episodeID] = SyncProgressSnapshot{
+		UploadedBytes: uploaded,
+		TotalBytes:    total,
+		UpdatedAt:     time.Now().UTC(),
+	}
+}
+
+// finishEpisodeProgress removes the stored progress snapshot for episodeID,
+// if any. It is a no-op on a nil receiver.
+func (w *SyncWorker) finishEpisodeProgress(episodeID int64) {
+	if w != nil {
+		w.progressMu.Lock()
+		delete(w.progressByEpisode, episodeID)
+		w.progressMu.Unlock()
+	}
+}
+
+// GetEpisodeProgress returns the current in-memory upload progress for an episode.
+// The boolean is false when the receiver is nil or no snapshot is stored for
+// episodeID. The map is read under progressMu's read lock and the snapshot is
+// returned by value.
+func (w *SyncWorker) GetEpisodeProgress(episodeID int64) (SyncProgressSnapshot, bool) {
+ if w == nil {
+ return SyncProgressSnapshot{}, false
}
+ w.progressMu.RLock()
+ defer w.progressMu.RUnlock()
+ progress, ok := w.progressByEpisode[episodeID]
+ return progress, ok
}
// Start begins the background sync worker loop.
@@ -891,12 +944,14 @@ func (w *SyncWorker) processEpisodeWithMode(ctx context.Context, episodeID int64
if err != nil {
duration := int64(time.Since(startTime).Seconds())
w.markSyncFailed(ctx, syncLogID, episodeID, duration, err, attemptCount)
+ w.finishEpisodeProgress(episodeID)
return
}
// Success: update episode and sync_log
duration := int64(time.Since(startTime).Seconds())
w.markSyncCompleted(ctx, syncLogID, episodeID, result, duration)
+ w.finishEpisodeProgress(episodeID)
}
func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUploadRow) (*cloud.UploadResult, error) {
@@ -951,6 +1006,9 @@ func (w *SyncWorker) uploadEpisodeDirect(ctx context.Context, ep syncEpisodeUplo
McapKey: mcapKey,
AssetID: assetID,
RawTags: rawTags,
+ Progress: func(uploadedBytes int64, totalBytes int64) {
+ w.setEpisodeProgress(ep.ID, uploadedBytes, totalBytes)
+ },
})
}
diff --git a/internal/services/sync_worker_test.go b/internal/services/sync_worker_test.go
index d5d26d6..d4472a2 100644
--- a/internal/services/sync_worker_test.go
+++ b/internal/services/sync_worker_test.go
@@ -109,6 +109,40 @@ func TestEnqueueEpisode_AllowsReenqueueAfterProcessing(t *testing.T) {
}
}
+func TestSyncWorkerEpisodeProgressSetGetAndFinish(t *testing.T) {
+ w := NewSyncWorker(nil, nil, nil, "", SyncWorkerConfig{}, nil)
+
+ if _, ok := w.GetEpisodeProgress(42); ok {
+ t.Fatal("GetEpisodeProgress before set ok = true, want false")
+ }
+
+ w.setEpisodeProgress(42, 12, 100)
+ progress, ok := w.GetEpisodeProgress(42)
+ if !ok {
+ t.Fatal("GetEpisodeProgress after set ok = false, want true")
+ }
+ if progress.UploadedBytes != 12 || progress.TotalBytes != 100 {
+ t.Fatalf("progress = %+v, want uploaded=12 total=100", progress)
+ }
+ if progress.UpdatedAt.IsZero() {
+ t.Fatal("progress UpdatedAt is zero")
+ }
+
+ w.setEpisodeProgress(42, 150, 200)
+ progress, ok = w.GetEpisodeProgress(42)
+ if !ok {
+ t.Fatal("GetEpisodeProgress after overwrite ok = false, want true")
+ }
+ if progress.UploadedBytes != 150 || progress.TotalBytes != 200 {
+ t.Fatalf("overwritten progress = %+v, want uploaded=150 total=200", progress)
+ }
+
+ w.finishEpisodeProgress(42)
+ if _, ok := w.GetEpisodeProgress(42); ok {
+ t.Fatal("GetEpisodeProgress after finish ok = true, want false")
+ }
+}
+
func TestFindPendingEpisodes_ExcludesExhaustedFailuresFromPollingOnly(t *testing.T) {
db := newTestSyncWorkerDB(t)
w := &SyncWorker{db: db, cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}}