Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions todo/bin/todo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import { TodoStack } from '../lib/todo-stack';
import { ProductsStack } from '../lib/task4-products/products-stack';
import { ImportServiceStack } from '../lib/task5/import-service-stack';
import { AuthorizerStack } from '../lib/task7/authorizer-stack';

const app = new cdk.App();
new TodoStack(app, 'TodoStack', {
Expand All @@ -20,3 +21,4 @@ new TodoStack(app, 'TodoStack', {

new ProductsStack(app, 'ProductsStack', {});
new ImportServiceStack(app, 'ImportServiceStack', {});
new AuthorizerStack(app, 'AuthorizerStack', {});
54 changes: 54 additions & 0 deletions todo/lib/task4-products/products-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
GetItemCommand,
} from '@aws-sdk/client-dynamodb';
import { v4 as uuidv4 } from 'uuid';
import { SQSEvent, SQSRecord } from 'aws-lambda';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
const dynamoDB = new DynamoDBClient({ region: process.env.AWS_REGION });
const sns = new SNSClient({});

const products = [
{
Expand Down Expand Up @@ -236,3 +239,54 @@ export const createProduct = async (event: { body?: string }) => {
};
}
};

// Lambda to process SQS messages and add products to DynamoDB
export const catalogBatchProcess = async (event: SQSEvent) => {
try {
for (const record of event.Records) {
console.log('🔹 Raw record body:', record.body);
}

const putPromises = event.Records.map(async (record: SQSRecord) => {
let body;

try {
body = JSON.parse(record.body);
} catch (err) {
console.error('❌ JSON parse failed for:', record.body);
throw err;
}

console.log('✅ Parsed message:', body);

const params = {
TableName: process.env.PRODUCTS_TABLE,
Item: {
id: { S: body.id },
title: { S: body.title },
description: { S: body.description },
price: { N: String(body.price) },
},
};

await dynamoDB.send(new PutItemCommand(params));
console.log(`Product ${body.title} added`);
// After all products are added, send SNS notifications
await sns.send(
new PublishCommand({
TopicArn: process.env.CREATE_PRODUCT_TOPIC_ARN!,
Subject: 'New Product Created',
Message: JSON.stringify(body, null, 2),
}),
);

console.log(`📨 Notification sent for product: ${body.title}`);
});

await Promise.all(putPromises);
console.log('Batch processed successfully');
} catch (err) {
console.error('Error processing batch:', err);
throw err;
}
};
44 changes: 44 additions & 0 deletions todo/lib/task4-products/products-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import * as lambda from 'aws-cdk-lib/aws-lambda';
import { join } from 'path';
import * as lambdaNodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
export class ProductsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
Expand Down Expand Up @@ -83,5 +87,45 @@ export class ProductsStack extends cdk.Stack {
'GET',
new apigateway.LambdaIntegration(getProductByIdLambda),
);

// SQS Queue for catalog items
const catalogItemsQueue = new sqs.Queue(this, 'catalogItemsQueue');

// SNS Topic for product creation notifications
const createProductTopic = new sns.Topic(this, 'createProductTopic', {
displayName: 'Product Creation Notifications',
});

createProductTopic.addSubscription(
new subs.EmailSubscription('prasannakumar2899@gmail.com'),
);

// Lambda to process SQS messages
const catalogBatchProcess = new lambdaNodejs.NodejsFunction(
this,
'catalogBatchProcess',
{
entry: join(__dirname, '../task4-products/products-handler.ts'),
handler: 'catalogBatchProcess',
runtime: lambda.Runtime.NODEJS_20_X,
environment: {
PRODUCTS_TABLE: 'products',
CREATE_PRODUCT_TOPIC_ARN: createProductTopic.topicArn,
},
bundling: {
externalModules: [], // include everything
},
},
);

// Allow the Lambda to publish to SNS topic
createProductTopic.grantPublish(catalogBatchProcess);

// lambda function invoked by SQS events
catalogBatchProcess.addEventSource(
new SqsEventSource(catalogItemsQueue, {
batchSize: 5,
}),
);
}
}
9 changes: 8 additions & 1 deletion todo/lib/task5/import-service-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Construct } from 'constructs';
import * as path from 'path';
import * as apigw from 'aws-cdk-lib/aws-apigateway';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';

import * as sqs from 'aws-cdk-lib/aws-sqs';
export class ImportServiceStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
Expand All @@ -25,6 +25,11 @@ export class ImportServiceStack extends cdk.Stack {
],
});

// Create SQS queue for catalog items
const catalogItemsQueue = new sqs.Queue(this, 'CatalogItemsQueue', {
queueName: 'catalog-items-queue',
});

const importProductsFile = new lambdaNodejs.NodejsFunction(
this,
'importProductsFile',
Expand Down Expand Up @@ -59,11 +64,13 @@ export class ImportServiceStack extends cdk.Stack {
runtime: cdk.aws_lambda.Runtime.NODEJS_20_X,
environment: {
BUCKET_NAME: importBucket.bucketName,
SQS_URL: catalogItemsQueue.queueUrl,
},
},
);

importBucket.grantRead(importFileParser);
catalogItemsQueue.grantSendMessages(importFileParser);

// Configure S3 event trigger for 'uploaded/' folder
importFileParser.addEventSource(
Expand Down
35 changes: 26 additions & 9 deletions todo/lib/task5/lambda/import-file-parser.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { S3Event } from 'aws-lambda';
import { S3 } from 'aws-sdk';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

import csv from 'csv-parser';

const s3 = new S3();
const s3 = new S3Client();
const sqs = new SQSClient({ region: process.env.AWS_REGION });

export const handler = async (event: S3Event) => {
console.log('Received S3 event:', JSON.stringify(event, null, 2));
Expand All @@ -18,27 +21,41 @@ export const handler = async (event: S3Event) => {
);

try {
const s3Stream = s3
.getObject({ Bucket: bucketName, Key: objectKey })
.createReadStream();
const response = await s3.send(
new GetObjectCommand({
Bucket: bucketName,
Key: objectKey,
}),
);
const s3Stream = response.Body as NodeJS.ReadableStream;

await new Promise<void>((resolve, reject) => {
s3Stream
.pipe(csv())
.on('data', (data) => {
console.log('Parsed record:', data); // each row logged to CloudWatch
.on('data', async (data) => {
try {
await sqs.send(
new SendMessageCommand({
QueueUrl: process.env.SQS_URL!,
MessageBody: JSON.stringify(data),
}),
);
console.log('📤 Sent record to SQS:', data);
} catch (err) {
console.error('❌ Failed to send record to SQS:', err);
}
})
.on('end', () => {
console.log('✅ CSV parsing completed.');
resolve();
})
.on('error', (err) => {
console.error(' Error parsing CSV:', err);
console.error(' Error parsing CSV:', err);
reject(err);
});
});
} catch (error) {
console.error(' Failed to process file:', error);
console.error(' Failed to process file:', error);
throw error;
}
}
Expand Down
100 changes: 100 additions & 0 deletions todo/lib/task7/authorizer-stack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import * as cdk from 'aws-cdk-lib';
import * as path from 'path';
import { Construct } from 'constructs';

export class AuthorizerStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const lambdaFunction = new lambda.Function(this, 'lambda-function', {
runtime: lambda.Runtime.NODEJS_20_X,
memorySize: 1024,
timeout: cdk.Duration.seconds(5),
handler: 'handler.main',
code: lambda.Code.fromAsset(path.join(__dirname, './')),
});

// Create a Cognito User Pool
const userPool = new cognito.UserPool(this, 'my-user-pool', {
signInAliases: {
email: true,
},
autoVerify: {
email: true,
},
standardAttributes: {
familyName: {
mutable: true,
required: true,
},
phoneNumber: { required: false },
},
customAttributes: {
createdAt: new cognito.DateTimeAttribute(),
},
passwordPolicy: {
minLength: 8,
requireLowercase: true,
requireUppercase: false,
requireDigits: true,
requireSymbols: false,
},
removalPolicy: cdk.RemovalPolicy.DESTROY,
});

// Create a User Pool Client for the application to interact with the User Pool.
userPool.addClient('my-app-client', {
userPoolClientName: 'my-app-client',
authFlows: {
userPassword: true,
},
});

// Add a domain to the User Pool for hosted UI authentication flows.
userPool.addDomain('Domain', {
cognitoDomain: {
domainPrefix: 'authorization',
},
});

const api = new apigateway.RestApi(this, 'my-api', {
restApiName: 'My API Gateway',
description: 'This API serves the Lambda functions.',
});

// Create a Cognito User Pools Authorizer for API Gateway to use Cognito User Pool for authorization.
const authorizer = new apigateway.CognitoUserPoolsAuthorizer(
this,
'my-authorizer',
{
authorizerName: 'my-authorizer',
cognitoUserPools: [userPool],
},
);

// Integrate Lambda function with API Gateway using the Cognito Authorizer.
const helloFromLambdaIntegration = new apigateway.LambdaIntegration(
lambdaFunction,
{
requestTemplates: {
'application/json': `{ "message": "$input.params('message')" }`,
},
integrationResponses: [
{
statusCode: '200',
},
],
proxy: false,
},
);

const helloResource = api.root.addResource('hello');
helloResource.addMethod('GET', helloFromLambdaIntegration, {
methodResponses: [{ statusCode: '200' }],
authorizer,
});
}
}
7 changes: 7 additions & 0 deletions todo/lib/task7/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export async function main(event: {
message: string;
}): Promise<{ message: string }> {
return {
message: `SUCCESS with message ${event.message} 🎉`,
};
}
Loading