From fcd0833d9a7198b53456e7e49400d600be33226b Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 29 Jan 2026 13:55:58 +0200 Subject: [PATCH 1/3] Add deployment removal and drained deployment pruning to ServiceDeployer - Add removalPolicy option (RETAIN default, DESTROY force-deletes on removal) - Add pruneDrainedDeployments option to clean up old drained deployments - Add revisionHistoryLimit to keep N recent drained revisions - Add maxPrunedPerRun (default 10) to limit cleanup per deployment - Query deployments by endpoint ARN for deletion instead of synthetic ID - Pruning finds all drained deployments (no services, no pinned invocations) - Delete is best-effort to avoid blocking CloudFormation stack deletion --- .../register-service-handler/index.mts | 213 +++++++++++++++++- lib/restate-constructs/service-deployer.ts | 63 +++++- .../restate-constructs.test.ts.snap | 163 +++++++++++++- test/e2e/cdk-util.ts | 71 +++++- test/e2e/restate-cloud-pruning-test.e2e.ts | 108 +++++++++ test/e2e/stacks/restate-cloud-lambda-stack.ts | 13 +- test/restate-constructs.test.ts | 29 +++ 7 files changed, 643 insertions(+), 17 deletions(-) create mode 100644 test/e2e/restate-cloud-pruning-test.e2e.ts diff --git a/lib/restate-constructs/register-service-handler/index.mts b/lib/restate-constructs/register-service-handler/index.mts index 498cc97..6d5ee05 100644 --- a/lib/restate-constructs/register-service-handler/index.mts +++ b/lib/restate-constructs/register-service-handler/index.mts @@ -51,7 +51,24 @@ export interface RegistrationProperties { /** Whether to trust any certificate when connecting to the admin endpoint. */ insecure?: "true" | "false"; - // removalPolicy?: string; + /** What to do when the handler is removed: "retain" (default) or "destroy". */ + removalPolicy?: "retain" | "destroy"; + + /** Whether to prune drained deployments for the same handler after registration. */ + pruneDrainedDeployments?: "true" | "false"; + + /** Number of old drained deployment revisions to retain when pruning. */ + revisionHistoryLimit?: number; + + /** Maximum number of drained deployments to prune per run. */ + maxPrunedPerRun?: number; + + /** + * Whether to prune deployments that have only completed invocations pinned. Default is false (conservative). + * When false, deployments with ANY pinned invocations (including completed) will not be pruned. + * When true, only deployments with active (non-completed) invocations will be kept. + */ + allowPruningDeploymentsWithCompletedInvocations?: "true" | "false"; } type RegisterDeploymentResponse = { @@ -124,11 +141,47 @@ export const handler = async function (event: CloudFormationCustomResourceEvent) const rejectUnauthorized = props.insecure !== "true"; if (event.RequestType === "Delete") { - // Since we retain older Lambda handler versions on update, we also leave the registered service alone. There may - // be unfinished invocations that require it; in the future we would want to inform Restate that we want to - // de-register the service, and wait for Restate to let us know that it is safe to delete the deployed Function - // version from Lambda. - console.warn("De-registering services is not supported currently. Previous version will remain registered."); + if (props.removalPolicy !== "destroy") { + console.log("Removal policy is 'retain'; leaving deployment registered in Restate."); + return; + } + + let authHeader: Record = {}; + try { + authHeader = await createAuthHeader(props); + } catch (e) { + console.warn(`Failed to load auth token for deletion: ${(e as Error)?.message}`); + console.warn("Proceeding with deletion without auth header."); + } + + console.log(`Removal policy is 'destroy'; finding deployment for ${props.serviceLambdaArn}`); + + // Best-effort deletion: log errors but don't fail CloudFormation delete + try { + const deploymentIds = await findDeploymentsByEndpoint( + props.adminUrl!, + props.serviceLambdaArn!, + authHeader, + rejectUnauthorized, + ); + + if (deploymentIds.length === 0) { + console.log("No deployments found for this endpoint; nothing to delete."); + return; + } + + for (const deploymentId of deploymentIds) { + try { + console.log(`Deleting deployment ${deploymentId}...`); + await deleteDeployment(props.adminUrl!, deploymentId, authHeader, rejectUnauthorized); + console.log(`Deleted deployment ${deploymentId}.`); + } catch (e) { + console.warn(`Failed to delete deployment ${deploymentId}: ${(e as Error)?.message}`); + } + } + } catch (e) { + console.warn(`Failed to query/delete deployments: ${(e as Error)?.message}`); + } return; } @@ -245,6 +298,26 @@ export const handler = async function (event: CloudFormationCustomResourceEvent) console.log(`Successfully marked service as ${isPublic ? "public" : "private"}.`); } + if (props.pruneDrainedDeployments === "true") { + try { + if (!props.serviceLambdaArn) { + console.warn("Pruning requested but no serviceLambdaArn provided; skipping."); + } else { + await pruneDrainedDeployments( + props.adminUrl!, + props.serviceLambdaArn, + props.revisionHistoryLimit ?? 0, + props.maxPrunedPerRun ?? 10, + props.allowPruningDeploymentsWithCompletedInvocations === "true", + authHeader, + rejectUnauthorized, + ); + } + } catch (e) { + console.warn(`Failed to prune drained deployments: ${(e as Error)?.message}`); + } + } + return; // Overall success! } else { failureReason = `Registration failed (${registerDeploymentResponse.statusCode}): ${registerDeploymentResponse.body}`; @@ -295,3 +368,131 @@ async function createAuthHeader(props: RegistrationProperties): Promise setTimeout(resolve, millis)); } + +async function deleteDeployment( + adminUrl: string, + deploymentId: string, + authHeader: Record, + rejectUnauthorized: boolean, +) { + const deleteUrl = new URL(`${adminUrl}/${DEPLOYMENTS_PATH}/${deploymentId}?force=true`); + + const deleteResponse = await httpRequest(deleteUrl, { + method: "DELETE", + headers: authHeader, + timeout: 10_000, + rejectUnauthorized, + }); + + const isSuccess = + (deleteResponse.statusCode >= 200 && deleteResponse.statusCode < 300) || deleteResponse.statusCode === 404; + if (!isSuccess) { + throw new Error(`Delete deployment failed (${deleteResponse.statusCode}): ${deleteResponse.body}`); + } +} + +type QueryResponse = { + rows: Record[]; +}; + +async function queryRestate( + adminUrl: string, + sql: string, + authHeader: Record, + rejectUnauthorized: boolean, +): Promise[]> { + const queryUrl = new URL(`${adminUrl}/query`); + + const queryResponse = await httpRequest(queryUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", // Request JSON format (default is Apache Arrow IPC binary) + ...authHeader, + }, + body: JSON.stringify({ query: sql }), + timeout: 30_000, + rejectUnauthorized, + }); + + if (queryResponse.statusCode !== 200) { + throw new Error(`Query failed (${queryResponse.statusCode}): ${queryResponse.body}`); + } + + const response = JSON.parse(queryResponse.body) as QueryResponse; + return response.rows; +} + +function escapeSqlString(value: string): string { + return value.replace(/'/g, "''"); +} + +async function findDeploymentsByEndpoint( + adminUrl: string, + endpointArn: string, + authHeader: Record, + rejectUnauthorized: boolean, +): Promise { + const sql = ` + SELECT id FROM sys_deployment + WHERE endpoint = '${escapeSqlString(endpointArn)}' + `; + + const rows = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized); + return rows.map((row) => row.id as string); +} + +async function pruneDrainedDeployments( + adminUrl: string, + _endpointArn: string, + revisionHistoryLimit: number, + maxPrunedPerRun: number, + allowPruningDeploymentsWithCompletedInvocations: boolean, + authHeader: Record, + rejectUnauthorized: boolean, +) { + const safeOffset = Math.max(0, revisionHistoryLimit); + const safeLimit = Math.max(1, maxPrunedPerRun); + + console.log(`Pruning drained deployments (keeping ${safeOffset} revisions, max ${safeLimit} per run)`); + + // Find drained deployments: no associated services and no pinned invocations + // By default (conservative), exclude deployments with ANY pinned invocations + // If allowPruningDeploymentsWithCompletedInvocations is true, only exclude deployments with active invocations + // Prune oldest first, skip the N most recent drained ones + const invocationStatusFilter = allowPruningDeploymentsWithCompletedInvocations + ? "AND i.status != 'completed'" // Only consider active invocations + : ""; // Consider all invocations (conservative) + + const sql = ` + SELECT d.id, d.created_at + FROM sys_deployment d + LEFT JOIN sys_service s ON (d.id = s.deployment_id) + LEFT JOIN sys_invocation_status i ON (d.id = i.pinned_deployment_id ${invocationStatusFilter}) + WHERE s.name IS NULL + AND i.id IS NULL + ORDER BY d.created_at DESC + OFFSET ${safeOffset} + LIMIT ${safeLimit} + `; + + const drainedDeployments = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized); + + if (drainedDeployments.length === 0) { + console.log("No drained deployments to prune."); + return; + } + + console.log(`Found ${drainedDeployments.length} drained deployment(s) to prune.`); + + for (const deployment of drainedDeployments) { + const deploymentId = deployment.id as string; + try { + console.log(`Deleting drained deployment ${deploymentId}...`); + await deleteDeployment(adminUrl, deploymentId, authHeader, rejectUnauthorized); + console.log(`Deleted drained deployment ${deploymentId}.`); + } catch (e) { + console.warn(`Failed to delete drained deployment ${deploymentId}: ${(e as Error)?.message}`); + } + } +} diff --git a/lib/restate-constructs/service-deployer.ts b/lib/restate-constructs/service-deployer.ts index 962ab5d..2998a7c 100644 --- a/lib/restate-constructs/service-deployer.ts +++ b/lib/restate-constructs/service-deployer.ts @@ -70,6 +70,56 @@ export interface ServiceRegistrationProps { * Restate service is behind a load balancer. */ adminUrl?: string; + + /** + * What to do when the handler is removed from the stack. + * - RETAIN: Leave deployment registered in Restate (default). Use this if you want to transition to managing + * deployments manually, or if you want to remove the ServiceDeployer without affecting existing registrations. + * - DESTROY: Force-remove the deployment from Restate. Use this if you are decommissioning the service. + * + * Default: RETAIN + */ + removalPolicy?: cdk.RemovalPolicy; + + /** + * Prune fully drained deployments of the same handler after each successful registration. Only removes deployments + * that have no associated services and no pinned invocations. This helps clean up old deployment versions that + * accumulate over time as new versions are registered. + * + * Default: false + */ + pruneDrainedDeployments?: boolean; + + /** + * Number of old drained deployment revisions to retain. Only applies if `pruneDrainedDeployments` is enabled. + * Drained deployments beyond this limit will be removed, oldest first. + * + * Default: 0 + */ + revisionHistoryLimit?: number; + + /** + * Maximum number of drained deployments to prune per registration. Limits the cleanup work done in each + * deployment to avoid long-running operations. Only applies if `pruneDrainedDeployments` is enabled. + * + * Default: 10 + */ + maxPrunedPerRun?: number; + + /** + * Whether to prune deployments that have only completed invocations pinned. Only applies if + * `pruneDrainedDeployments` is enabled. + * + * By default (false), deployments with ANY pinned invocations (including completed ones) will not be pruned. + * This is the conservative behavior that ensures no deployment is removed while it might still be referenced. + * + * When true, only deployments with active (non-completed) invocations will be kept. Deployments with only + * completed invocations will be pruned. Use this for more aggressive cleanup when you're confident that + * completed invocations don't need their original deployment. + * + * Default: false + */ + allowPruningDeploymentsWithCompletedInvocations?: boolean; } /** @@ -212,13 +262,18 @@ export class ServiceDeployer extends Construct { authTokenSecretArn: authToken?.secretArn, serviceLambdaArn: handler.functionArn, invokeRoleArn: invokerRole?.roleArn, - // removalPolicy: "retain", + removalPolicy: options?.removalPolicy === cdk.RemovalPolicy.DESTROY ? "destroy" : ("retain" as const), private: (options?.private ?? false).toString() as "true" | "false", configurationVersion: - options?.configurationVersion || handler.functionArn.endsWith(":$LATEST") - ? new Date().toISOString() - : undefined, + options?.configurationVersion ?? + (handler.functionArn.endsWith(":$LATEST") ? new Date().toISOString() : undefined), insecure: (options?.insecure ?? false).toString() as "true" | "false", + pruneDrainedDeployments: (options?.pruneDrainedDeployments ?? false).toString() as "true" | "false", + revisionHistoryLimit: options?.revisionHistoryLimit ?? 0, + maxPrunedPerRun: options?.maxPrunedPerRun ?? 10, + allowPruningDeploymentsWithCompletedInvocations: ( + options?.allowPruningDeploymentsWithCompletedInvocations ?? false + ).toString() as "true" | "false", } satisfies RegistrationProperties, }); diff --git a/test/__snapshots__/restate-constructs.test.ts.snap b/test/__snapshots__/restate-constructs.test.ts.snap index 5013a8a..99288e0 100644 --- a/test/__snapshots__/restate-constructs.test.ts.snap +++ b/test/__snapshots__/restate-constructs.test.ts.snap @@ -1316,8 +1316,13 @@ exports[`Restate constructs Deploy a Lambda service handler to Restate Cloud env 'Fn::GetAtt': - RestateInvokerRole42565598 - Arn + removalPolicy: retain private: 'false' insecure: 'false' + pruneDrainedDeployments: 'false' + revisionHistoryLimit: 0 + maxPrunedPerRun: 10 + allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1508,8 +1513,13 @@ exports[`Restate constructs Deploy a Lambda service handler to existing Restate 'Fn::GetAtt': - InvokerRole4DB2757E - Arn + removalPolicy: retain private: 'false' insecure: 'false' + pruneDrainedDeployments: 'false' + revisionHistoryLimit: 0 + maxPrunedPerRun: 10 + allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1714,8 +1724,13 @@ exports[`Restate constructs Restate Cloud Environment construct with role refere Ref: >- RestateServiceHandlerCurrentVersion40030E674d150b78008a2f2be732db1598a1a845 invokeRoleArn: 'arn:aws:iam::654654156625:role/Invoker' + removalPolicy: retain private: 'false' insecure: 'false' + pruneDrainedDeployments: 'false' + revisionHistoryLimit: 0 + maxPrunedPerRun: 10 + allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1833,7 +1848,7 @@ exports[`Restate constructs Service Deployer overrides 1`] = ` - arm64 Code: S3Bucket: cdk-hnb659fds-assets-account-id-region - S3Key: 00d00ebb12b16db855339e5b1a88b4dc2b9cf6509e147c47c46d68465e48b030.zip + S3Key: d15e4f9469705b39c56cb906737a89b97e5b6127c274170793154ed613d8b520.zip Description: Restate custom registration handler Handler: entrypoint.handler MemorySize: 128 @@ -1858,3 +1873,149 @@ exports[`Restate constructs Service Deployer overrides 1`] = ` DeletionPolicy: RetainExceptOnCreate " `; + +exports[`Restate constructs Service Deployer with removal policy and pruning options 1`] = ` +"Resources: + InvokerRole4DB2757E: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: 'sts:AssumeRole' + Effect: Allow + Principal: + AWS: + 'Fn::Join': + - '' + - - 'arn:' + - Ref: 'AWS::Partition' + - ':iam::account-id:root' + Version: '2012-10-17' + RestateServiceHandlerServiceRole07B26D05: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: 'sts:AssumeRole' + Effect: Allow + Principal: + Service: lambda.amazonaws.com + Version: '2012-10-17' + ManagedPolicyArns: + - 'Fn::Join': + - '' + - - 'arn:' + - Ref: 'AWS::Partition' + - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' + RestateServiceHandler71409CD7: + Type: 'AWS::Lambda::Function' + Properties: + Code: Any + Handler: index.handler + Role: + 'Fn::GetAtt': + - RestateServiceHandlerServiceRole07B26D05 + - Arn + Runtime: nodejs22.x + DependsOn: + - RestateServiceHandlerServiceRole07B26D05 + RestateServiceHandlerCurrentVersion40030E674d150b78008a2f2be732db1598a1a845: + Type: 'AWS::Lambda::Version' + Properties: + FunctionName: + Ref: RestateServiceHandler71409CD7 + Metadata: + 'aws:cdk:do-not-refactor': true + RestateServiceHandlerCurrentVersionRestateServiceDeployment1DB3C4D6: + Type: 'Custom::RestateServiceDeployment' + Properties: + ServiceToken: + 'Fn::GetAtt': + - ServiceDeployerEventHandler89EAD25F + - Arn + adminUrl: 'https://restate.example.com:9070' + serviceLambdaArn: + Ref: >- + RestateServiceHandlerCurrentVersion40030E674d150b78008a2f2be732db1598a1a845 + invokeRoleArn: + 'Fn::GetAtt': + - InvokerRole4DB2757E + - Arn + removalPolicy: destroy + private: 'false' + insecure: 'false' + pruneDrainedDeployments: 'true' + revisionHistoryLimit: 5 + maxPrunedPerRun: 10 + allowPruningDeploymentsWithCompletedInvocations: 'false' + DependsOn: + - ServiceDeployerInvocationPolicyD09B639D + UpdateReplacePolicy: Delete + DeletionPolicy: Delete + ServiceDeployerEventHandlerServiceRoleF133584F: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: 'sts:AssumeRole' + Effect: Allow + Principal: + Service: lambda.amazonaws.com + Version: '2012-10-17' + ManagedPolicyArns: + - 'Fn::Join': + - '' + - - 'arn:' + - Ref: 'AWS::Partition' + - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' + ServiceDeployerEventHandler89EAD25F: + Type: 'AWS::Lambda::Function' + Properties: + Architectures: + - arm64 + Code: Any + Description: Restate custom registration handler + Handler: entrypoint.handler + MemorySize: 128 + Role: + 'Fn::GetAtt': + - ServiceDeployerEventHandlerServiceRoleF133584F + - Arn + Runtime: nodejs22.x + Timeout: 300 + DependsOn: + - ServiceDeployerEventHandlerServiceRoleF133584F + ServiceDeployerDeploymentLogs5B8BE5D2: + Type: 'AWS::Logs::LogGroup' + Properties: + LogGroupName: + 'Fn::Join': + - '' + - - /aws/lambda/ + - Ref: ServiceDeployerEventHandler89EAD25F + RetentionInDays: 30 + UpdateReplacePolicy: Retain + DeletionPolicy: RetainExceptOnCreate + ServiceDeployerInvocationPolicyD09B639D: + Type: 'AWS::IAM::Policy' + Properties: + PolicyDocument: + Statement: + - Action: 'lambda:InvokeFunction' + Effect: Allow + Resource: + - 'Fn::GetAtt': + - RestateServiceHandler71409CD7 + - Arn + - 'Fn::Join': + - '' + - - 'Fn::GetAtt': + - RestateServiceHandler71409CD7 + - Arn + - ':*' + Version: '2012-10-17' + PolicyName: ServiceDeployerInvocationPolicyD09B639D + Roles: + - Ref: InvokerRole4DB2757E +" +`; diff --git a/test/e2e/cdk-util.ts b/test/e2e/cdk-util.ts index 096cd2a..d6af9df 100644 --- a/test/e2e/cdk-util.ts +++ b/test/e2e/cdk-util.ts @@ -14,6 +14,7 @@ import { $, cd, path } from "zx"; export interface CdkStackProps { stackName: string; cdkAppPath: string; + context?: Record; } interface StackOutput { @@ -23,14 +24,18 @@ interface StackOutput { } export async function createStack(config: CdkStackProps): Promise> { - const noRollback = new Boolean(process.env["NO_ROLLBACK"]).valueOf(); + const noRollback = process.env["NO_ROLLBACK"] === "true"; + + const contextArgs = Object.entries(config.context ?? {}).flatMap(([key, value]) => ["--context", `${key}=${value}`]); + const extraArgs = noRollback ? ["--no-rollback"] : []; cd(path.resolve(__dirname)); - await $`npx cdk --app 'npx tsx ${config.cdkAppPath}' deploy \ - --context stack_name="${config.stackName}" \ - --output "cdk.${config.stackName}.out" \ + await $`npx cdk --app 'npx tsx ${config.cdkAppPath}' deploy ${config.stackName} \ + --context stack_name=${config.stackName} \ + ${contextArgs} \ + --output cdk.${config.stackName}.out \ --require-approval never \ - ${noRollback ? "--no-rollback" : ""}`.timeout("575s"); + ${extraArgs}`.timeout("575s"); const result = await $`aws cloudformation describe-stacks --stack-name "${config.stackName}" --query 'Stacks[0].Outputs'`; @@ -62,9 +67,65 @@ export async function destroyStack(config: CdkStackProps) { if (retainStack) { console.log(`Retaining stack "${config.stackName}"`); } else { + const contextArgs = Object.entries(config.context ?? {}).flatMap(([key, value]) => [ + "--context", + `${key}=${value}`, + ]); + await $`npx cdk --app 'npx tsx ${config.cdkAppPath}' destroy \ --context stack_name=${config.stackName} \ + ${contextArgs} \ --output "cdk.${config.stackName}.out" \ --force`.timeout("595s"); } } + +interface Deployment { + id: string; + endpoint?: string; + arn?: string; + created_at?: string; +} + +async function runSsmCommand(instanceId: string, command: string): Promise { + const result = + await $`aws ssm send-command --instance-ids ${instanceId} --document-name "AWS-RunShellScript" --parameters commands=${JSON.stringify([command])} --output json`; + + const commandObj = JSON.parse(result.stdout); + const commandId = commandObj.Command.CommandId; + + // Wait for command to complete (with timeout handling) + for (let i = 0; i < 30; i++) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + const statusResult = + await $`aws ssm get-command-invocation --command-id ${commandId} --instance-id ${instanceId} --output json`; + const status = JSON.parse(statusResult.stdout); + if (status.Status === "Success") { + return status.StandardOutputContent; + } + if (status.Status === "Failed") { + throw new Error(`SSM command failed: ${status.StandardErrorContent}`); + } + } + throw new Error("SSM command timed out"); +} + +export async function queryRestateDeployments(instanceId: string): Promise { + // Use SSM to query the admin API via curl from the instance + const output = await runSsmCommand(instanceId, "curl -s http://localhost:9070/deployments"); + const response = JSON.parse(output) as { deployments: Deployment[] }; + return response.deployments; +} + +export async function queryRestateCloudDeployments(adminUrl: string, authToken: string): Promise { + const response = await fetch(`${adminUrl}/deployments`, { + headers: { + Authorization: `Bearer ${authToken}`, + }, + }); + if (!response.ok) { + throw new Error(`Failed to query deployments: ${response.status} ${response.statusText}`); + } + const data = (await response.json()) as { deployments: Deployment[] }; + return data.deployments; +} diff --git a/test/e2e/restate-cloud-pruning-test.e2e.ts b/test/e2e/restate-cloud-pruning-test.e2e.ts new file mode 100644 index 0000000..970ada0 --- /dev/null +++ b/test/e2e/restate-cloud-pruning-test.e2e.ts @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate CDK Construct Library, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/cdk/blob/main/LICENSE + */ + +import { randomUUID } from "crypto"; +import { $ } from "zx"; +import { createStack, destroyStackAsync, queryRestateCloudDeployments } from "./cdk-util"; + +describe("Restate Cloud Drained Deployment Pruning E2E Test", () => { + $.verbose = true; + + const cdkAppPath = "stacks/restate-cloud-lambda-stack.ts"; + const stackName = "e2e-RestateCloudPruning"; + let ingressUrl: string; + let adminUrl: string; + let lambdaFunctionName: string; // The specific Lambda function for THIS test run + + beforeAll(async () => { + // Initial deployment with pruning enabled + const outputs = await createStack({ + cdkAppPath, + stackName, + context: { configuration_version: "v1", enable_pruning: "true" }, + }); + ingressUrl = outputs["RestateIngressUrl"]; + adminUrl = outputs["RestateAdminUrl"]; + }, 600_000); + + afterAll(async () => { + destroyStackAsync(stackName); + }); + + it("should prune drained deployments after re-registration", async () => { + const authToken = process.env.RESTATE_API_KEY!; + + // Step 1: Verify initial deployment works + const id = randomUUID(); + const response = await fetch(`${ingressUrl}/Greeter/greet`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${authToken}`, + }, + body: JSON.stringify(id), + }); + expect(response.status).toBe(200); + expect(await response.text()).toBe(`"Hello ${id}!"`); + + // Step 2: Find the deployment from THIS run (most recent with our stack name) + const allInitialDeployments = await queryRestateCloudDeployments(adminUrl, authToken); + // Sort by created_at descending to get the most recent first + const sortedDeployments = allInitialDeployments + .filter((d) => d.arn?.includes(stackName)) + .sort((a, b) => (b.created_at ?? "").localeCompare(a.created_at ?? "")); + + console.log("All deployments for this stack:", sortedDeployments); + expect(sortedDeployments.length).toBeGreaterThanOrEqual(1); + + const initialDeployment = sortedDeployments[0]; + const initialDeploymentId = initialDeployment.id; + // Extract the Lambda function name (without version) to track THIS run's deployments + lambdaFunctionName = initialDeployment.arn?.replace(/:\d+$/, "") ?? ""; + console.log(`Initial deployment: ${initialDeploymentId}, Lambda: ${lambdaFunctionName}`); + + // Step 3: Re-deploy with a new configuration version to trigger re-registration + // This creates a new deployment (v2) and drains the old one (v1); pruning should clean up v1 + console.log("Re-deploying with new configuration version..."); + await createStack({ + cdkAppPath, + stackName, + context: { configuration_version: "v2", enable_pruning: "true" }, + }); + + // Step 4: Verify the service still works + const id2 = randomUUID(); + const response2 = await fetch(`${ingressUrl}/Greeter/greet`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${authToken}`, + }, + body: JSON.stringify(id2), + }); + expect(response2.status).toBe(200); + expect(await response2.text()).toBe(`"Hello ${id2}!"`); + + // Step 5: Check that the initial deployment was pruned + const allFinalDeployments = await queryRestateCloudDeployments(adminUrl, authToken); + // Filter to only THIS Lambda function's deployments + const thisRunDeployments = allFinalDeployments.filter((d) => d.arn?.startsWith(lambdaFunctionName + ":")); + console.log("Final deployments for this Lambda:", thisRunDeployments); + + // Should have exactly 1 deployment for this Lambda (v2), because v1 was pruned + expect(thisRunDeployments.length).toBe(1); + + // The remaining deployment should NOT be the initial one (it was pruned) + const finalDeploymentId = thisRunDeployments[0].id; + expect(finalDeploymentId).not.toBe(initialDeploymentId); + console.log(`Final deployment: ${finalDeploymentId} (initial ${initialDeploymentId} was pruned)`); + }, 600_000); +}); diff --git a/test/e2e/stacks/restate-cloud-lambda-stack.ts b/test/e2e/stacks/restate-cloud-lambda-stack.ts index 355d0cc..0045ec6 100644 --- a/test/e2e/stacks/restate-cloud-lambda-stack.ts +++ b/test/e2e/stacks/restate-cloud-lambda-stack.ts @@ -19,6 +19,11 @@ import { EnvironmentId, RestateCloudEnvironment, ServiceDeployer } from "../../. // Deploy with: RESTATE_ENV_ID=env_... RESTATE_API_KEY=key_... npx cdk --app 'npx tsx restate-cloud.e2e.ts' deploy const app = new cdk.App(); const stackName = app.node.tryGetContext("stack_name") ?? "e2e-RestateCloud"; + +// Optional context variable to force new Lambda versions on re-deployment +const configurationVersion = app.node.tryGetContext("configuration_version"); +const enablePruning = app.node.tryGetContext("enable_pruning") === "true"; + const stack = new cdk.Stack(app, stackName, { env: { account: process.env.CDK_DEFAULT_ACCOUNT, @@ -34,6 +39,7 @@ const handler: lambda.Function = new lambda.Function(stack, "Service", { runtime: lambda.Runtime.NODEJS_LATEST, code: lambda.Code.fromAsset("../handlers/dist/"), handler: "bundle.handler", + environment: configurationVersion ? { CONFIGURATION_VERSION: configurationVersion } : undefined, }); const environment = new RestateCloudEnvironment(stack, "CloudEnv", { @@ -49,8 +55,13 @@ const deployer = new ServiceDeployer(stack, "ServiceDeployer", { removalPolicy: cdk.RemovalPolicy.DESTROY, }); -deployer.deployService("Greeter", handler.currentVersion, environment); +deployer.register(handler.currentVersion, environment, { + pruneDrainedDeployments: enablePruning, + revisionHistoryLimit: 0, + allowPruningDeploymentsWithCompletedInvocations: true, +}); new cdk.CfnOutput(stack, "RestateIngressUrl", { value: environment.ingressUrl }); +new cdk.CfnOutput(stack, "RestateAdminUrl", { value: environment.adminUrl }); app.synth(); diff --git a/test/restate-constructs.test.ts b/test/restate-constructs.test.ts index 4bdb8ea..fa4cf9d 100644 --- a/test/restate-constructs.test.ts +++ b/test/restate-constructs.test.ts @@ -170,6 +170,35 @@ describe("Restate constructs", () => { }); }); + test("Service Deployer with removal policy and pruning options", () => { + const app = new cdk.App(); + const stack = new cdk.Stack(app, "ServiceDeployerOptions", { + env: { account: "account-id", region: "region" }, + }); + + const invokerRole = new iam.Role(stack, "InvokerRole", { assumedBy: new iam.AccountRootPrincipal() }); + + const restateEnvironment = RestateEnvironment.fromAttributes({ + adminUrl: "https://restate.example.com:9070", + invokerRole, + }); + + const handler = mockHandler(stack); + const serviceDeployer = new ServiceDeployer(stack, "ServiceDeployer", { + code: lambda.Code.fromAsset("dist/register-service-handler"), + }); + serviceDeployer.register(handler.currentVersion, restateEnvironment, { + removalPolicy: cdk.RemovalPolicy.DESTROY, + pruneDrainedDeployments: true, + revisionHistoryLimit: 5, + }); + + expect(stack).toMatchCdkSnapshot({ + ignoreAssets: true, + yaml: true, + }); + }); + test("Create a self-hosted Restate environment deployed on EC2", () => { const app = new cdk.App(); const stack = new cdk.Stack(app, "RestateSelfHostedServerEc2Stack", { From 2e5da6ad7893a215daf77c76995db90c343e59a3 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 2 Feb 2026 16:56:37 +0200 Subject: [PATCH 2/3] Drop allowPruningDeploymentsWithCompletedInvocations and always prune with completed invocations --- .../register-service-handler/index.mts | 20 +++---------------- lib/restate-constructs/service-deployer.ts | 18 ----------------- .../restate-constructs.test.ts.snap | 6 +----- test/e2e/stacks/restate-cloud-lambda-stack.ts | 1 - 4 files changed, 4 insertions(+), 41 deletions(-) diff --git a/lib/restate-constructs/register-service-handler/index.mts b/lib/restate-constructs/register-service-handler/index.mts index 6d5ee05..4d29457 100644 --- a/lib/restate-constructs/register-service-handler/index.mts +++ b/lib/restate-constructs/register-service-handler/index.mts @@ -62,13 +62,6 @@ export interface RegistrationProperties { /** Maximum number of drained deployments to prune per run. */ maxPrunedPerRun?: number; - - /** - * Whether to prune deployments that have only completed invocations pinned. Default is false (conservative). - * When false, deployments with ANY pinned invocations (including completed) will not be pruned. - * When true, only deployments with active (non-completed) invocations will be kept. - */ - allowPruningDeploymentsWithCompletedInvocations?: "true" | "false"; } type RegisterDeploymentResponse = { @@ -308,7 +301,6 @@ export const handler = async function (event: CloudFormationCustomResourceEvent) props.serviceLambdaArn, props.revisionHistoryLimit ?? 0, props.maxPrunedPerRun ?? 10, - props.allowPruningDeploymentsWithCompletedInvocations === "true", authHeader, rejectUnauthorized, ); @@ -447,7 +439,6 @@ async function pruneDrainedDeployments( _endpointArn: string, revisionHistoryLimit: number, maxPrunedPerRun: number, - allowPruningDeploymentsWithCompletedInvocations: boolean, authHeader: Record, rejectUnauthorized: boolean, ) { @@ -456,19 +447,14 @@ async function pruneDrainedDeployments( console.log(`Pruning drained deployments (keeping ${safeOffset} revisions, max ${safeLimit} per run)`); - // Find drained deployments: no associated services and no pinned invocations - // By default (conservative), exclude deployments with ANY pinned invocations - // If allowPruningDeploymentsWithCompletedInvocations is true, only exclude deployments with active invocations + // Find drained deployments: no associated services and no active pinned invocations + // Deployments with only completed invocations can be pruned // Prune oldest first, skip the N most recent drained ones - const invocationStatusFilter = allowPruningDeploymentsWithCompletedInvocations - ? "AND i.status != 'completed'" // Only consider active invocations - : ""; // Consider all invocations (conservative) - const sql = ` SELECT d.id, d.created_at FROM sys_deployment d LEFT JOIN sys_service s ON (d.id = s.deployment_id) - LEFT JOIN sys_invocation_status i ON (d.id = i.pinned_deployment_id ${invocationStatusFilter}) + LEFT JOIN sys_invocation_status i ON (d.id = i.pinned_deployment_id AND i.status != 'completed') WHERE s.name IS NULL AND i.id IS NULL ORDER BY d.created_at DESC diff --git a/lib/restate-constructs/service-deployer.ts b/lib/restate-constructs/service-deployer.ts index 2998a7c..3d0d29a 100644 --- a/lib/restate-constructs/service-deployer.ts +++ b/lib/restate-constructs/service-deployer.ts @@ -105,21 +105,6 @@ export interface ServiceRegistrationProps { * Default: 10 */ maxPrunedPerRun?: number; - - /** - * Whether to prune deployments that have only completed invocations pinned. Only applies if - * `pruneDrainedDeployments` is enabled. - * - * By default (false), deployments with ANY pinned invocations (including completed ones) will not be pruned. - * This is the conservative behavior that ensures no deployment is removed while it might still be referenced. - * - * When true, only deployments with active (non-completed) invocations will be kept. Deployments with only - * completed invocations will be pruned. Use this for more aggressive cleanup when you're confident that - * completed invocations don't need their original deployment. - * - * Default: false - */ - allowPruningDeploymentsWithCompletedInvocations?: boolean; } /** @@ -271,9 +256,6 @@ export class ServiceDeployer extends Construct { pruneDrainedDeployments: (options?.pruneDrainedDeployments ?? false).toString() as "true" | "false", revisionHistoryLimit: options?.revisionHistoryLimit ?? 0, maxPrunedPerRun: options?.maxPrunedPerRun ?? 10, - allowPruningDeploymentsWithCompletedInvocations: ( - options?.allowPruningDeploymentsWithCompletedInvocations ?? false - ).toString() as "true" | "false", } satisfies RegistrationProperties, }); diff --git a/test/__snapshots__/restate-constructs.test.ts.snap b/test/__snapshots__/restate-constructs.test.ts.snap index 99288e0..4821b80 100644 --- a/test/__snapshots__/restate-constructs.test.ts.snap +++ b/test/__snapshots__/restate-constructs.test.ts.snap @@ -1322,7 +1322,6 @@ exports[`Restate constructs Deploy a Lambda service handler to Restate Cloud env pruneDrainedDeployments: 'false' revisionHistoryLimit: 0 maxPrunedPerRun: 10 - allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1519,7 +1518,6 @@ exports[`Restate constructs Deploy a Lambda service handler to existing Restate pruneDrainedDeployments: 'false' revisionHistoryLimit: 0 maxPrunedPerRun: 10 - allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1730,7 +1728,6 @@ exports[`Restate constructs Restate Cloud Environment construct with role refere pruneDrainedDeployments: 'false' revisionHistoryLimit: 0 maxPrunedPerRun: 10 - allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete @@ -1848,7 +1845,7 @@ exports[`Restate constructs Service Deployer overrides 1`] = ` - arm64 Code: S3Bucket: cdk-hnb659fds-assets-account-id-region - S3Key: d15e4f9469705b39c56cb906737a89b97e5b6127c274170793154ed613d8b520.zip + S3Key: 8460d2a67fcb3e96a1be7095d039665711526f7894d3c29cb283fae379128d05.zip Description: Restate custom registration handler Handler: entrypoint.handler MemorySize: 128 @@ -1947,7 +1944,6 @@ exports[`Restate constructs Service Deployer with removal policy and pruning opt pruneDrainedDeployments: 'true' revisionHistoryLimit: 5 maxPrunedPerRun: 10 - allowPruningDeploymentsWithCompletedInvocations: 'false' DependsOn: - ServiceDeployerInvocationPolicyD09B639D UpdateReplacePolicy: Delete diff --git a/test/e2e/stacks/restate-cloud-lambda-stack.ts b/test/e2e/stacks/restate-cloud-lambda-stack.ts index 0045ec6..5abf54d 100644 --- a/test/e2e/stacks/restate-cloud-lambda-stack.ts +++ b/test/e2e/stacks/restate-cloud-lambda-stack.ts @@ -58,7 +58,6 @@ const deployer = new ServiceDeployer(stack, "ServiceDeployer", { deployer.register(handler.currentVersion, environment, { pruneDrainedDeployments: enablePruning, revisionHistoryLimit: 0, - allowPruningDeploymentsWithCompletedInvocations: true, }); new cdk.CfnOutput(stack, "RestateIngressUrl", { value: environment.ingressUrl }); From 0eab829640713c0c6cafe8b6dfcfe479875ed194 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 2 Feb 2026 17:11:08 +0200 Subject: [PATCH 3/3] Exclude currently registered deployment from pruning --- .../register-service-handler/index.mts | 133 ++++++++++++++---- .../restate-constructs.test.ts.snap | 2 +- 2 files changed, 103 insertions(+), 32 deletions(-) diff --git a/lib/restate-constructs/register-service-handler/index.mts b/lib/restate-constructs/register-service-handler/index.mts index 4d29457..b912d81 100644 --- a/lib/restate-constructs/register-service-handler/index.mts +++ b/lib/restate-constructs/register-service-handler/index.mts @@ -293,12 +293,14 @@ export const handler = async function (event: CloudFormationCustomResourceEvent) if (props.pruneDrainedDeployments === "true") { try { - if (!props.serviceLambdaArn) { - console.warn("Pruning requested but no serviceLambdaArn provided; skipping."); + const serviceNames = response.services.map((s) => s.name); + if (serviceNames.length === 0) { + console.warn("Pruning requested but no services in deployment; skipping."); } else { await pruneDrainedDeployments( props.adminUrl!, - props.serviceLambdaArn, + serviceNames, + response.id, props.revisionHistoryLimit ?? 0, props.maxPrunedPerRun ?? 10, authHeader, @@ -434,51 +436,120 @@ async function findDeploymentsByEndpoint( return rows.map((row) => row.id as string); } +type ListDeploymentsResponse = { + deployments: { + id: string; + created_at: string; + services: { name: string; revision: number }[]; + }[]; +}; + +async function listDeployments( + adminUrl: string, + authHeader: Record, + rejectUnauthorized: boolean, +): Promise { + const listUrl = new URL(`${adminUrl}/${DEPLOYMENTS_PATH}`); + + const response = await httpRequest(listUrl, { + method: "GET", + headers: authHeader, + timeout: 30_000, + rejectUnauthorized, + }); + + if (response.statusCode !== 200) { + throw new Error(`List deployments failed (${response.statusCode}): ${response.body}`); + } + + return (JSON.parse(response.body) as ListDeploymentsResponse).deployments; +} + +async function hasActiveInvocations( + adminUrl: string, + deploymentId: string, + authHeader: Record, + rejectUnauthorized: boolean, +): Promise { + const sql = ` + SELECT COUNT(*) as cnt FROM sys_invocation_status + WHERE pinned_deployment_id = '${escapeSqlString(deploymentId)}' + AND status != 'completed' + `; + const rows = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized); + const count = (rows[0]?.cnt as number) ?? 0; + return count > 0; +} + async function pruneDrainedDeployments( adminUrl: string, - _endpointArn: string, + serviceNames: string[], + currentDeploymentId: string, revisionHistoryLimit: number, maxPrunedPerRun: number, authHeader: Record, rejectUnauthorized: boolean, ) { - const safeOffset = Math.max(0, revisionHistoryLimit); - const safeLimit = Math.max(1, maxPrunedPerRun); + const safeRevisionLimit = Math.max(0, revisionHistoryLimit); + const safeMaxPruned = Math.max(1, maxPrunedPerRun); + const serviceNameSet = new Set(serviceNames); - console.log(`Pruning drained deployments (keeping ${safeOffset} revisions, max ${safeLimit} per run)`); + console.log( + `Pruning drained deployments for services [${serviceNames.join(", ")}] ` + + `(keeping ${safeRevisionLimit} revisions, max ${safeMaxPruned} per run, excluding current ${currentDeploymentId})`, + ); - // Find drained deployments: no associated services and no active pinned invocations - // Deployments with only completed invocations can be pruned - // Prune oldest first, skip the N most recent drained ones - const sql = ` - SELECT d.id, d.created_at - FROM sys_deployment d - LEFT JOIN sys_service s ON (d.id = s.deployment_id) - LEFT JOIN sys_invocation_status i ON (d.id = i.pinned_deployment_id AND i.status != 'completed') - WHERE s.name IS NULL - AND i.id IS NULL - ORDER BY d.created_at DESC - OFFSET ${safeOffset} - LIMIT ${safeLimit} - `; + const allDeployments = await listDeployments(adminUrl, authHeader, rejectUnauthorized); + const relatedDeployments = allDeployments + .filter((d) => d.id !== currentDeploymentId) + .filter((d) => d.services.some((s) => serviceNameSet.has(s.name))) + .sort((a, b) => a.created_at.localeCompare(b.created_at)); // oldest first - const drainedDeployments = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized); + console.log(`Found ${relatedDeployments.length} related deployment(s) for these services.`); - if (drainedDeployments.length === 0) { - console.log("No drained deployments to prune."); + if (relatedDeployments.length <= safeRevisionLimit) { + console.log(`Not exceeding revision history limit (${safeRevisionLimit}); nothing to prune.`); return; } - console.log(`Found ${drainedDeployments.length} drained deployment(s) to prune.`); + const candidatesForPruning = relatedDeployments.slice(0, relatedDeployments.length - safeRevisionLimit); + console.log(`${candidatesForPruning.length} deployment(s) are candidates for pruning (beyond revision limit).`); + + let prunedCount = 0; + for (const deployment of candidatesForPruning) { + if (prunedCount >= safeMaxPruned) { + console.log(`Reached max pruned per run (${safeMaxPruned}); stopping.`); + break; + } + + const hasActive = await hasActiveInvocations(adminUrl, deployment.id, authHeader, rejectUnauthorized); + if (hasActive) { + console.log(`Deployment ${deployment.id} has active invocations; skipping.`); + continue; + } - for (const deployment of drainedDeployments) { - const deploymentId = deployment.id as string; try { - console.log(`Deleting drained deployment ${deploymentId}...`); - await deleteDeployment(adminUrl, deploymentId, authHeader, rejectUnauthorized); - console.log(`Deleted drained deployment ${deploymentId}.`); + const extraServices = deployment.services + .map((service) => service.name) + .filter((serviceName) => !serviceNameSet.has(serviceName)); + if (extraServices.length > 0) { + console.log( + `Deployment ${deployment.id} also registered services no longer in this handler: [${[...new Set(extraServices)].join(", ")}]`, + ); + } + + console.log(`Deleting drained deployment ${deployment.id}...`); + await deleteDeployment(adminUrl, deployment.id, authHeader, rejectUnauthorized); + console.log(`Deleted drained deployment ${deployment.id}.`); + prunedCount++; } catch (e) { - console.warn(`Failed to delete drained deployment ${deploymentId}: ${(e as Error)?.message}`); + console.warn(`Failed to delete drained deployment ${deployment.id}: ${(e as Error)?.message}`); } } + + if (prunedCount === 0) { + console.log("No drained deployments were pruned."); + } else { + console.log(`Pruned ${prunedCount} drained deployment(s).`); + } } diff --git a/test/__snapshots__/restate-constructs.test.ts.snap b/test/__snapshots__/restate-constructs.test.ts.snap index 4821b80..6aea441 100644 --- a/test/__snapshots__/restate-constructs.test.ts.snap +++ b/test/__snapshots__/restate-constructs.test.ts.snap @@ -1845,7 +1845,7 @@ exports[`Restate constructs Service Deployer overrides 1`] = ` - arm64 Code: S3Bucket: cdk-hnb659fds-assets-account-id-region - S3Key: 8460d2a67fcb3e96a1be7095d039665711526f7894d3c29cb283fae379128d05.zip + S3Key: 3e8c1652eeb2e525a478419d4009a3ed4e021bf98fc70eac7f4294d66f5910bc.zip Description: Restate custom registration handler Handler: entrypoint.handler MemorySize: 128