diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 8b82400fa..0f92d1823 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -74,7 +74,24 @@ func (a *SnapshotActivity) SetupReplication( } logger.Info("waiting for slot to be created...") + slotCreateStart := time.Now() + stopSlotCreateWarning := common.Interval(ctx, 5*time.Minute, func() { + elapsed := time.Since(slotCreateStart) + if elapsed >= 15*time.Minute { + msg := fmt.Sprintf( + "Replication slot creation is taking very long (%s), possibly blocked by an open transaction."+ + " Run the following on the source to identify blockers:"+ + " SELECT pid, usename, application_name, state, now() - xact_start AS duration, query"+ + " FROM pg_stat_activity WHERE xact_start IS NOT NULL AND pid != pg_backend_pid() ORDER BY xact_start;", + elapsed.Round(time.Second), + ) + if err := alerting.InsertFlowLog(ctx, a.CatalogPool, config.FlowJobName, msg, alerting.FlowErrorTypeWarn); err != nil { + logger.Error("failed to insert slot creation warning", slog.Any("error", err)) + } + } + }) slotInfo, err := conn.SetupReplication(ctx, config) + stopSlotCreateWarning() if err != nil { connClose(ctx) diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 05f999f2c..2ccee8898 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -112,7 +112,7 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) if canceled { if input.Resync { if err := errors.Join(ctx.Err(), sourceError, destinationError); err != nil { - logger.Warn("resync failed drop, proceeding with cdc flow", slog.Any("error", err)) + logger.Warn("resync failed drop, proceeding with cleanup", slog.Any("error", err)) } break } @@ -124,25 +124,62 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) } func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { + activeSignal := model.NoopSignal + if input.Resync { + activeSignal = model.ResyncSignal + } else { + activeSignal = model.TerminateSignal + } + if err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (cdc_state.CDCFlowWorkflowState, error) { state := cdc_state.CDCFlowWorkflowState{DropFlowInput: input} if input.Resync { state.CurrentFlowStatus = protos.FlowStatus_STATUS_RESYNC - state.ActiveSignal = model.ResyncSignal + state.ActiveSignal = activeSignal } else { state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATING - state.ActiveSignal = model.TerminateSignal + state.ActiveSignal = activeSignal } return state, nil }); err != nil { return fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) } + logger := workflow.GetLogger(ctx) + + if input.Resync { + flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) + flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx) + workflow.Go(ctx, func(gCtx workflow.Context) { + sigSelector := workflow.NewNamedSelector(gCtx, input.FlowJobName+"-drop-signals") + sigSelector.AddReceive(gCtx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + + var onFlowSignal func(model.CDCFlowSignal, bool) + onFlowSignal = func(val model.CDCFlowSignal, _ bool) { + activeSignal = model.FlowSignalHandler(activeSignal, val, logger) + flowSignalChan.AddToSelector(sigSelector, onFlowSignal) + } + flowSignalChan.AddToSelector(sigSelector, onFlowSignal) + + var onStateChange func(*protos.FlowStateChangeRequest, bool) + onStateChange = func(req *protos.FlowStateChangeRequest, _ bool) { + if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATING { + activeSignal = model.TerminateSignal + } + flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange) + } + flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange) + + for gCtx.Err() == nil { + sigSelector.Select(gCtx) + } + }) + } + status := protos.FlowStatus_STATUS_TERMINATING if input.Resync { status = protos.FlowStatus_STATUS_RESYNC } - logger := workflow.GetLogger(ctx) cdc_state.SyncStatusToCatalogWithFlowName(ctx, logger, status, input.FlowJobName) ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) @@ -205,7 +242,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { req := model.RemoveFlowDetailsFromCatalogRequest{ FlowName: input.FlowJobName, - Resync: input.Resync, + Resync: input.Resync && activeSignal != model.TerminateSignal, } if err := workflow.ExecuteActivity( removeFlowEntriesCtx, flowable.RemoveFlowDetailsFromCatalog, &req, @@ -214,7 +251,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { return err } - if input.Resync { + if input.Resync && activeSignal != model.TerminateSignal { return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil) }