Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
17 changes: 17 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,24 @@ func (a *SnapshotActivity) SetupReplication(
}

logger.Info("waiting for slot to be created...")
slotCreateStart := time.Now()
stopSlotCreateWarning := common.Interval(ctx, 5*time.Minute, func() {
elapsed := time.Since(slotCreateStart)
if elapsed >= 15*time.Minute {
msg := fmt.Sprintf(
"Replication slot creation is taking very long (%s), possibly blocked by an open transaction."+
" Run the following on the source to identify blockers:"+
" SELECT pid, usename, application_name, state, now() - xact_start AS duration, query"+
" FROM pg_stat_activity WHERE xact_start IS NOT NULL AND pid != pg_backend_pid() ORDER BY xact_start;",
elapsed.Round(time.Second),
)
if err := alerting.InsertFlowLog(ctx, a.CatalogPool, config.FlowJobName, msg, alerting.FlowErrorTypeWarn); err != nil {
logger.Error("failed to insert slot creation warning", slog.Any("error", err))
}
}
})
slotInfo, err := conn.SetupReplication(ctx, config)
stopSlotCreateWarning()

if err != nil {
connClose(ctx)
Expand Down
49 changes: 43 additions & 6 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput)
if canceled {
if input.Resync {
if err := errors.Join(ctx.Err(), sourceError, destinationError); err != nil {
logger.Warn("resync failed drop, proceeding with cdc flow", slog.Any("error", err))
logger.Warn("resync failed drop, proceeding with cleanup", slog.Any("error", err))
}
break
}
Expand All @@ -124,25 +124,62 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput)
}

func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
activeSignal := model.NoopSignal
if input.Resync {
activeSignal = model.ResyncSignal
} else {
activeSignal = model.TerminateSignal
}

if err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (cdc_state.CDCFlowWorkflowState, error) {
state := cdc_state.CDCFlowWorkflowState{DropFlowInput: input}
if input.Resync {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RESYNC
state.ActiveSignal = model.ResyncSignal
state.ActiveSignal = activeSignal
} else {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATING
state.ActiveSignal = model.TerminateSignal
state.ActiveSignal = activeSignal
}
return state, nil
}); err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err)
}

logger := workflow.GetLogger(ctx)

if input.Resync {
flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)
flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx)
workflow.Go(ctx, func(gCtx workflow.Context) {
sigSelector := workflow.NewNamedSelector(gCtx, input.FlowJobName+"-drop-signals")
sigSelector.AddReceive(gCtx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})

var onFlowSignal func(model.CDCFlowSignal, bool)
onFlowSignal = func(val model.CDCFlowSignal, _ bool) {
activeSignal = model.FlowSignalHandler(activeSignal, val, logger)
flowSignalChan.AddToSelector(sigSelector, onFlowSignal)
}
flowSignalChan.AddToSelector(sigSelector, onFlowSignal)

var onStateChange func(*protos.FlowStateChangeRequest, bool)
onStateChange = func(req *protos.FlowStateChangeRequest, _ bool) {
if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATING {
activeSignal = model.TerminateSignal
}
flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange)
}
flowSignalStateChangeChan.AddToSelector(sigSelector, onStateChange)

for gCtx.Err() == nil {
sigSelector.Select(gCtx)
}
})
}

status := protos.FlowStatus_STATUS_TERMINATING
if input.Resync {
status = protos.FlowStatus_STATUS_RESYNC
}
logger := workflow.GetLogger(ctx)
cdc_state.SyncStatusToCatalogWithFlowName(ctx, logger, status, input.FlowJobName)

ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName)
Expand Down Expand Up @@ -205,7 +242,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {

req := model.RemoveFlowDetailsFromCatalogRequest{
FlowName: input.FlowJobName,
Resync: input.Resync,
Resync: input.Resync && activeSignal != model.TerminateSignal,
}
if err := workflow.ExecuteActivity(
removeFlowEntriesCtx, flowable.RemoveFlowDetailsFromCatalog, &req,
Expand All @@ -214,7 +251,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
return err
}

if input.Resync {
if input.Resync && activeSignal != model.TerminateSignal {
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil)
}

Expand Down
Loading