diff --git a/backend/executor.go b/backend/executor.go index 73e6d58a..42564e83 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -687,6 +687,7 @@ func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *Workflo CreatedTimestamp: metadata.CreatedAt, LastUpdatedTimestamp: metadata.LastUpdatedAt, Version: metadata.Version, + StartedAt: metadata.StartedAt, } if metadata.ParentInstanceId != "" { diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index e30a20eb..b59e7771 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -561,11 +561,19 @@ func (be *postgresBackend) CreateWorkflowInstance(ctx context.Context, e *backen return err } + // Honour ScheduledStartTimestamp by deferring the start event's + // visibility. NULL VisibleTime means immediately visible. + var visibleTime any + if ts := e.GetExecutionStarted().GetScheduledStartTimestamp(); ts != nil { + visibleTime = ts.AsTime() + } + _, err = tx.Exec( ctx, - `INSERT INTO NewEvents (InstanceID, EventPayload) VALUES ($1, $2)`, + `INSERT INTO NewEvents (InstanceID, EventPayload, VisibleTime) VALUES ($1, $2, $3)`, instanceID, eventPayload, + visibleTime, ) if err != nil { @@ -787,6 +795,11 @@ func (be *postgresBackend) GetWorkflowMetadata(ctx context.Context, iid api.Inst versionw = wrapperspb.String(*version) } + startedAt, err := be.getStartedAt(ctx, iid) + if err != nil { + return nil, err + } + startEvent, err := be.getStartEvent(ctx, iid) if err != nil { return nil, err @@ -814,6 +827,7 @@ func (be *postgresBackend) GetWorkflowMetadata(ctx context.Context, iid api.Inst Version: versionw, ParentInstanceId: parentInstanceID, ParentAppId: parentAppIDw, + StartedAt: startedAt, }, nil } @@ -843,6 +857,34 @@ func (be *postgresBackend) getStartEvent(ctx context.Context, iid api.InstanceID return e.GetExecutionStarted(), nil } +// getStartedAt returns the timestamp of the first history event for the +// instance, or nil if the workflow has not 
yet been picked up by a worker +// (History is empty). +// +// In History, row 0 is the WorkflowStartedEvent injected by the engine in +// workflowProcessor.applyWorkItem; its Timestamp is the moment the worker +// first picked the workflow up — distinct from the ExecutionStartedEvent's +// creation timestamp. +func (be *postgresBackend) getStartedAt(ctx context.Context, iid api.InstanceID) (*timestamppb.Timestamp, error) { + var payload []byte + err := be.db.QueryRow( + ctx, + "SELECT EventPayload FROM History WHERE InstanceID = $1 ORDER BY SequenceNumber ASC LIMIT 1", + iid, + ).Scan(&payload) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to query first history event: %w", err) + } + e, err := backend.UnmarshalHistoryEvent(payload) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal first history event: %w", err) + } + return e.GetTimestamp(), nil +} + // GetWorkflowRuntimeState implements backend.Backend func (be *postgresBackend) GetWorkflowRuntimeState(ctx context.Context, wi *backend.WorkflowWorkItem) (*backend.WorkflowRuntimeState, error) { if err := be.ensureDB(); err != nil { diff --git a/backend/runtimestate/runtimestate_test.go b/backend/runtimestate/runtimestate_test.go index 00fbd9e9..e46e4392 100644 --- a/backend/runtimestate/runtimestate_test.go +++ b/backend/runtimestate/runtimestate_test.go @@ -2,6 +2,7 @@ package runtimestate import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -414,3 +415,68 @@ func TestAddEvent_NewOrchestrationRuntimeStateDropsHistoryDuplicates(t *testing. 
assert.True(t, dedup.IsPresent(s.OldEvents, dedup.KindTask, 1)) assert.True(t, dedup.IsPresent(s.OldEvents, dedup.KindTask, 2)) } + +func TestGetStartedTime(t *testing.T) { + t.Parallel() + + // Mirrors what the engine produces in applyWorkItem: a WorkflowStartedEvent + // is appended first (timestamp = first-execution time), then the work + // item's own events (the original ExecutionStartedEvent at creation time). + firstRun := timestamppb.Now() + creation := timestamppb.New(firstRun.AsTime().Add(-time.Second)) + history := []*protos.HistoryEvent{ + { + EventId: -1, + Timestamp: firstRun, + EventType: &protos.HistoryEvent_WorkflowStarted{ + WorkflowStarted: &protos.WorkflowStartedEvent{}, + }, + }, + { + EventId: -1, + Timestamp: creation, + EventType: &protos.HistoryEvent_ExecutionStarted{ + ExecutionStarted: &protos.ExecutionStartedEvent{Name: "wf"}, + }, + }, + } + s := NewWorkflowRuntimeState("abc", nil, history) + + got := GetStartedTime(s) + require.False(t, got.IsZero()) + assert.Equal(t, firstRun.AsTime(), got, "must return first event's timestamp, not the ExecutionStarted's") +} + +func TestGetStartedTime_EmptyState(t *testing.T) { + t.Parallel() + + // Workflow created but not yet executed -> no events in either OldEvents + // or NewEvents. The dapr orchestrator's `if !t.IsZero()` guard depends on + // this returning a zero time so StartedAt stays nil. + s := NewWorkflowRuntimeState("abc", nil, nil) + + got := GetStartedTime(s) + assert.True(t, got.IsZero(), "empty rstate must return zero time") +} + +func TestGetStartedTime_NewEventsOnly(t *testing.T) { + t.Parallel() + + // Mid-processing: events have been added via AddEvent but not yet moved + // to OldEvents (which happens when the runtime state is reloaded after a + // save). GetStartedTime must still return the first event's timestamp. 
+ ts := timestamppb.Now() + s := NewWorkflowRuntimeState("abc", nil, nil) + require.NoError(t, AddEvent(s, &protos.HistoryEvent{ + EventId: -1, + Timestamp: ts, + EventType: &protos.HistoryEvent_WorkflowStarted{ + WorkflowStarted: &protos.WorkflowStartedEvent{}, + }, + })) + require.Empty(t, s.OldEvents) + require.Len(t, s.NewEvents, 1) + + got := GetStartedTime(s) + assert.Equal(t, ts.AsTime(), got) +} diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 84097145..f323c62b 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -435,11 +435,19 @@ func (be *sqliteBackend) CreateWorkflowInstance(ctx context.Context, e *backend. return err } + // Honour ScheduledStartTimestamp by deferring the start event's + // visibility. NULL VisibleTime means immediately visible. + var visibleTime any + if ts := e.GetExecutionStarted().GetScheduledStartTimestamp(); ts != nil { + visibleTime = ts.AsTime() + } + _, err = tx.ExecContext( ctx, - `INSERT INTO NewEvents ([InstanceID], [EventPayload]) VALUES (?, ?)`, + `INSERT INTO NewEvents ([InstanceID], [EventPayload], [VisibleTime]) VALUES (?, ?, ?)`, instanceID, eventPayload, + visibleTime, ) if err != nil { @@ -695,6 +703,11 @@ func (be *sqliteBackend) GetWorkflowMetadata(ctx context.Context, iid api.Instan } } + startedAt, err := be.getStartedAt(ctx, iid) + if err != nil { + return nil, err + } + startEvent, err := be.getStartEvent(ctx, iid) if err != nil { @@ -723,8 +736,10 @@ func (be *sqliteBackend) GetWorkflowMetadata(ctx context.Context, iid api.Instan Version: versionw, ParentInstanceId: parentInstanceID, ParentAppId: parentAppIDw, + StartedAt: startedAt, }, nil } + // getStartEvent loads the ExecutionStarted event for an instance, or nil // if the workflow has not yet been picked up by a worker. 
// @@ -751,6 +766,34 @@ func (be *sqliteBackend) getStartEvent(ctx context.Context, iid api.InstanceID) return e.GetExecutionStarted(), nil } +// getStartedAt returns the timestamp of the first history event for the +// instance, or nil if the workflow has not yet been picked up by a worker +// (History is empty). +// +// In History, row 0 is the WorkflowStartedEvent injected by the engine in +// workflowProcessor.applyWorkItem; its Timestamp is the moment the worker +// first picked the workflow up — distinct from the ExecutionStartedEvent's +// creation timestamp. +func (be *sqliteBackend) getStartedAt(ctx context.Context, iid api.InstanceID) (*timestamppb.Timestamp, error) { + var payload []byte + err := be.db.QueryRowContext( + ctx, + "SELECT [EventPayload] FROM History WHERE [InstanceID] = ? ORDER BY [SequenceNumber] ASC LIMIT 1", + iid, + ).Scan(&payload) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to query first history event: %w", err) + } + e, err := backend.UnmarshalHistoryEvent(payload) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal first history event: %w", err) + } + return e.GetTimestamp(), nil +} + // GetWorkflowRuntimeState implements backend.Backend func (be *sqliteBackend) GetWorkflowRuntimeState(ctx context.Context, wi *backend.WorkflowWorkItem) (*backend.WorkflowRuntimeState, error) { if err := be.ensureDB(); err != nil { diff --git a/client/client_grpc.go b/client/client_grpc.go index 194a3cc0..5d305ae3 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -308,16 +308,17 @@ func makeWorkflowMetadata(resp *protos.GetInstanceResponse) (*backend.WorkflowMe return nil, fmt.Errorf("workflow state is nil") } metadata := &backend.WorkflowMetadata{ - InstanceId: resp.WorkflowState.InstanceId, - Name: resp.WorkflowState.Name, - RuntimeStatus: resp.WorkflowState.WorkflowStatus, - Input: resp.WorkflowState.Input, - CustomStatus: resp.WorkflowState.CustomStatus, 
- Output: resp.WorkflowState.Output, - CreatedAt: resp.WorkflowState.CreatedTimestamp, - LastUpdatedAt: resp.WorkflowState.LastUpdatedTimestamp, - FailureDetails: resp.WorkflowState.FailureDetails, - Version: resp.WorkflowState.Version, + InstanceId: resp.GetWorkflowState().GetInstanceId(), + Name: resp.GetWorkflowState().GetName(), + RuntimeStatus: resp.GetWorkflowState().GetWorkflowStatus(), + Input: resp.GetWorkflowState().GetInput(), + CustomStatus: resp.GetWorkflowState().GetCustomStatus(), + Output: resp.GetWorkflowState().GetOutput(), + CreatedAt: resp.GetWorkflowState().GetCreatedTimestamp(), + LastUpdatedAt: resp.GetWorkflowState().GetLastUpdatedTimestamp(), + FailureDetails: resp.GetWorkflowState().GetFailureDetails(), + Version: resp.GetWorkflowState().GetVersion(), + StartedAt: resp.GetWorkflowState().GetStartedAt(), } return metadata, nil } diff --git a/tests/backend_test.go b/tests/backend_test.go index 125404cd..d8e1e0ee 100644 --- a/tests/backend_test.go +++ b/tests/backend_test.go @@ -414,6 +414,66 @@ func Test_GetWorkflowMetadata_NoParent(t *testing.T) { } } + +func Test_GetWorkflowMetadata_StartedAt(t *testing.T) { + for i, be := range backends { + initTest(t, be, i, true) + + const iid = "startedat-instance" + + // ExecutionStartedEvent's timestamp — captured before CreateWorkflowInstance + // so we can later prove StartedAt is not earlier than this value + // (row-ordering check; see assertions below). + startTS := time.Now().UTC().Truncate(time.Microsecond) + e := &protos.HistoryEvent{ + Timestamp: timestamppb.New(startTS), + EventType: &protos.HistoryEvent_ExecutionStarted{ + ExecutionStarted: &protos.ExecutionStartedEvent{ + Name: defaultName, + WorkflowInstance: &protos.WorkflowInstance{InstanceId: iid}, + Input: wrapperspb.String(defaultInput), + }, + }, + } + if !assert.NoError(t, be.CreateWorkflowInstance(ctx, e)) { + continue + } + + // Pre-execution: instance row exists but History is empty. 
+ // getStartedAt must hit the no-rows branch and StartedAt stays nil. + md, err := be.GetWorkflowMetadata(ctx, api.InstanceID(iid)) + if assert.NoError(t, err) { + assert.Nil(t, md.StartedAt, "StartedAt should be nil before the first work item is processed") + } + + // Drive the work item using the shared harness, which mirrors + // workflowProcessor.applyWorkItem by prepending a WorkflowStartedEvent + // to NewEvents. After this, History row 0 = WorkflowStarted (timestamp + // captured inside the helper), row 1 = ExecutionStarted (startTS). + beforeProcess := time.Now().UTC() + if !processFirstWorkItem(t, be, iid) { + continue + } + afterProcess := time.Now().UTC() + + + // After processing the work item, StartedAt must be non-nil, fall within the work-item + // processing window, and be no earlier than the ExecutionStarted timestamp. + md, err = be.GetWorkflowMetadata(ctx, api.InstanceID(iid)) + if assert.NoError(t, err) { + if assert.NotNil(t, md.StartedAt, "StartedAt must be populated once History has a row") { + started := md.StartedAt.AsTime() + assert.False(t, started.Before(beforeProcess), + "StartedAt %v should be >= %v (start of work-item processing)", started, beforeProcess) + assert.False(t, started.After(afterProcess), + "StartedAt %v should be <= %v (end of work-item processing)", started, afterProcess) + assert.False(t, started.Before(startTS), + "StartedAt %v should be >= %v (ExecutionStarted timestamp)", started, startTS) + } + } + } +} + func Test_PurgeWorkflowState(t *testing.T) { for i, be := range backends { initTest(t, be, i, true) @@ -586,19 +646,12 @@ func getWorkflowRuntimeState(t assert.TestingT, be backend.Backend, wi *backend. 
return nil, false } -func getWorkflowMetadata(t assert.TestingT, be backend.Backend, iid api.InstanceID) (*backend.WorkflowMetadata, bool) { - metadata, err := be.GetWorkflowMetadata(ctx, iid) - if assert.NoError(t, err) && assert.NotNil(t, metadata) { - return metadata, assert.Equal(t, iid, api.InstanceID(metadata.InstanceId)) - } - - return nil, false -} - // processFirstWorkItem fetches the first work item for the given instance, // prepends a WorkflowStartedEvent to NewEvents before the work item's own events, // applies its NewEvents to the runtime state, and completes the work item -// without producing any additional workflow actions. +// without producing any additional workflow actions. This mirrors what +// workflowProcessor.applyWorkItem does in production, so the persisted +// History layout matches: row 0 = WorkflowStarted, row 1 = ExecutionStarted. func processFirstWorkItem(t assert.TestingT, be backend.Backend, instanceID string) bool { wi, ok := getWorkflowWorkItem(t, be, instanceID) if !ok { @@ -621,3 +674,13 @@ func processFirstWorkItem(t assert.TestingT, be backend.Backend, instanceID stri wi.State = state return assert.NoError(t, be.CompleteWorkflowWorkItem(ctx, wi)) } + +func getWorkflowMetadata(t assert.TestingT, be backend.Backend, iid api.InstanceID) (*backend.WorkflowMetadata, bool) { + metadata, err := be.GetWorkflowMetadata(ctx, iid) + if assert.NoError(t, err) && assert.NotNil(t, metadata) { + return metadata, assert.Equal(t, iid, api.InstanceID(metadata.InstanceId)) + } + + return nil, false +} + diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 7139e732..77c7b943 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -1836,6 +1836,130 @@ func Test_WorkflowPatching_TracingSpans(t *testing.T) { ) } +func Test_StartedAt_AfterExecution(t *testing.T) { + r := task.NewTaskRegistry() + r.AddWorkflowN("StartedAtAfterExec", func(ctx *task.WorkflowContext) (any, error) { + return nil, nil + 
}) + + ctx := context.Background() + utils.InitTracing() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + beforeSchedule := time.Now().UTC() + id, err := client.ScheduleNewWorkflow(ctx, "StartedAtAfterExec") + require.NoError(t, err) + metadata, err := client.WaitForWorkflowCompletion(ctx, id) + require.NoError(t, err) + afterCompletion := time.Now().UTC() + + require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + require.NotNil(t, metadata.StartedAt, "StartedAt must be populated once execution has begun") + startedAt := metadata.StartedAt.AsTime() + // StartedAt is the engine-injected WorkflowStartedEvent timestamp at first + // pickup. It must fall between the schedule call and the metadata read. + assert.False(t, startedAt.Before(beforeSchedule), "StartedAt %v should be >= scheduling time %v", startedAt, beforeSchedule) + assert.False(t, startedAt.After(afterCompletion), "StartedAt %v should be <= now %v", startedAt, afterCompletion) + // StartedAt must be at or after CreatedAt. 
+ assert.False(t, startedAt.Before(metadata.CreatedAt.AsTime()), + "StartedAt %v should be >= CreatedAt %v", startedAt, metadata.CreatedAt.AsTime()) +} + + +func Test_StartedAt_WithScheduleTime(t *testing.T) { + r := task.NewTaskRegistry() + r.AddWorkflowN("StartedAtAfterExec", func(ctx *task.WorkflowContext) (any, error) { + return nil, nil + }) + + ctx := context.Background() + utils.InitTracing() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + beforeSchedule := time.Now().UTC() + startTime := beforeSchedule.Add(time.Second) + id, err := client.ScheduleNewWorkflow(ctx, "StartedAtAfterExec", api.WithStartTime(startTime)) + require.NoError(t, err) + metadata, err := client.WaitForWorkflowCompletion(ctx, id) + require.NoError(t, err) + afterCompletion := time.Now().UTC() + + require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + require.NotNil(t, metadata.StartedAt, "StartedAt must be populated once execution has begun") + startedAt := metadata.StartedAt.AsTime() + // StartedAt is the engine-injected WorkflowStartedEvent timestamp at first + // pickup. It must be at or after the requested start time and no later than the metadata read. + assert.False(t, startedAt.Before(startTime), "StartedAt %v should be >= start time %v", startedAt, startTime) + assert.False(t, startedAt.Before(metadata.CreatedAt.AsTime()), "StartedAt %v should be >= CreatedAt %v", startedAt, metadata.CreatedAt.AsTime()) + assert.False(t, startedAt.After(afterCompletion), "StartedAt %v should be <= now %v", startedAt, afterCompletion) + // StartedAt must be at or after CreatedAt. + assert.False(t, startedAt.Before(metadata.CreatedAt.AsTime()), + "StartedAt %v should be >= CreatedAt %v", startedAt, metadata.CreatedAt.AsTime()) +} + +func Test_StartedAt_NilBeforeExecution(t *testing.T) { + // Verify GetWorkflowMetadata returns StartedAt=nil while a workflow is + // PENDING (history is empty). 
To get a deterministic PENDING state we + // don't start a worker, so the queued start event is never processed. + ctx := context.Background() + logger := backend.DefaultLogger() + be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger) + require.NoError(t, be.CreateTaskHub(ctx)) + client := backend.NewTaskHubClient(be) + + id, err := client.ScheduleNewWorkflow(ctx, "NeverRun") + require.NoError(t, err) + + metadata, err := client.FetchWorkflowMetadata(ctx, id) + require.NoError(t, err) + require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING, metadata.RuntimeStatus) + assert.Nil(t, metadata.StartedAt, "StartedAt must be nil while the workflow is pending") +} + +func Test_StartedAt_AfterContinueAsNew(t *testing.T) { + r := task.NewTaskRegistry() + r.AddWorkflowN("StartedAtCAN", func(ctx *task.WorkflowContext) (any, error) { + var input int32 + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + if input < 2 { + if err := ctx.CreateTimer(0).Await(nil); err != nil { + return nil, err + } + ctx.ContinueAsNew(input + 1) + } + return input, nil + }) + + ctx := context.Background() + utils.InitTracing() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + beforeSchedule := time.Now().UTC() + id, err := client.ScheduleNewWorkflow(ctx, "StartedAtCAN", api.WithInput(0)) + require.NoError(t, err) + metadata, err := client.WaitForWorkflowCompletion(ctx, id) + require.NoError(t, err) + afterCompletion := time.Now().UTC() + require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + + // After ContinueAsNew, history is wiped and repopulated. StartedAt and + // CreatedAt are both refreshed from the *most recent* run's events + // (WorkflowStarted + ExecutionStarted produced by the applier in + // successive time.Now() calls), so they sit microseconds apart in either + // order. 
The meaningful invariant is that the value is non-nil and falls + // within the test's wall-clock window — which proves CAN didn't break + // the StartedAt path or leave a stale value from a previous run. + require.NotNil(t, metadata.StartedAt) + startedAt := metadata.StartedAt.AsTime() + assert.False(t, startedAt.Before(beforeSchedule), "StartedAt %v should be >= scheduling time %v", startedAt, beforeSchedule) + assert.False(t, startedAt.After(afterCompletion), "StartedAt %v should be <= now %v", startedAt, afterCompletion) +} + func initTaskHubWorker(ctx context.Context, r *task.TaskRegistry, opts ...backend.NewTaskWorkerOptions) (backend.TaskHubClient, backend.TaskHubWorker) { // TODO: Switch to options pattern logger := backend.DefaultLogger()