Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 264 additions & 6 deletions lib/restate-constructs/register-service-handler/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ export interface RegistrationProperties {
/** Whether to trust any certificate when connecting to the admin endpoint. */
insecure?: "true" | "false";

// removalPolicy?: string;
/** What to do when the handler is removed: "retain" (default) or "destroy". */
removalPolicy?: "retain" | "destroy";

/** Whether to prune drained deployments for the same handler after registration. */
pruneDrainedDeployments?: "true" | "false";

/** Number of old drained deployment revisions to retain when pruning. */
revisionHistoryLimit?: number;

/** Maximum number of drained deployments to prune per run. */
maxPrunedPerRun?: number;
}

type RegisterDeploymentResponse = {
Expand Down Expand Up @@ -124,11 +134,47 @@ export const handler = async function (event: CloudFormationCustomResourceEvent)
const rejectUnauthorized = props.insecure !== "true";

if (event.RequestType === "Delete") {
// Since we retain older Lambda handler versions on update, we also leave the registered service alone. There may
// be unfinished invocations that require it; in the future we would want to inform Restate that we want to
// de-register the service, and wait for Restate to let us know that it is safe to delete the deployed Function
// version from Lambda.
console.warn("De-registering services is not supported currently. Previous version will remain registered.");
if (props.removalPolicy !== "destroy") {
console.log("Removal policy is 'retain'; leaving deployment registered in Restate.");
return;
}

let authHeader: Record<string, string> = {};
try {
authHeader = await createAuthHeader(props);
} catch (e) {
console.warn(`Failed to load auth token for deletion: ${(e as Error)?.message}`);
console.warn("Proceeding with deletion without auth header.");
}

console.log(`Removal policy is 'destroy'; finding deployment for ${props.serviceLambdaArn}`);

// Best-effort deletion: log errors but don't fail CloudFormation delete
try {
const deploymentIds = await findDeploymentsByEndpoint(
props.adminUrl!,
props.serviceLambdaArn!,
authHeader,
rejectUnauthorized,
);

if (deploymentIds.length === 0) {
console.log("No deployments found for this endpoint; nothing to delete.");
return;
}

for (const deploymentId of deploymentIds) {
try {
console.log(`Deleting deployment ${deploymentId}...`);
await deleteDeployment(props.adminUrl!, deploymentId, authHeader, rejectUnauthorized);
console.log(`Deleted deployment ${deploymentId}.`);
} catch (e) {
console.warn(`Failed to delete deployment ${deploymentId}: ${(e as Error)?.message}`);
}
}
} catch (e) {
console.warn(`Failed to query/delete deployments: ${(e as Error)?.message}`);
}
return;
}

Expand Down Expand Up @@ -245,6 +291,27 @@ export const handler = async function (event: CloudFormationCustomResourceEvent)
console.log(`Successfully marked service as ${isPublic ? "public" : "private"}.`);
}

if (props.pruneDrainedDeployments === "true") {
try {
const serviceNames = response.services.map((s) => s.name);
if (serviceNames.length === 0) {
console.warn("Pruning requested but no services in deployment; skipping.");
} else {
await pruneDrainedDeployments(
props.adminUrl!,
serviceNames,
response.id,
props.revisionHistoryLimit ?? 0,
props.maxPrunedPerRun ?? 10,
authHeader,
rejectUnauthorized,
);
}
} catch (e) {
console.warn(`Failed to prune drained deployments: ${(e as Error)?.message}`);
}
}

return; // Overall success!
} else {
failureReason = `Registration failed (${registerDeploymentResponse.statusCode}): ${registerDeploymentResponse.body}`;
Expand Down Expand Up @@ -295,3 +362,194 @@ async function createAuthHeader(props: RegistrationProperties): Promise<Record<s
async function sleep(millis: number) {
return new Promise((resolve) => setTimeout(resolve, millis));
}

async function deleteDeployment(
adminUrl: string,
deploymentId: string,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
) {
const deleteUrl = new URL(`${adminUrl}/${DEPLOYMENTS_PATH}/${deploymentId}?force=true`);

const deleteResponse = await httpRequest(deleteUrl, {
method: "DELETE",
headers: authHeader,
timeout: 10_000,
rejectUnauthorized,
});

const isSuccess =
(deleteResponse.statusCode >= 200 && deleteResponse.statusCode < 300) || deleteResponse.statusCode === 404;
if (!isSuccess) {
throw new Error(`Delete deployment failed (${deleteResponse.statusCode}): ${deleteResponse.body}`);
}
}

type QueryResponse = {
rows: Record<string, unknown>[];
};

async function queryRestate(
adminUrl: string,
sql: string,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
): Promise<Record<string, unknown>[]> {
const queryUrl = new URL(`${adminUrl}/query`);

const queryResponse = await httpRequest(queryUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json", // Request JSON format (default is Apache Arrow IPC binary)
...authHeader,
},
body: JSON.stringify({ query: sql }),
timeout: 30_000,
rejectUnauthorized,
});

if (queryResponse.statusCode !== 200) {
throw new Error(`Query failed (${queryResponse.statusCode}): ${queryResponse.body}`);
}

const response = JSON.parse(queryResponse.body) as QueryResponse;
return response.rows;
}

function escapeSqlString(value: string): string {
return value.replace(/'/g, "''");
}

async function findDeploymentsByEndpoint(
adminUrl: string,
endpointArn: string,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
): Promise<string[]> {
const sql = `
SELECT id FROM sys_deployment
WHERE endpoint = '${escapeSqlString(endpointArn)}'
`;

const rows = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized);
return rows.map((row) => row.id as string);
}

type ListDeploymentsResponse = {
deployments: {
id: string;
created_at: string;
services: { name: string; revision: number }[];
}[];
};

async function listDeployments(
adminUrl: string,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
): Promise<ListDeploymentsResponse["deployments"]> {
const listUrl = new URL(`${adminUrl}/${DEPLOYMENTS_PATH}`);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ended up resorting to using the API, which gives us extra service name information that we can map back to our current deployment. Unfortunately the sys_deployment schema doesn't list services, which makes it impossible to query only the relevant fully drained deployments for the service(s) which the service deployer is managing. I've opened restatedev/restate#4316 which would make this much more straightforward.


const response = await httpRequest(listUrl, {
method: "GET",
headers: authHeader,
timeout: 30_000,
rejectUnauthorized,
});

if (response.statusCode !== 200) {
throw new Error(`List deployments failed (${response.statusCode}): ${response.body}`);
}

return (JSON.parse(response.body) as ListDeploymentsResponse).deployments;
}

async function hasActiveInvocations(
adminUrl: string,
deploymentId: string,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
): Promise<boolean> {
const sql = `
SELECT COUNT(*) as cnt FROM sys_invocation_status
WHERE pinned_deployment_id = '${escapeSqlString(deploymentId)}'
AND status != 'completed'
`;
const rows = await queryRestate(adminUrl, sql, authHeader, rejectUnauthorized);
const count = (rows[0]?.cnt as number) ?? 0;
return count > 0;
}

async function pruneDrainedDeployments(
adminUrl: string,
serviceNames: string[],
currentDeploymentId: string,
revisionHistoryLimit: number,
maxPrunedPerRun: number,
authHeader: Record<string, string>,
rejectUnauthorized: boolean,
) {
const safeRevisionLimit = Math.max(0, revisionHistoryLimit);
const safeMaxPruned = Math.max(1, maxPrunedPerRun);
const serviceNameSet = new Set(serviceNames);

console.log(
`Pruning drained deployments for services [${serviceNames.join(", ")}] ` +
`(keeping ${safeRevisionLimit} revisions, max ${safeMaxPruned} per run, excluding current ${currentDeploymentId})`,
);

const allDeployments = await listDeployments(adminUrl, authHeader, rejectUnauthorized);
const relatedDeployments = allDeployments
.filter((d) => d.id !== currentDeploymentId)
.filter((d) => d.services.some((s) => serviceNameSet.has(s.name)))
.sort((a, b) => a.created_at.localeCompare(b.created_at)); // oldest first

console.log(`Found ${relatedDeployments.length} related deployment(s) for these services.`);

if (relatedDeployments.length <= safeRevisionLimit) {
console.log(`Not exceeding revision history limit (${safeRevisionLimit}); nothing to prune.`);
return;
}

const candidatesForPruning = relatedDeployments.slice(0, relatedDeployments.length - safeRevisionLimit);
console.log(`${candidatesForPruning.length} deployment(s) are candidates for pruning (beyond revision limit).`);

let prunedCount = 0;
for (const deployment of candidatesForPruning) {
if (prunedCount >= safeMaxPruned) {
console.log(`Reached max pruned per run (${safeMaxPruned}); stopping.`);
break;
}

const hasActive = await hasActiveInvocations(adminUrl, deployment.id, authHeader, rejectUnauthorized);
if (hasActive) {
console.log(`Deployment ${deployment.id} has active invocations; skipping.`);
continue;
}

try {
const extraServices = deployment.services
.map((service) => service.name)
.filter((serviceName) => !serviceNameSet.has(serviceName));
if (extraServices.length > 0) {
console.log(
`Deployment ${deployment.id} also registered services no longer in this handler: [${[...new Set(extraServices)].join(", ")}]`,
);
}

console.log(`Deleting drained deployment ${deployment.id}...`);
await deleteDeployment(adminUrl, deployment.id, authHeader, rejectUnauthorized);
console.log(`Deleted drained deployment ${deployment.id}.`);
prunedCount++;
} catch (e) {
console.warn(`Failed to delete drained deployment ${deployment.id}: ${(e as Error)?.message}`);
}
}

if (prunedCount === 0) {
console.log("No drained deployments were pruned.");
} else {
console.log(`Pruned ${prunedCount} drained deployment(s).`);
}
}
45 changes: 41 additions & 4 deletions lib/restate-constructs/service-deployer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,41 @@ export interface ServiceRegistrationProps {
* Restate service is behind a load balancer.
*/
adminUrl?: string;

/**
* What to do when the handler is removed from the stack.
* - RETAIN: Leave deployment registered in Restate (default). Use this if you want to transition to managing
* deployments manually, or if you want to remove the ServiceDeployer without affecting existing registrations.
* - DESTROY: Force-remove the deployment from Restate. Use this if you are decommissioning the service.
*
* Default: RETAIN
*/
removalPolicy?: cdk.RemovalPolicy;

/**
* Prune fully drained deployments of the same handler after each successful registration. Only removes deployments
* that have no associated services and no pinned invocations. This helps clean up old deployment versions that
* accumulate over time as new versions are registered.
*
* Default: false
*/
pruneDrainedDeployments?: boolean;

/**
* Number of old drained deployment revisions to retain. Only applies if `pruneDrainedDeployments` is enabled.
* Drained deployments beyond this limit will be removed, oldest first.
*
* Default: 0
*/
revisionHistoryLimit?: number;

/**
* Maximum number of drained deployments to prune per registration. Limits the cleanup work done in each
* deployment to avoid long-running operations. Only applies if `pruneDrainedDeployments` is enabled.
*
* Default: 10
*/
maxPrunedPerRun?: number;
}

/**
Expand Down Expand Up @@ -212,13 +247,15 @@ export class ServiceDeployer extends Construct {
authTokenSecretArn: authToken?.secretArn,
serviceLambdaArn: handler.functionArn,
invokeRoleArn: invokerRole?.roleArn,
// removalPolicy: "retain",
removalPolicy: options?.removalPolicy === cdk.RemovalPolicy.DESTROY ? "destroy" : ("retain" as const),
private: (options?.private ?? false).toString() as "true" | "false",
configurationVersion:
options?.configurationVersion || handler.functionArn.endsWith(":$LATEST")
? new Date().toISOString()
: undefined,
options?.configurationVersion ??
(handler.functionArn.endsWith(":$LATEST") ? new Date().toISOString() : undefined),
insecure: (options?.insecure ?? false).toString() as "true" | "false",
pruneDrainedDeployments: (options?.pruneDrainedDeployments ?? false).toString() as "true" | "false",
revisionHistoryLimit: options?.revisionHistoryLimit ?? 0,
maxPrunedPerRun: options?.maxPrunedPerRun ?? 10,
} satisfies RegistrationProperties,
});

Expand Down
Loading