From e4da2b770ee7b6875d96696b3772af281ae807b0 Mon Sep 17 00:00:00 2001 From: masterashu Date: Wed, 17 Jun 2026 22:10:25 +0530 Subject: [PATCH 1/3] fix: handle status terminating during resync-drop-flow. --- flow/workflows/drop_flow.go | 49 ++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 05f999f2c..2ccee8898 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -112,7 +112,7 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) if canceled { if input.Resync { if err := errors.Join(ctx.Err(), sourceError, destinationError); err != nil { - logger.Warn("resync failed drop, proceeding with cdc flow", slog.Any("error", err)) + logger.Warn("resync failed drop, proceeding with cleanup", slog.Any("error", err)) } break } @@ -124,25 +124,62 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) } func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { + activeSignal := model.NoopSignal + if input.Resync { + activeSignal = model.ResyncSignal + } else { + activeSignal = model.TerminateSignal + } + if err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (cdc_state.CDCFlowWorkflowState, error) { state := cdc_state.CDCFlowWorkflowState{DropFlowInput: input} if input.Resync { state.CurrentFlowStatus = protos.FlowStatus_STATUS_RESYNC - state.ActiveSignal = model.ResyncSignal + state.ActiveSignal = activeSignal } else { state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATING - state.ActiveSignal = model.TerminateSignal + state.ActiveSignal = activeSignal } return state, nil }); err != nil { return fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) } + logger := workflow.GetLogger(ctx) + + if input.Resync { + flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) + flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx) + workflow.Go(ctx, func(gCtx workflow.Context) { + sigSelector := workflow.NewNamedSelector(gCtx, input.FlowJobName+"-drop-signals") + sigSelector.AddReceive(gCtx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + + var onFlowSignal func(model.CDCFlowSignal, bool) + onFlowSignal = func(val model.CDCFlowSignal, _ bool) { + activeSignal = model.FlowSignalHandler(activeSignal, val, logger) + flowSignalChan.AddToSelector(sigSelector, onFlowSignal) + } + flowSignalChan.AddToSelector(sigSelector, onFlowSignal) + + var onStateChange func(*protos.FlowStateChangeRequest, bool) + onStateChange = func(req *protos.FlowStateChangeRequest, _ bool) { + if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATING { + activeSignal = model.TerminateSignal + } + flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange) + } + flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange) + + for gCtx.Err() == nil { + sigSelector.Select(gCtx) + } + }) + } + status := protos.FlowStatus_STATUS_TERMINATING if input.Resync { status = protos.FlowStatus_STATUS_RESYNC } - logger := workflow.GetLogger(ctx) cdc_state.SyncStatusToCatalogWithFlowName(ctx, logger, status, input.FlowJobName) ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) @@ -205,7 +242,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { req := model.RemoveFlowDetailsFromCatalogRequest{ FlowName: input.FlowJobName, - Resync: input.Resync, + Resync: input.Resync && activeSignal != model.TerminateSignal, } if err := workflow.ExecuteActivity( removeFlowEntriesCtx, flowable.RemoveFlowDetailsFromCatalog, &req, @@ -214,7 +251,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { return err } - if input.Resync { + if input.Resync && activeSignal != model.TerminateSignal { return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil) } From fcebf4c45768edcdb84756c935a3e346fe7eeaf3 Mon Sep 17 00:00:00 2001 From: masterashu Date: Wed, 17 Jun 2026 22:11:07 +0530 Subject: [PATCH 2/3] feat: add user facing logs for waiting replication slot creation --- flow/activities/snapshot_activity.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 8b82400fa..07621ec85 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -74,7 +74,24 @@ func (a *SnapshotActivity) SetupReplication( } logger.Info("waiting for slot to be created...") + slotCreateStart := time.Now() + stopSlotCreateWarning := common.Interval(ctx, time.Minute, func() { + elapsed := time.Since(slotCreateStart) + if elapsed >= 15*time.Minute { + msg := fmt.Sprintf( + "Replication slot creation is taking very long (%s), possibly blocked by an open transaction."+ + " Run the following on the source to identify blockers:"+ + " SELECT pid, usename, application_name, state, now() - xact_start AS duration, query"+ + " FROM pg_stat_activity WHERE xact_start IS NOT NULL AND pid != pg_backend_pid() ORDER BY xact_start;", + elapsed.Round(time.Second), + ) + if err := alerting.InsertFlowLog(ctx, a.CatalogPool, config.FlowJobName, msg, alerting.FlowErrorTypeWarn); err != nil { + logger.Error("failed to insert slot creation warning", slog.Any("error", err)) + } + } + }) slotInfo, err := conn.SetupReplication(ctx, config) + stopSlotCreateWarning() if err != nil { connClose(ctx) From a155412b64cbf8b0d29d5b6591d3d36550888f2e Mon Sep 17 00:00:00 2001 From: masterashu Date: Wed, 17 Jun 2026 22:16:41 +0530 Subject: [PATCH 3/3] fix --- flow/activities/snapshot_activity.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 07621ec85..0f92d1823 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -75,7 +75,7 @@ func (a *SnapshotActivity) SetupReplication( logger.Info("waiting for slot to be created...") slotCreateStart := time.Now() - stopSlotCreateWarning := common.Interval(ctx, time.Minute, func() { + stopSlotCreateWarning := common.Interval(ctx, 5*time.Minute, func() { elapsed := time.Since(slotCreateStart) if elapsed >= 15*time.Minute { msg := fmt.Sprintf(