@@ -11,17 +11,19 @@ import (
1111 "github.com/gorilla/websocket"
1212 "github.com/olahol/melody"
1313 r "github.com/redis/go-redis/v9"
14- "github.com/scienceol/studio/service/internal/config"
1514 "github.com/scienceol/studio/service/pkg/common"
1615 "github.com/scienceol/studio/service/pkg/common/code"
1716 "github.com/scienceol/studio/service/pkg/common/constant"
1817 "github.com/scienceol/studio/service/pkg/common/uuid"
1918 "github.com/scienceol/studio/service/pkg/core/notify"
2019 "github.com/scienceol/studio/service/pkg/core/notify/events"
20+ "github.com/scienceol/studio/service/pkg/core/schedule/edge"
2121 "github.com/scienceol/studio/service/pkg/core/schedule/engine"
2222 actionEngine "github.com/scienceol/studio/service/pkg/core/schedule/engine/action"
23+ "github.com/scienceol/studio/service/pkg/middleware/auth"
2324 "github.com/scienceol/studio/service/pkg/middleware/logger"
2425 "github.com/scienceol/studio/service/pkg/middleware/redis"
26+ "github.com/scienceol/studio/service/pkg/utils"
2527)
2628
2729const (
@@ -89,6 +91,12 @@ func (h *Handle) RunAction(ctx *gin.Context) {
8991 return
9092 }
9193
94+ userInfo := auth .GetCurrentUser (ctx )
95+ if userInfo == nil {
96+ common .ReplyErr (ctx , code .UnLogin )
97+ return
98+ }
99+
92100 // 打印当前时间
93101 now := time .Now ()
94102 logger .Infof (ctx , "RunAction request received at: %s" , now .Format (time .RFC3339 ))
@@ -98,6 +106,23 @@ func (h *Handle) RunAction(ctx *gin.Context) {
98106 req .UUID = uuid .NewV4 ()
99107 }
100108
109+ if exists , err := h .rClient .Exists (ctx , utils .LabHeartName (req .LabUUID )).Result (); err != nil || exists == 0 {
110+ common .ReplyErr (ctx , code .EdgeNotStartedErr )
111+ return
112+ }
113+
114+ data := edge.ApiControlData [engine.WorkflowInfo ]{
115+ ApiControlMsg : edge.ApiControlMsg {
116+ Action : edge .StartAction ,
117+ },
118+ Data : engine.WorkflowInfo {
119+ TaskUUID : req .UUID ,
120+ WorkflowUUID : req .UUID ,
121+ LabUUID : req .LabUUID ,
122+ UserID : userInfo .ID ,
123+ },
124+ }
125+
101126 // 将请求数据存储到 Redis
102127 paramKey := actionEngine .ActionKey (req .UUID )
103128 reqData , err := json .Marshal (req )
@@ -107,24 +132,16 @@ func (h *Handle) RunAction(ctx *gin.Context) {
107132 return
108133 }
109134
110- ret := h .rClient .SetEx (ctx , paramKey , reqData , 1 * time .Hour )
135+ ret := h .rClient .SetEx (ctx , paramKey , reqData , 24 * time .Hour )
111136 if ret .Err () != nil {
112137 logger .Errorf (ctx , "set action param to redis err: %+v" , ret .Err ())
113138 common .ReplyErr (ctx , code .RPCHttpErr .WithErr (ret .Err ()))
114139 return
115140 }
116141
117142 // 发送任务到队列
118- conf := config .Global ().Job
119- jobInfo := engine.WorkflowInfo {
120- Action : engine .StartAction ,
121- TaskUUID : req .UUID ,
122- LabUUID : req .LabUUID ,
123- UserID : "manual" , // 手动触发
124- }
125-
126- jobData , _ := json .Marshal (jobInfo )
127- pushRet := h .rClient .LPush (ctx , conf .JobQueueName , jobData )
143+ jobData , _ := json .Marshal (data )
144+ pushRet := h .rClient .LPush (ctx , utils .LabControlName (req .LabUUID ), jobData )
128145 if pushRet .Err () != nil {
129146 logger .Errorf (ctx , "push job to queue err: %+v" , pushRet .Err ())
130147 common .ReplyErr (ctx , code .RPCHttpErr .WithErr (pushRet .Err ()))
0 commit comments