diff --git a/app/api/executions/[executionId]/cancel/route.ts b/app/api/executions/[executionId]/cancel/route.ts new file mode 100644 index 000000000..9fbf681fc --- /dev/null +++ b/app/api/executions/[executionId]/cancel/route.ts @@ -0,0 +1 @@ +export { POST } from "@/keeperhub/api/executions/[executionId]/cancel/route"; diff --git a/app/api/internal/executions/[executionId]/route.ts b/app/api/internal/executions/[executionId]/route.ts index 830bece4f..505dccb1b 100644 --- a/app/api/internal/executions/[executionId]/route.ts +++ b/app/api/internal/executions/[executionId]/route.ts @@ -1,5 +1,5 @@ // start custom keeperhub code // -import { eq } from "drizzle-orm"; +import { and, eq, ne } from "drizzle-orm"; import { NextResponse } from "next/server"; import { authenticateInternalService } from "@/keeperhub/lib/internal-service-auth"; @@ -35,16 +35,21 @@ export async function PATCH( const typedStatus = status as ExecutionStatus; - // Check execution exists + // Check execution exists and is not already cancelled const existing = await db.query.workflowExecutions.findFirst({ where: eq(workflowExecutions.id, executionId), - columns: { id: true }, + columns: { id: true, status: true }, }); if (!existing) { return NextResponse.json({ error: "Execution not found" }, { status: 404 }); } + // Don't overwrite cancelled status (user already stopped this execution) + if (existing.status === "cancelled") { + return NextResponse.json({ success: true }); + } + // Build update payload const updateData: { status: ExecutionStatus; @@ -62,7 +67,12 @@ export async function PATCH( await db .update(workflowExecutions) .set(updateData) - .where(eq(workflowExecutions.id, executionId)); + .where( + and( + eq(workflowExecutions.id, executionId), + ne(workflowExecutions.status, "cancelled") + ) + ); return NextResponse.json({ success: true }); } diff --git a/app/api/workflows/executions/[executionId]/status/route.ts b/app/api/workflows/executions/[executionId]/status/route.ts index 368c0af1e..37fe8abb0 100644 --- a/app/api/workflows/executions/[executionId]/status/route.ts +++ b/app/api/workflows/executions/[executionId]/status/route.ts @@ -12,7 +12,7 @@ import { workflowExecutionLogs, workflowExecutions } from "@/lib/db/schema"; type NodeStatus = { nodeId: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; }; export async function GET( diff --git a/components/workflow/workflow-runs.tsx b/components/workflow/workflow-runs.tsx index fa0b588b9..291fd5fd0 100644 --- a/components/workflow/workflow-runs.tsx +++ b/components/workflow/workflow-runs.tsx @@ -1,7 +1,8 @@ "use client"; -import { useAtom } from "jotai"; +import { useAtom, useAtomValue } from "jotai"; import { + Ban, Check, ChevronDown, ChevronRight, @@ -35,6 +36,7 @@ import { getRelativeTime } from "@/lib/utils/time"; import { currentWorkflowIdAtom, executionLogsAtom, + runsRefreshTriggerAtom, selectedExecutionIdAtom, } from "@/lib/workflow-store"; import { Button } from "../ui/button"; @@ -45,7 +47,7 @@ type ExecutionLog = { nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; startedAt: Date; completedAt: Date | null; duration: string | null; @@ -123,7 +125,7 @@ function createExecutionLogsMap(logs: ExecutionLog[]): Record< nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; output?: unknown; } > { @@ -133,7 +135,7 @@ function createExecutionLogsMap(logs: ExecutionLog[]): Record< nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; output?: unknown; } > = {}; @@ -532,6 +534,9 @@ function getProgressBarColor(status: WorkflowExecution["status"]): string { if (status === "success") { return "bg-green-500"; } + if (status === "cancelled") { + return "bg-orange-500"; + } return "bg-red-500"; } @@ -902,6 +907,9 @@ export function WorkflowRuns({ selectedExecutionIdAtom ); const [, setExecutionLogs] = useAtom(executionLogsAtom); + // start custom keeperhub code // + const runsRefreshTrigger = useAtomValue(runsRefreshTriggerAtom); + // end keeperhub code // const [executions, setExecutions] = useState([]); const [logs, setLogs] = useState>({}); const [expandedRuns, setExpandedRuns] = useState>(new Set()); @@ -911,6 +919,11 @@ export function WorkflowRuns({ // Track which execution we've already auto-expanded to prevent loops const autoExpandedExecutionRef = useRef(null); + // start custom keeperhub code // + // Track terminal executions that have had their final log refresh + const finalizedExecutionsRef = useRef>(new Set()); + // end keeperhub code // + const loadExecutions = useCallback( async (showLoading = true) => { if (!currentWorkflowId) { @@ -947,6 +960,15 @@ export function WorkflowRuns({ loadExecutions(); }, [loadExecutions]); + // start custom keeperhub code // + // Immediate refresh when toolbar signals a new execution started + useEffect(() => { + if (runsRefreshTrigger > 0) { + loadExecutions(false); + } + }, [runsRefreshTrigger, loadExecutions]); + // end keeperhub code // + // Clear expanded runs when workflow changes to prevent stale state useEffect(() => { setExpandedRuns(new Set()); @@ -962,7 +984,7 @@ export function WorkflowRuns({ nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; input: unknown; output: unknown; error: string | null; @@ -1092,13 +1114,28 @@ export function WorkflowRuns({ const data = await api.workflow.getExecutions(currentWorkflowId); setExecutions(data as WorkflowExecution[]); - // Also refresh logs for expanded runs (only if they exist in current executions) - const validExecutionIds = new Set(data.map((e) => e.id)); + // start custom keeperhub code // + // Refresh logs for expanded runs: always for running, once more for newly-terminal + const terminalStatuses = new Set(["cancelled", "success", "error"]); + const executionMap = new Map(data.map((e) => [e.id, e])); for (const executionId of expandedRuns) { - if (validExecutionIds.has(executionId)) { + const execution = executionMap.get(executionId); + if (!execution) { + continue; + } + const isTerminal = terminalStatuses.has(execution.status); + const alreadyFinalized = + finalizedExecutionsRef.current.has(executionId); + + if (!isTerminal) { await refreshExecutionLogs(executionId); + } else if (!alreadyFinalized) { + // One final refresh to pick up cancel cleanup, then stop + await refreshExecutionLogs(executionId); + finalizedExecutionsRef.current.add(executionId); } } + // end keeperhub code // } catch (error) { console.error("Failed to poll executions:", error); } @@ -1154,6 +1191,8 @@ export function WorkflowRuns({ return ; case "running": return ; + case "cancelled": + return ; default: return ; } @@ -1167,6 +1206,8 @@ export function WorkflowRuns({ return "bg-red-600"; case "running": return "bg-blue-600"; + case "cancelled": + return "bg-orange-500"; default: return "bg-muted-foreground"; } diff --git a/components/workflow/workflow-toolbar.tsx b/components/workflow/workflow-toolbar.tsx index 2e68dbb5c..e7d590f3a 100644 --- a/components/workflow/workflow-toolbar.tsx +++ b/components/workflow/workflow-toolbar.tsx @@ -14,6 +14,7 @@ import { Redo2, Save, Settings2, + Square, Trash2, Undo2, } from "lucide-react"; @@ -49,6 +50,7 @@ import { canRedoAtom, canUndoAtom, clearWorkflowAtom, + currentExecutionIdAtom, currentWorkflowIdAtom, currentWorkflowNameAtom, currentWorkflowPublicTagsAtom, @@ -65,6 +67,7 @@ import { nodesAtom, propertiesPanelActiveTabAtom, redoAtom, + runsRefreshTriggerAtom, selectedEdgeAtom, selectedExecutionIdAtom, selectedNodeAtom, @@ -428,6 +431,10 @@ type ExecuteTestWorkflowParams = { pollingIntervalRef: React.MutableRefObject; setIsExecuting: (value: boolean) => void; setSelectedExecutionId: (value: string | null) => void; + setCurrentExecutionId: (value: string | null) => void; + // start custom keeperhub code // + onExecutionStarted?: () => void; + // end keeperhub code // }; async function executeTestWorkflow({ @@ -437,6 +444,10 @@ async function executeTestWorkflow({ pollingIntervalRef, setIsExecuting, setSelectedExecutionId, + setCurrentExecutionId, + // start custom keeperhub code // + onExecutionStarted, + // end keeperhub code // }: ExecuteTestWorkflowParams) { // Set all nodes to idle first updateNodesStatus(nodes, updateNodeData, "idle"); @@ -464,16 +475,32 @@ async function executeTestWorkflow({ const result = await response.json(); - // Select the new execution + // Select the new execution and track its ID for cancel support setSelectedExecutionId(result.executionId); + setCurrentExecutionId(result.executionId); + + // start custom keeperhub code // + // Signal the Runs panel to refresh immediately + onExecutionStarted?.(); + // end keeperhub code // // Poll for execution status updates const pollInterval = setInterval(async () => { + // Skip if polling was cancelled (e.g. user clicked Stop) + if (!pollingIntervalRef.current) { + return; + } + try { const statusData = await api.workflow.getExecutionStatus( result.executionId ); + // Skip update if cancelled while fetch was in-flight + if (!pollingIntervalRef.current) { + return; + } + // Update node statuses based on the execution logs for (const nodeStatus of statusData.nodeStatuses) { updateNodeData({ @@ -496,9 +523,14 @@ async function executeTestWorkflow({ } setIsExecuting(false); + setCurrentExecutionId(null); - // Don't reset node statuses - let them show the final state - // The user can click another run or deselect to reset + // start custom keeperhub code // + // Reset nodes to idle when cancelled (steps may show stale "success" from runtime) + if (statusData.status === "cancelled") { + updateNodesStatus(nodes, updateNodeData, "idle"); + } + // end keeperhub code // } } catch (error) { console.error("Failed to poll execution status:", error); @@ -513,6 +545,7 @@ async function executeTestWorkflow({ ); updateNodesStatus(nodes, updateNodeData, "error"); setIsExecuting(false); + setCurrentExecutionId(null); } } @@ -534,6 +567,8 @@ type WorkflowHandlerParams = { setEdges: (edges: WorkflowEdge[]) => void; setSelectedNodeId: (id: string | null) => void; setSelectedExecutionId: (id: string | null) => void; + currentExecutionId: string | null; + setCurrentExecutionId: (id: string | null) => void; userIntegrations: Array<{ id: string; type: IntegrationType }>; }; @@ -551,10 +586,15 @@ function useWorkflowHandlers({ setEdges, setSelectedNodeId, setSelectedExecutionId, + currentExecutionId, + setCurrentExecutionId, userIntegrations, }: WorkflowHandlerParams) { const { open: openOverlay } = useOverlay(); const pollingIntervalRef = useRef(null); + // start custom keeperhub code // + const setRunsRefreshTrigger = useSetAtom(runsRefreshTriggerAtom); + // end keeperhub code // // Cleanup polling interval on unmount useEffect( @@ -605,10 +645,39 @@ function useWorkflowHandlers({ pollingIntervalRef, setIsExecuting, setSelectedExecutionId, + setCurrentExecutionId, + // start custom keeperhub code // + onExecutionStarted: () => setRunsRefreshTrigger((c) => c + 1), + // end keeperhub code // }); // Don't set executing to false here - let polling handle it }; + const handleCancel = async (): Promise => { + // Best-effort cancel via API (may fail if execution already completed) + if (currentExecutionId) { + try { + await api.workflow.cancelExecution(currentExecutionId); + } catch { + // Execution may have already completed + } + } + + // Stop polling + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + + setIsExecuting(false); + setCurrentExecutionId(null); + + // Reset all node statuses to idle + updateNodesStatus(nodes, updateNodeData, "idle"); + + toast.success("Workflow execution cancelled"); + }; + const handleGoToStep = (nodeId: string, fieldKey?: string) => { setSelectedNodeId(nodeId); setActiveTab("properties"); @@ -687,6 +756,7 @@ function useWorkflowHandlers({ return { handleSave, handleExecute, + handleCancel, validateAndProceed, handleGoToStep, }; @@ -727,6 +797,9 @@ function useWorkflowState() { const setSelectedExecutionId = useSetAtom(selectedExecutionIdAtom); const userIntegrations = useAtomValue(integrationsAtom); const [triggerExecute, setTriggerExecute] = useAtom(triggerExecuteAtom); + const [currentExecutionId, setCurrentExecutionId] = useAtom( + currentExecutionIdAtom + ); const [isDownloading, setIsDownloading] = useState(false); const [isDuplicating, setIsDuplicating] = useState(false); @@ -811,6 +884,8 @@ function useWorkflowState() { userIntegrations, triggerExecute, setTriggerExecute, + currentExecutionId, + setCurrentExecutionId, isEnabled, setIsEnabled, }; @@ -844,6 +919,8 @@ function useWorkflowActions(state: ReturnType) { setEdges, setSelectedNodeId, setSelectedExecutionId, + currentExecutionId, + setCurrentExecutionId, userIntegrations, triggerExecute, setTriggerExecute, @@ -851,8 +928,8 @@ function useWorkflowActions(state: ReturnType) { session, } = state; - const { handleSave, handleExecute, validateAndProceed } = useWorkflowHandlers( - { + const { handleSave, handleExecute, handleCancel, validateAndProceed } = + useWorkflowHandlers({ currentWorkflowId, nodes, edges, @@ -866,9 +943,10 @@ function useWorkflowActions(state: ReturnType) { setEdges, setSelectedNodeId, setSelectedExecutionId, + currentExecutionId, + setCurrentExecutionId, userIntegrations, - } - ); + }); // Listen for execute trigger from keyboard shortcut useEffect(() => { @@ -1116,6 +1194,7 @@ function useWorkflowActions(state: ReturnType) { return { handleSave, handleExecute, + handleCancel, handleClearWorkflow, handleDeleteWorkflow, handleDownload, @@ -1580,6 +1659,21 @@ function RunButtonGroup({ isNonManualTrigger; // end keeperhub code // + // Show Stop button while executing + if (state.isExecuting) { + return ( + + ); + } + const button = ( ); diff --git a/keeperhub/api/executions/[executionId]/cancel/route.ts b/keeperhub/api/executions/[executionId]/cancel/route.ts new file mode 100644 index 000000000..2aa506ca8 --- /dev/null +++ b/keeperhub/api/executions/[executionId]/cancel/route.ts @@ -0,0 +1,110 @@ +import { and, eq } from "drizzle-orm"; +import { NextResponse } from "next/server"; +import { getOrgContext } from "@/keeperhub/lib/middleware/org-context"; +import { db } from "@/lib/db"; +import { + workflowExecutionLogs, + workflowExecutions, + workflows, +} from "@/lib/db/schema"; + +export async function POST( + _request: Request, + context: { params: Promise<{ executionId: string }> } +): Promise { + try { + const { executionId } = await context.params; + + const orgContext = await getOrgContext(); + + if (!orgContext.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + if (!orgContext.organization?.id) { + return NextResponse.json( + { error: "No organization found" }, + { status: 400 } + ); + } + + // Fetch execution and verify it belongs to the user's org via the workflow + const execution = await db.query.workflowExecutions.findFirst({ + where: eq(workflowExecutions.id, executionId), + columns: { + id: true, + status: true, + workflowId: true, + startedAt: true, + }, + }); + + if (!execution) { + return NextResponse.json( + { error: "Execution not found" }, + { status: 404 } + ); + } + + // Verify the workflow belongs to the user's organization + const workflow = await db.query.workflows.findFirst({ + where: and( + eq(workflows.id, execution.workflowId), + eq(workflows.organizationId, orgContext.organization.id) + ), + columns: { id: true }, + }); + + if (!workflow) { + return NextResponse.json( + { error: "Execution not found" }, + { status: 404 } + ); + } + + if (execution.status !== "running") { + return NextResponse.json( + { error: "Execution is not running" }, + { status: 400 } + ); + } + + const now = new Date(); + const duration = now.getTime() - execution.startedAt.getTime(); + + await db + .update(workflowExecutions) + .set({ + status: "cancelled", + error: "Cancelled by user", + completedAt: now, + duration: duration.toString(), + currentNodeId: null, + currentNodeName: null, + }) + .where(eq(workflowExecutions.id, executionId)); + + // Mark any in-flight step logs as "error" to prevent orphaned "running" entries + await db + .update(workflowExecutionLogs) + .set({ + status: "cancelled", + error: "Cancelled by user", + completedAt: now, + }) + .where( + and( + eq(workflowExecutionLogs.executionId, executionId), + eq(workflowExecutionLogs.status, "running") + ) + ); + + return NextResponse.json({ success: true }); + } catch (error) { + console.error("Failed to cancel execution:", error); + return NextResponse.json( + { error: "Failed to cancel execution" }, + { status: 500 } + ); + } +} diff --git a/keeperhub/components/analytics/runs-filters.tsx b/keeperhub/components/analytics/runs-filters.tsx index 365f53069..dcd93ea41 100644 --- a/keeperhub/components/analytics/runs-filters.tsx +++ b/keeperhub/components/analytics/runs-filters.tsx @@ -24,6 +24,7 @@ const STATUS_OPTIONS: Array<{ { value: undefined, label: "All" }, { value: "success", label: "Success" }, { value: "error", label: "Error" }, + { value: "cancelled", label: "Cancelled" }, { value: "running", label: "Running" }, { value: "pending", label: "Pending" }, ]; diff --git a/keeperhub/components/analytics/runs-table.tsx b/keeperhub/components/analytics/runs-table.tsx index f30bd6db7..580dbb83c 100644 --- a/keeperhub/components/analytics/runs-table.tsx +++ b/keeperhub/components/analytics/runs-table.tsx @@ -77,6 +77,8 @@ const STATUS_STYLES: Record = { success: "bg-green-500/10 text-green-700 dark:text-green-400 border-green-500/20", error: "bg-red-500/10 text-red-700 dark:text-red-400 border-red-500/20", + cancelled: + "bg-orange-500/10 text-orange-700 dark:text-orange-400 border-orange-500/20", running: "bg-blue-500/10 text-blue-700 dark:text-blue-400 border-blue-500/20", pending: "bg-gray-500/10 text-gray-700 dark:text-gray-400 border-gray-500/20", } as const; diff --git a/keeperhub/components/analytics/time-series-chart.tsx b/keeperhub/components/analytics/time-series-chart.tsx index 50e632bb7..1c11f0091 100644 --- a/keeperhub/components/analytics/time-series-chart.tsx +++ b/keeperhub/components/analytics/time-series-chart.tsx @@ -23,6 +23,7 @@ import { const CHART_COLORS = { success: "var(--color-keeperhub-green)", error: "var(--chart-1)", + cancelled: "var(--color-orange-500, #f97316)", running: "var(--chart-2)", pending: "var(--chart-3)", } as const; @@ -115,6 +116,7 @@ function TimeSeriesContent({ timestamp: string; success: number; error: number; + cancelled: number; pending: number; running: number; }[]; @@ -175,6 +177,14 @@ function TimeSeriesContent({ stroke={CHART_COLORS.error} type="monotone" /> + 0 ? successCount / totalRuns : 0; const avgDurationMs = computeAvgDuration( @@ -160,6 +164,7 @@ export async function getAnalyticsSummary( totalRuns, successCount, errorCount, + cancelledCount, successRate, avgDurationMs, totalGasWei, @@ -177,6 +182,7 @@ async function getWorkflowCounts( total: number; success: number; error: number; + cancelled: number; durationSum: number; durationCount: number; }> { @@ -184,7 +190,8 @@ async function getWorkflowCounts( .select({ total: count(), success: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'success' THEN 1 ELSE 0 END)`, - error: sql`SUM(CASE WHEN ${workflowExecutions.status} IN ('error', 'cancelled') THEN 1 ELSE 0 END)`, + error: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'error' THEN 1 ELSE 0 END)`, + cancelled: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'cancelled' THEN 1 ELSE 0 END)`, durationSum: sql`COALESCE(SUM(CAST(${workflowExecutions.duration} AS INTEGER)), 0)`, durationCount: sql`SUM(CASE WHEN ${workflowExecutions.duration} IS NOT NULL THEN 1 ELSE 0 END)`, }) @@ -204,6 +211,7 @@ async function getWorkflowCounts( total: Number(row?.total) || 0, success: Number(row?.success) || 0, error: Number(row?.error) || 0, + cancelled: Number(row?.cancelled) || 0, durationSum: Number(row?.durationSum) || 0, durationCount: Number(row?.durationCount) || 0, }; @@ -310,6 +318,7 @@ async function getPreviousPeriodSummary( totalRuns: workflowStats.total + directStats.total, successCount: workflowStats.success + directStats.success, errorCount: workflowStats.error + directStats.error, + cancelledCount: workflowStats.cancelled, avgDurationMs: computeAvgDuration( workflowStats.durationSum + directStats.durationSum, workflowStats.durationCount + directStats.durationCount @@ -405,7 +414,8 @@ export async function getTimeSeries( .select({ bucket: sql`${bucketExpr(workflowExecutions.startedAt)}`, success: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'success' THEN 1 ELSE 0 END)`, - error: sql`SUM(CASE WHEN ${workflowExecutions.status} IN ('error', 'cancelled') THEN 1 ELSE 0 END)`, + error: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'error' THEN 1 ELSE 0 END)`, + cancelled: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'cancelled' THEN 1 ELSE 0 END)`, pending: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'pending' THEN 1 ELSE 0 END)`, running: sql`SUM(CASE WHEN ${workflowExecutions.status} = 'running' THEN 1 ELSE 0 END)`, }) @@ -431,6 +441,7 @@ export async function getTimeSeries( bucket: sql`${bucketExpr(directExecutions.createdAt)}`, success: sql`SUM(CASE WHEN ${directExecutions.status} = 'completed' THEN 1 ELSE 0 END)`, error: sql`SUM(CASE WHEN ${directExecutions.status} = 'failed' THEN 1 ELSE 0 END)`, + cancelled: sql`0`, pending: sql`SUM(CASE WHEN ${directExecutions.status} = 'pending' THEN 1 ELSE 0 END)`, running: sql`SUM(CASE WHEN ${directExecutions.status} = 'running' THEN 1 ELSE 0 END)`, }) @@ -478,6 +489,7 @@ type BucketRow = { bucket: string; success: string; error: string; + cancelled: string; pending: string; running: string; }; @@ -718,7 +730,7 @@ async function fetchWorkflowRuns( ]; if (status) { - const dbStatuses = status === "error" ? ["error", "cancelled"] : [status]; + const dbStatuses = [status]; conditions.push( sql`${workflowExecutions.status} IN (${sql.join( dbStatuses.map((s) => sql`${s}`), @@ -891,7 +903,7 @@ async function getWorkflowRunsTotal( conditions.push(eq(workflows.projectId, projectId)); } if (status) { - const dbStatuses = status === "error" ? ["error", "cancelled"] : [status]; + const dbStatuses = [status]; conditions.push( sql`${workflowExecutions.status} IN (${sql.join( dbStatuses.map((s) => sql`${s}`), diff --git a/keeperhub/lib/analytics/types.ts b/keeperhub/lib/analytics/types.ts index 5c373d600..cc5138aa8 100644 --- a/keeperhub/lib/analytics/types.ts +++ b/keeperhub/lib/analytics/types.ts @@ -20,7 +20,12 @@ export type UnifiedStatus = | "completed" | "failed"; -export type NormalizedStatus = "pending" | "running" | "success" | "error"; +export type NormalizedStatus = + | "pending" + | "running" + | "success" + | "error" + | "cancelled"; export type UnifiedRun = { id: string; @@ -43,6 +48,7 @@ export type AnalyticsSummary = { totalRuns: number; successCount: number; errorCount: number; + cancelledCount: number; successRate: number; avgDurationMs: number | null; totalGasWei: string; @@ -51,6 +57,7 @@ export type AnalyticsSummary = { totalRuns: number; successCount: number; errorCount: number; + cancelledCount: number; avgDurationMs: number | null; totalGasWei: string; } | null; @@ -60,6 +67,7 @@ export type TimeSeriesBucket = { timestamp: string; success: number; error: number; + cancelled: number; pending: number; running: number; }; diff --git a/keeperhub/lib/template-helpers.ts b/keeperhub/lib/template-helpers.ts index 0fd8c07cf..442e78771 100644 --- a/keeperhub/lib/template-helpers.ts +++ b/keeperhub/lib/template-helpers.ts @@ -29,7 +29,7 @@ export function buildExecutionLogsMap( nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; output?: unknown; }> ): ExecutionLogsByNodeId { diff --git a/lib/api-client.ts b/lib/api-client.ts index e242421e4..c798ed84b 100644 --- a/lib/api-client.ts +++ b/lib/api-client.ts @@ -680,7 +680,7 @@ export const workflowApi = { nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; input: unknown; output: unknown; error: string | null; @@ -694,13 +694,19 @@ export const workflowApi = { }>; }>(`/api/workflows/executions/${executionId}/logs`), + // Cancel a running execution + cancelExecution: (executionId: string) => + apiCall<{ success: boolean }>(`/api/executions/${executionId}/cancel`, { + method: "POST", + }), + // Get execution status getExecutionStatus: (executionId: string) => apiCall<{ status: string; nodeStatuses: Array<{ nodeId: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; }>; }>(`/api/workflows/executions/${executionId}/status`), diff --git a/lib/db/schema.ts b/lib/db/schema.ts index 75ec2af52..a25402787 100644 --- a/lib/db/schema.ts +++ b/lib/db/schema.ts @@ -309,7 +309,7 @@ export const workflowExecutionLogs = pgTable("workflow_execution_logs", { nodeType: text("node_type").notNull(), status: text("status") .notNull() - .$type<"pending" | "running" | "success" | "error">(), + .$type<"pending" | "running" | "success" | "error" | "cancelled">(), // biome-ignore lint/suspicious/noExplicitAny: JSONB type - structure validated at application level input: jsonb("input").$type(), // biome-ignore lint/suspicious/noExplicitAny: JSONB type - structure validated at application level diff --git a/lib/steps/step-handler.ts b/lib/steps/step-handler.ts index cdb3e33c1..c60785764 100644 --- a/lib/steps/step-handler.ts +++ b/lib/steps/step-handler.ts @@ -82,7 +82,10 @@ async function logStepComplete( logInfo: LogInfo, status: "success" | "error", output?: unknown, - error?: string + error?: string, + // start custom keeperhub code // + executionId?: string + // end keeperhub code // ): Promise { if (!logInfo.logId) { return; @@ -97,6 +100,9 @@ async function logStepComplete( status, output: redactedOutput, error, + // start custom keeperhub code // + executionId, + // end keeperhub code // }); } catch (err) { console.error("[stepHandler] Failed to log completion:", err); @@ -210,7 +216,8 @@ export async function withStepLogging( logInfo, "error", result, - errorResult.error || "Step execution failed" + errorResult.error || "Step execution failed", + context?.executionId ); // start custom keeperhub code // @@ -225,7 +232,13 @@ export async function withStepLogging( }); // end keeperhub code // } else { - await logStepComplete(logInfo, "success", result); + await logStepComplete( + logInfo, + "success", + result, + undefined, + context?.executionId + ); // start custom keeperhub code // recordStepMetrics({ @@ -268,7 +281,13 @@ export async function withStepLogging( } catch (error) { const errorMessage = error instanceof Error ? error.message : "Unknown error"; - await logStepComplete(logInfo, "error", undefined, errorMessage); + await logStepComplete( + logInfo, + "error", + undefined, + errorMessage, + context?.executionId + ); // start custom keeperhub code // recordStepMetrics({ diff --git a/lib/workflow-logging.ts b/lib/workflow-logging.ts index e13d60073..4fa72a19f 100644 --- a/lib/workflow-logging.ts +++ b/lib/workflow-logging.ts @@ -4,10 +4,26 @@ */ import "server-only"; -import { eq } from "drizzle-orm"; +import { and, eq, ne } from "drizzle-orm"; import { db } from "@/lib/db"; import { workflowExecutionLogs, workflowExecutions } from "@/lib/db/schema"; +// start custom keeperhub code // +const TERMINAL_STATUSES = new Set(["cancelled", "success", "error"]); + +/** + * Check if an execution has been cancelled (or otherwise terminated). + * Used as a guard to prevent stale writes from the runtime after cancellation. + */ +async function isExecutionTerminal(executionId: string): Promise { + const execution = await db.query.workflowExecutions.findFirst({ + where: eq(workflowExecutions.id, executionId), + columns: { status: true }, + }); + return !execution || TERMINAL_STATUSES.has(execution.status); +} +// end keeperhub code // + export type LogStepStartParams = { executionId: string; nodeId: string; @@ -31,6 +47,13 @@ export type LogStepStartResult = { export async function logStepStartDb( params: LogStepStartParams ): Promise { + // start custom keeperhub code // + // Guard: skip if execution was cancelled (runtime continues after cancel) + if (await isExecutionTerminal(params.executionId)) { + return { logId: "", startTime: Date.now() }; + } + // end keeperhub code // + const [log] = await db .insert(workflowExecutionLogs) .values({ @@ -60,6 +83,9 @@ export type LogStepCompleteParams = { status: "success" | "error"; output?: unknown; error?: string; + // start custom keeperhub code // + executionId?: string; + // end keeperhub code // }; /** @@ -68,6 +94,13 @@ export type LogStepCompleteParams = { export async function logStepCompleteDb( params: LogStepCompleteParams ): Promise { + // start custom keeperhub code // + // Guard: skip if execution was cancelled (runtime continues after cancel) + if (params.executionId && (await isExecutionTerminal(params.executionId))) { + return; + } + // end keeperhub code // + const duration = Date.now() - params.startTime; await db @@ -110,7 +143,12 @@ export async function logWorkflowCompleteDb( currentNodeId: null, currentNodeName: null, }) - .where(eq(workflowExecutions.id, params.executionId)); + .where( + and( + eq(workflowExecutions.id, params.executionId), + ne(workflowExecutions.status, "cancelled") + ) + ); } // ============================================================================ @@ -162,7 +200,14 @@ export async function updateCurrentStep( currentNodeId: params.currentNodeId, currentNodeName: params.currentNodeName, }) - .where(eq(workflowExecutions.id, params.executionId)); + .where( + // start custom keeperhub code // + and( + eq(workflowExecutions.id, params.executionId), + ne(workflowExecutions.status, "cancelled") + ) + // end keeperhub code // + ); } export type IncrementCompletedStepsParams = { @@ -188,6 +233,13 @@ export async function incrementCompletedSteps( return; } + // start custom keeperhub code // + // Guard: skip if execution was cancelled (runtime continues after cancel) + if (TERMINAL_STATUSES.has(execution.status)) { + return; + } + // end keeperhub code // + const completedSteps = Number.parseInt(execution.completedSteps || "0", 10) + 1; const trace = (execution.executionTrace as string[] | null) || []; diff --git a/lib/workflow-store.ts b/lib/workflow-store.ts index f1962be17..bd839b5a5 100644 --- a/lib/workflow-store.ts +++ b/lib/workflow-store.ts @@ -60,6 +60,10 @@ export const isWorkflowEnabled = atom(false); // UI state atoms export const propertiesPanelActiveTabAtom = atom("properties"); +// start custom keeperhub code // +// Increment to trigger an immediate Runs panel refresh (e.g. after execute) +export const runsRefreshTriggerAtom = atom(0); +// end keeperhub code // export const showMinimapAtom = atom(false); export const selectedExecutionIdAtom = atom(null); export const rightPanelWidthAtom = atom(null); @@ -76,6 +80,9 @@ export const pendingIntegrationNodesAtom = atom>(new Set()); // Cleared when the node gets an action type or is deselected export const newlyCreatedNodeIdAtom = atom(null); +// Tracks the execution ID of the currently running execution (for cancel support) +export const currentExecutionIdAtom = atom(null); + // Trigger execute atom - set to true to trigger workflow execution // This allows keyboard shortcuts to trigger the same execute flow as the button export const triggerExecuteAtom = atom(false); @@ -85,7 +92,7 @@ export type ExecutionLogEntry = { nodeId: string; nodeName: string; nodeType: string; - status: "pending" | "running" | "success" | "error"; + status: "pending" | "running" | "success" | "error" | "cancelled"; output?: unknown; }; @@ -509,6 +516,7 @@ export const resetWorkflowStateForOrgSwitchAtom = atom(null, (_get, set) => { set(isWorkflowOwnerAtom, true); set(isWorkflowEnabled, false); set(workflowNotFoundAtom, false); + set(currentExecutionIdAtom, null); set(selectedExecutionIdAtom, null); set(executionLogsAtom, {}); set(lastExecutionLogsAtom, { workflowId: null, logs: {} });