Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 56 additions & 32 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3498,29 +3498,11 @@ func (wh *WorkflowHandler) createScheduleCHASM(
return nil, err
}

// Validate blob size limit here. In the V1 codepath, this is done automatically
// as part of StartWorkflowExecution, but here it must be done separately (to
// maintain equal payload size limits between versions).
switch action := request.GetSchedule().GetAction().GetAction().(type) {
case *schedulepb.ScheduleAction_StartWorkflow:
payloadSize := request.GetMemo().Size() + action.StartWorkflow.GetInput().Size()
sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace())
sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace())
if err := common.CheckEventBlobSizeLimit(
payloadSize,
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.ScheduleId,
"", // don't have runid yet
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
"CreateSchedule",
); err != nil {
return nil, err
}
default:
return nil, serviceerror.NewInvalidArgument("Only StartWorkflow action is supported for schedules")
if err := wh.validateSchedulePayloadSize(
ctx, request.GetNamespace(), request.ScheduleId,
request.GetSchedule(), request.GetMemo(), "CreateSchedule",
); err != nil {
return nil, err
}

// Phase 1: Write sentinel to V1 key space (dummy workflow) to prevent a
Expand Down Expand Up @@ -3820,6 +3802,11 @@ func (wh *WorkflowHandler) CreateSchedule(
return nil, err
}

if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("CreateSchedule"))
metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size()))
}

if useChasmScheduler {
return wh.createScheduleCHASM(ctx, request)
}
Expand Down Expand Up @@ -4577,6 +4564,10 @@ func (wh *WorkflowHandler) UpdateSchedule(
return nil, errRequestNotSet
}

if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() {
return nil, errRequestIDTooLong
}

namespaceName := namespace.Name(request.GetNamespace())

if err := wh.validateStartWorkflowArgsForSchedule(
Expand All @@ -4601,6 +4592,18 @@ func (wh *WorkflowHandler) UpdateSchedule(
return nil, err
}

if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateSchedule"))
metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size()))
}

if err := wh.validateSchedulePayloadSize(
ctx, request.GetNamespace(), request.ScheduleId,
request.GetSchedule(), request.GetMemo(), "UpdateSchedule",
); err != nil {
return nil, err
}

if wh.chasmSchedulerEnabled(ctx, request.Namespace) {
res, err := wh.updateScheduleCHASM(ctx, request)
if err == nil {
Expand Down Expand Up @@ -4643,22 +4646,13 @@ func (wh *WorkflowHandler) updateScheduleWorkflow(
ctx context.Context,
request *workflowservice.UpdateScheduleRequest,
) (*workflowservice.UpdateScheduleResponse, error) {
if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() {
return nil, errRequestIDTooLong
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
}

if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateSchedule"))
metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size()))
}

input := &schedulespb.FullUpdateRequest{
Schedule: request.Schedule,
SearchAttributes: request.SearchAttributes,
Expand Down Expand Up @@ -6750,6 +6744,36 @@ func (wh *WorkflowHandler) validateWorkflowID(
return nil
}

// validateSchedulePayloadSize validates the blob size of the schedule's memo
// and action input. This runs for both CHASM and V1 paths.
func (wh *WorkflowHandler) validateSchedulePayloadSize(
ctx context.Context,
namespaceName string,
scheduleID string,
schedule *schedulepb.Schedule,
memo *commonpb.Memo,
operation string,
) error {
action := schedule.GetAction().GetStartWorkflow()
if action == nil {
return serviceerror.NewInvalidArgument("Only StartWorkflow action is supported for schedules")
}
payloadSize := memo.Size() + action.GetInput().Size()
sizeLimitError := wh.config.BlobSizeLimitError(namespaceName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceName)
return common.CheckEventBlobSizeLimit(
payloadSize,
sizeLimitWarn,
sizeLimitError,
namespaceName,
scheduleID,
"",
wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
operation,
)
}

func (wh *WorkflowHandler) canonicalizeScheduleSpec(schedule *schedulepb.Schedule) error {
if schedule.Spec == nil {
schedule.Spec = &schedulepb.ScheduleSpec{}
Expand Down
116 changes: 116 additions & 0 deletions tests/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func runSharedScheduleTests(t *testing.T, newContext contextFactory) {
t.Run("TestSchedule_InternalTaskQueue", func(t *testing.T) { testScheduleInternalTaskQueue(t, newContext) })
t.Run("TestDeletedScheduleOperations", func(t *testing.T) { testDeletedScheduleOperations(t, newContext) })
t.Run("TestUnpauseResumesProcessing", func(t *testing.T) { testCHASMUnpauseResumesProcessing(t, newContext) })
t.Run("TestUpdateScheduleRequestIDTooLong", func(t *testing.T) { testUpdateScheduleRequestIDTooLong(t, newContext) })
t.Run("TestUpdateScheduleBlobSizeLimit", func(t *testing.T) { testUpdateScheduleBlobSizeLimit(t, newContext) })
}

func testDeletedScheduleOperations(t *testing.T, newContext contextFactory) {
Expand Down Expand Up @@ -2734,3 +2736,117 @@ func testCHASMUnpauseResumesProcessing(t *testing.T, newContext contextFactory)
"schedule should resume processing after unpause",
)
}
func testUpdateScheduleRequestIDTooLong(t *testing.T, newContext contextFactory) {
s := testcore.NewEnv(t, scheduleCommonOpts()...)

sid := "sched-test-update-reqid-too-long"
wid := "sched-test-update-reqid-too-long-wf"
wt := "sched-test-update-reqid-too-long-wt"

s.SdkWorker().RegisterWorkflowWithOptions(
func(ctx workflow.Context) error { return nil },
workflow.RegisterOptions{Name: wt},
)

schedule := &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{
Interval: []*schedulepb.IntervalSpec{
{Interval: durationpb.New(1 * time.Hour)},
},
},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: s.WorkerTaskQueue(), Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
},
},
},
}

// Create schedule.
ctx := newContext(s.Context())
_, err := s.FrontendClient().CreateSchedule(ctx, &workflowservice.CreateScheduleRequest{
Namespace: s.Namespace().String(),
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.NewString(),
})
require.NoError(t, err)

// Update with an oversized request ID.
_, err = s.FrontendClient().UpdateSchedule(newContext(s.Context()), &workflowservice.UpdateScheduleRequest{
Namespace: s.Namespace().String(),
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: strings.Repeat("x", 1001),
})
var invalidArgReqID *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgReqID)
}

func testUpdateScheduleBlobSizeLimit(t *testing.T, newContext contextFactory) {
s := testcore.NewEnv(t,
append(scheduleCommonOpts(),
testcore.WithDynamicConfig(dynamicconfig.BlobSizeLimitError, 1000),
testcore.WithDynamicConfig(dynamicconfig.BlobSizeLimitWarn, 500),
)...,
)

sid := "sched-test-update-blob-limit"
wid := "sched-test-update-blob-limit-wf"
wt := "sched-test-update-blob-limit-wt"

s.SdkWorker().RegisterWorkflowWithOptions(
func(ctx workflow.Context) error { return nil },
workflow.RegisterOptions{Name: wt},
)

schedule := &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{
Interval: []*schedulepb.IntervalSpec{
{Interval: durationpb.New(1 * time.Hour)},
},
},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: s.WorkerTaskQueue(), Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
},
},
},
}

// Create schedule.
ctx := newContext(s.Context())
_, err := s.FrontendClient().CreateSchedule(ctx, &workflowservice.CreateScheduleRequest{
Namespace: s.Namespace().String(),
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.NewString(),
})
require.NoError(t, err)

// Update with an oversized memo that exceeds the blob size limit.
largeMemo := &commonpb.Memo{
Fields: map[string]*commonpb.Payload{
"key": {Data: make([]byte, 1001)},
},
}
_, err = s.FrontendClient().UpdateSchedule(newContext(s.Context()), &workflowservice.UpdateScheduleRequest{
Namespace: s.Namespace().String(),
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.NewString(),
Memo: largeMemo,
})
var invalidArgBlob *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgBlob)
}
Loading