Skip to content

Commit dd45830

Browse files
authored
Merge pull request #36 from devxtra-community/niranjana
Niranjana
2 parents 538096b + affc91f commit dd45830

15 files changed

Lines changed: 192 additions & 25 deletions

File tree

backend/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
},
1010
"dependencies": {
1111
"@aws-sdk/client-s3": "^3.962.0",
12+
"amqplib": "^0.10.9",
1213
"bcrypt": "^6.0.0",
1314
"cors": "^2.8.5",
1415
"dotenv": "^17.2.3",
@@ -28,6 +29,7 @@
2829
"@commitlint/cli": "^20.2.0",
2930
"@commitlint/config-conventional": "^20.2.0",
3031
"@eslint/js": "^9.39.2",
32+
"@types/amqplib": "^0.10.8",
3133
"@types/bcrypt": "^6.0.0",
3234
"@types/cors": "^2.8.19",
3335
"@types/express": "^5.0.6",

backend/src/Services/authToken.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { appDataSouce } from '../data-source';
1+
import { appDataSource } from '../data-source';
22
import { RefreshTokenEntity } from '../entities/refreshToken';
33
import { User } from '../entities/User';
44
import { generateRefreshToken, hashRefreshToken } from './refreshToken';
@@ -15,7 +15,7 @@ export const createRefreshTokenSession = async (user: User) => {
1515

1616
expiresAt.setDate(expiresAt.getDate() + REFRESH_TOKEN_DAYS);
1717

18-
const repo = appDataSouce.getRepository(RefreshTokenEntity);
18+
const repo = appDataSource.getRepository(RefreshTokenEntity);
1919
console.log('Got refresh token repository');
2020

2121
await repo.save({

backend/src/data-source.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import 'dotenv/config';
22
import { DataSource } from 'typeorm';
33
import { User } from './entities/User';
4-
import { Otp } from './entities/opt';
4+
import { Otp } from './entities/otp';
55
import { RefreshTokenEntity } from './entities/refreshToken';
66
if (!process.env.DATABASE_URL) {
77
throw new Error('DATABASE_URL is not defined');
88
}
9-
export const appDataSouce = new DataSource({
9+
export const appDataSource = new DataSource({
1010
type: 'postgres',
1111
url: process.env.DATABASE_URL,
1212
ssl: {

backend/src/messaging/jobTypes.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface SendOtpJob {
2+
phone: string;
3+
otp: string;
4+
purpose: 'login' | 'register' | 'reset_password';
5+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import * as amqp from "amqplib";
2+
import { Channel } from "amqplib";
3+
import { QUEUES } from "./queues";
4+
5+
const RABBITMQ_URL =
6+
process.env.RABBITMQ_URL || "amqp://guest:guest@localhost:5672";
7+
8+
let channel: Channel | null = null;
9+
10+
export const connectRabbitMQ = async (): Promise<void> => {
11+
if (channel) return;
12+
13+
try {
14+
console.log("🔌 Connecting to RabbitMQ...");
15+
16+
const connection = await amqp.connect(RABBITMQ_URL);
17+
channel = await connection.createChannel();
18+
19+
// declare queues ONCE
20+
for (const queue of Object.values(QUEUES)) {
21+
await channel.assertQueue(queue, { durable: true });
22+
}
23+
24+
console.log("RabbitMQ connected");
25+
} catch (err) {
26+
console.error("RabbitMQ connection failed", err);
27+
process.exit(1);
28+
}
29+
};
30+
31+
export const getChannel = (): Channel => {
32+
if (!channel) {
33+
throw new Error("RabbitMQ channel not initialized");
34+
}
35+
return channel;
36+
};
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { connectRabbitMQ, getChannel } from "../connect";
2+
import { QUEUES } from "../queues";
3+
import { SendOtpJob } from "../../jobTypes";
4+
5+
const startOtpWorker = async () => {
6+
await connectRabbitMQ();
7+
const channel = getChannel();
8+
9+
channel.prefetch(1); // process one OTP at a time
10+
11+
console.log("OTP Worker running");
12+
13+
channel.consume(QUEUES.SEND_OTP, async (msg) => {
14+
if (!msg) return;
15+
16+
try {
17+
const job: SendOtpJob = JSON.parse(msg.content.toString());
18+
19+
console.log(`Sending OTP ${job.otp} to ${job.phone}`);
20+
21+
// Here is where Twilio / SMS provider goes
22+
await new Promise((r) => setTimeout(r, 1500));
23+
24+
channel.ack(msg);
25+
console.log("OTP sent");
26+
} catch (err) {
27+
console.error("OTP failed", err);
28+
channel.nack(msg, false, false);
29+
}
30+
});
31+
};
32+
33+
startOtpWorker();
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { getChannel } from "./connect";
2+
import { QUEUES, QueueKey } from "./queues";
3+
4+
export const publish = async <T>(
5+
queue: QueueKey,
6+
payload: T
7+
): Promise<void> => {
8+
const channel = getChannel();
9+
10+
channel.sendToQueue(
11+
QUEUES[queue],
12+
Buffer.from(JSON.stringify(payload)),
13+
{ persistent: true }
14+
);
15+
16+
console.log("📤 OTP job queued");
17+
};
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export const QUEUES = {
2+
SEND_OTP: "send_otp_queue",
3+
} as const;
4+
5+
export type QueueKey = keyof typeof QUEUES;

backend/src/modules/auth/auth.controller.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ import { logger } from '../../utils/logger';
33
import { registerSchema, phoneSchema, loginSchema } from './auth.schema';
44
import { sendOtpSms } from '../../Services/sms.service';
55
import { generateotp } from '../../utils/otp';
6-
import { appDataSouce } from '../../data-source';
7-
import { Otp } from '../../entities/opt';
6+
import { appDataSource } from '../../data-source';
7+
import { Otp } from '../../entities/otp';
88
import { User } from '../../entities/User';
99
import { signAccessToken } from '../../Services/jwt.service';
1010
import { createRefreshTokenSession } from '../../Services/authToken';
1111
import bcrypt from 'bcrypt';
12+
// import { publish } from '../../messaging/rabbitmq/publish';
1213

1314
export const sendOtp = async (
1415
req: Request,
@@ -28,8 +29,8 @@ export const sendOtp = async (
2829

2930
const phoneNumber = result.data.phoneNumber;
3031

31-
const userRepo = appDataSouce.getRepository(User);
32-
const otpRepo = appDataSouce.getRepository(Otp);
32+
const userRepo = appDataSource.getRepository(User);
33+
const otpRepo = appDataSource.getRepository(Otp);
3334

3435
const existingUser = await userRepo.findOne({
3536
where: { phoneNumber },
@@ -52,6 +53,11 @@ export const sendOtp = async (
5253

5354
await sendOtpSms(phoneNumber, otpCode.toString());
5455

56+
// await publish('SEND_OTP', {
57+
// phone: phoneNumber,
58+
// otp: otpCode.toString(),
59+
// });
60+
5561
await otpRepo.delete({ phoneNumber });
5662
await otpRepo.save({
5763
phoneNumber,
@@ -83,7 +89,7 @@ export const verifyotp = async (
8389
.status(400)
8490
.json({ message: ' otp and phoneNumber are required' });
8591
}
86-
const otpRepo = appDataSouce.getRepository(Otp);
92+
const otpRepo = appDataSource.getRepository(Otp);
8793
const otpRecord = await otpRepo.findOne({
8894
where: {
8995
phoneNumber,
@@ -155,8 +161,8 @@ export const register = async (
155161
});
156162
}
157163

158-
const otpRepo = appDataSouce.getRepository(Otp);
159-
const userRepo = appDataSouce.getRepository(User);
164+
const otpRepo = appDataSource.getRepository(Otp);
165+
const userRepo = appDataSource.getRepository(User);
160166

161167
const otpRecord = await otpRepo.findOne({
162168
where: { id: otpId },
@@ -231,7 +237,7 @@ export const login = async (
231237

232238
const { phoneNumber, password } = result.data;
233239

234-
const userRepo = appDataSouce.getRepository(User);
240+
const userRepo = appDataSource.getRepository(User);
235241

236242
// 2️⃣ find user
237243
const user = await userRepo.findOne({

0 commit comments

Comments
 (0)