From 57f1ffd593b458afe414a768f14fd5cbdb66c4f9 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Mon, 22 Jan 2024 17:41:07 +0545 Subject: [PATCH 01/10] Add beneficiary and queue modules --- apps/sample/src/app/app.module.ts | 17 +- apps/sample/src/user/user.module.ts | 11 +- apps/sample/src/user/user.processor.ts | 15 + apps/sample/src/user/user.service.ts | 23 +- libs/beneficiary/.eslintrc.json | 18 + libs/beneficiary/README.md | 7 + libs/beneficiary/jest.config.ts | 11 + libs/beneficiary/project.json | 20 + libs/beneficiary/src/index.ts | 1 + .../beneficiary/src/lib/beneficiary.module.ts | 10 + libs/beneficiary/tsconfig.json | 22 ++ libs/beneficiary/tsconfig.lib.json | 16 + libs/beneficiary/tsconfig.spec.json | 14 + libs/queue/.eslintrc.json | 18 + libs/queue/README.md | 37 ++ libs/queue/jest.config.ts | 11 + libs/queue/project.json | 20 + libs/queue/src/index.ts | 3 + .../src/lib/interface/broker.interface.ts | 9 + .../lib/interface/queue-config.interfaces.ts | 13 + .../src/lib/interface/transport.interface.ts | 8 + .../plugins/default-queue-plugin.service.ts | 14 + libs/queue/src/lib/plugins/index.ts | 2 + .../src/lib/plugins/queue-plugin.interface.ts | 4 + libs/queue/src/lib/queue.module.ts | 32 ++ libs/queue/src/lib/queue.service.ts | 54 +++ .../src/lib/transports/bull/bull.module.ts | 16 + .../src/lib/transports/bull/bull.transport.ts | 53 +++ libs/queue/src/lib/transports/bull/index.ts | 2 + libs/queue/tsconfig.json | 22 ++ libs/queue/tsconfig.lib.json | 16 + libs/queue/tsconfig.spec.json | 14 + libs/user/src/index.ts | 1 + package-lock.json | 371 +++++++++++++++++- package.json | 4 + tsconfig.base.json | 2 + 36 files changed, 900 insertions(+), 11 deletions(-) create mode 100644 apps/sample/src/user/user.processor.ts create mode 100644 libs/beneficiary/.eslintrc.json create mode 100644 libs/beneficiary/README.md create mode 100644 libs/beneficiary/jest.config.ts create mode 100644 libs/beneficiary/project.json create mode 100644 libs/beneficiary/src/index.ts create mode 100644 libs/beneficiary/src/lib/beneficiary.module.ts create mode 100644 libs/beneficiary/tsconfig.json create mode 100644 libs/beneficiary/tsconfig.lib.json create mode 100644 libs/beneficiary/tsconfig.spec.json create mode 100644 libs/queue/.eslintrc.json create mode 100644 libs/queue/README.md create mode 100644 libs/queue/jest.config.ts create mode 100644 libs/queue/project.json create mode 100644 libs/queue/src/index.ts create mode 100644 libs/queue/src/lib/interface/broker.interface.ts create mode 100644 libs/queue/src/lib/interface/queue-config.interfaces.ts create mode 100644 libs/queue/src/lib/interface/transport.interface.ts create mode 100644 libs/queue/src/lib/plugins/default-queue-plugin.service.ts create mode 100644 libs/queue/src/lib/plugins/index.ts create mode 100644 libs/queue/src/lib/plugins/queue-plugin.interface.ts create mode 100644 libs/queue/src/lib/queue.module.ts create mode 100644 libs/queue/src/lib/queue.service.ts create mode 100644 libs/queue/src/lib/transports/bull/bull.module.ts create mode 100644 libs/queue/src/lib/transports/bull/bull.transport.ts create mode 100644 libs/queue/src/lib/transports/bull/index.ts create mode 100644 libs/queue/tsconfig.json create mode 100644 libs/queue/tsconfig.lib.json create mode 100644 libs/queue/tsconfig.spec.json diff --git a/apps/sample/src/app/app.module.ts b/apps/sample/src/app/app.module.ts index 56e6f48..04ed453 100644 --- a/apps/sample/src/app/app.module.ts +++ b/apps/sample/src/app/app.module.ts @@ -4,21 +4,34 @@ import { PrismaModule } from '@rumsan/prisma'; import { ConfigModule } from '@nestjs/config'; import { EventEmitterModule } from '@nestjs/event-emitter'; -import { RumsanUserModule } from '@rumsan/user'; +import { QueueModule } from '@rumsan/queue'; +import { RumsanUserModule, SignupModule } from '@rumsan/user'; import { ListenerModule } from '../listener/listener.module'; import { UserModule } from '../user/user.module'; import { AppController } from './app.controller'; import { AppService } from './app.service'; +// bullmq.transport.ts @Module({ imports: [ + QueueModule.forRoot({ + config: { + queueName: 'APP_TEST', + + connection: { + host: 'localhost', + port: 6379, + password: 'raghav123', + }, + }, + }), ConfigModule.forRoot({ isGlobal: true }), EventEmitterModule.forRoot({ maxListeners: 10, ignoreErrors: false }), ListenerModule, PrismaModule, UserModule, RumsanUserModule, - //SignupModule.register({ autoApprove: false }), + SignupModule.register({ autoApprove: false }), ], controllers: [AppController], providers: [AppService], diff --git a/apps/sample/src/user/user.module.ts b/apps/sample/src/user/user.module.ts index 26cc776..fa19c92 100644 --- a/apps/sample/src/user/user.module.ts +++ b/apps/sample/src/user/user.module.ts @@ -1,10 +1,15 @@ import { Module } from '@nestjs/common'; -import { RumsanUserModule } from '@rumsan/user'; +import { PrismaModule } from '@rumsan/prisma'; +import { QueueModule } from '@rumsan/queue'; +import { AuthModule } from '@rumsan/user'; import { AppUserController } from './user.controller'; +import { UserProcessor } from './user.processor'; import { AppUserService } from './user.service'; @Module({ + imports: [AuthModule, PrismaModule], controllers: [AppUserController], - providers: [AppUserService], + + providers: [UserProcessor, AppUserService, QueueModule], }) -export class UserModule extends RumsanUserModule {} +export class UserModule {} diff --git a/apps/sample/src/user/user.processor.ts b/apps/sample/src/user/user.processor.ts new file mode 100644 index 0000000..b2b0b7a --- /dev/null +++ b/apps/sample/src/user/user.processor.ts @@ -0,0 +1,15 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { QueueService } from '@rumsan/queue'; + +@Injectable() +export class UserProcessor { + private readonly _logger = new Logger('USER_TEST'); + + constructor(@Inject('TRANSPORT') private readonly queue: QueueService) {} + + public async sendOTP(d): Promise { + this.queue.receiveMessage('USER_TEST', (data) => { + console.log('data', data); + }); + } +} diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 3dc9735..363455d 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -1,11 +1,26 @@ import { Injectable } from '@nestjs/common'; -import { UserService } from '@rumsan/user'; - -const USER_ROLE_ID = 3; +import { PrismaService } from '@rumsan/prisma'; +import { QueueService } from '@rumsan/queue'; +import { AuthService, UserService } from '@rumsan/user'; @Injectable() export class AppUserService extends UserService { + constructor( + private readonly queueService: QueueService, + protected prisma: PrismaService, + public authService: AuthService, + ) { + super(prisma, authService); + } + async Test(dto: any) { - return {}; + // console.log('queueService', { s: this.queueService }); + // this.queueService.sendMessage('APP_TEST', dto); + // this.queueService.sendMessage('USER_@', { message: 'success' }); + this.queueService.receiveMessage('APP_TEST', (data) => { + console.log('data', data); + }); + + return { message: 'success' }; } } diff --git a/libs/beneficiary/.eslintrc.json b/libs/beneficiary/.eslintrc.json new file mode 100644 index 0000000..9d9c0db --- /dev/null +++ b/libs/beneficiary/.eslintrc.json @@ -0,0 +1,18 @@ +{ + "extends": ["../../.eslintrc.json"], + "ignorePatterns": ["!**/*"], + "overrides": [ + { + "files": ["*.ts", "*.tsx", "*.js", "*.jsx"], + "rules": {} + }, + { + "files": ["*.ts", "*.tsx"], + "rules": {} + }, + { + "files": ["*.js", "*.jsx"], + "rules": {} + } + ] +} diff --git a/libs/beneficiary/README.md b/libs/beneficiary/README.md new file mode 100644 index 0000000..c0b6200 --- /dev/null +++ b/libs/beneficiary/README.md @@ -0,0 +1,7 @@ +# beneficiary + +This library was generated with [Nx](https://nx.dev). + +## Running unit tests + +Run `nx test beneficiary` to execute the unit tests via [Jest](https://jestjs.io). diff --git a/libs/beneficiary/jest.config.ts b/libs/beneficiary/jest.config.ts new file mode 100644 index 0000000..8df00b3 --- /dev/null +++ b/libs/beneficiary/jest.config.ts @@ -0,0 +1,11 @@ +/* eslint-disable */ +export default { + displayName: 'beneficiary', + preset: '../../jest.preset.js', + testEnvironment: 'node', + transform: { + '^.+\\.[tj]s$': ['ts-jest', { tsconfig: '/tsconfig.spec.json' }], + }, + moduleFileExtensions: ['ts', 'js', 'html'], + coverageDirectory: '../../coverage/libs/beneficiary', +}; diff --git a/libs/beneficiary/project.json b/libs/beneficiary/project.json new file mode 100644 index 0000000..cc30f51 --- /dev/null +++ b/libs/beneficiary/project.json @@ -0,0 +1,20 @@ +{ + "name": "beneficiary", + "$schema": "../../node_modules/nx/schemas/project-schema.json", + "sourceRoot": "libs/beneficiary/src", + "projectType": "library", + "targets": { + "lint": { + "executor": "@nx/eslint:lint", + "outputs": ["{options.outputFile}"] + }, + "test": { + "executor": "@nx/jest:jest", + "outputs": ["{workspaceRoot}/coverage/{projectRoot}"], + "options": { + "jestConfig": "libs/beneficiary/jest.config.ts" + } + } + }, + "tags": [] +} diff --git a/libs/beneficiary/src/index.ts b/libs/beneficiary/src/index.ts new file mode 100644 index 0000000..5f4876d --- /dev/null +++ b/libs/beneficiary/src/index.ts @@ -0,0 +1 @@ +export * from './lib/beneficiary.module'; diff --git a/libs/beneficiary/src/lib/beneficiary.module.ts b/libs/beneficiary/src/lib/beneficiary.module.ts new file mode 100644 index 0000000..cf26d45 --- /dev/null +++ b/libs/beneficiary/src/lib/beneficiary.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { PrismaModule, PrismaService } from '@rumsan/prisma'; + +@Module({ + controllers: [], + providers: [PrismaService], + exports: [], + imports: [PrismaModule], +}) +export class BeneficiaryModule {} diff --git a/libs/beneficiary/tsconfig.json b/libs/beneficiary/tsconfig.json new file mode 100644 index 0000000..f5b8565 --- /dev/null +++ b/libs/beneficiary/tsconfig.json @@ -0,0 +1,22 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "module": "commonjs", + "forceConsistentCasingInFileNames": true, + "strict": true, + "noImplicitOverride": true, + "noPropertyAccessFromIndexSignature": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true + }, + "files": [], + "include": [], + "references": [ + { + "path": "./tsconfig.lib.json" + }, + { + "path": "./tsconfig.spec.json" + } + ] +} diff --git a/libs/beneficiary/tsconfig.lib.json b/libs/beneficiary/tsconfig.lib.json new file mode 100644 index 0000000..c297a24 --- /dev/null +++ b/libs/beneficiary/tsconfig.lib.json @@ -0,0 +1,16 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "declaration": true, + "types": ["node"], + "target": "es2021", + "strictNullChecks": true, + "noImplicitAny": true, + "strictBindCallApply": true, + "forceConsistentCasingInFileNames": true, + "noFallthroughCasesInSwitch": true + }, + "include": ["src/**/*.ts"], + "exclude": ["jest.config.ts", "src/**/*.spec.ts", "src/**/*.test.ts"] +} diff --git a/libs/beneficiary/tsconfig.spec.json b/libs/beneficiary/tsconfig.spec.json new file mode 100644 index 0000000..9b2a121 --- /dev/null +++ b/libs/beneficiary/tsconfig.spec.json @@ -0,0 +1,14 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "module": "commonjs", + "types": ["jest", "node"] + }, + "include": [ + "jest.config.ts", + "src/**/*.test.ts", + "src/**/*.spec.ts", + "src/**/*.d.ts" + ] +} diff --git a/libs/queue/.eslintrc.json b/libs/queue/.eslintrc.json new file mode 100644 index 0000000..9d9c0db --- /dev/null +++ b/libs/queue/.eslintrc.json @@ -0,0 +1,18 @@ +{ + "extends": ["../../.eslintrc.json"], + "ignorePatterns": ["!**/*"], + "overrides": [ + { + "files": ["*.ts", "*.tsx", "*.js", "*.jsx"], + "rules": {} + }, + { + "files": ["*.ts", "*.tsx"], + "rules": {} + }, + { + "files": ["*.js", "*.jsx"], + "rules": {} + } + ] +} diff --git a/libs/queue/README.md b/libs/queue/README.md new file mode 100644 index 0000000..136c408 --- /dev/null +++ b/libs/queue/README.md @@ -0,0 +1,37 @@ +Queue Library Documentation +The Queue library is a module that provides a way to handle message queues in your application. It uses the BullMQ library as the default transport layer for sending and receiving messages, but you can also implement your own transport layer. + +Setup +First, import the QueueModule in your application module: + +Configuration +The QueueModule accepts a configuration object of type IQueueModuleOptions from queue-config.interfaces.ts. This configuration object includes the queue configuration and transport layer. + +Here is an example of how to use the forRoot method to configure the QueueModule: + +In this example, myQueue is the name of the queue, and BullMQTransport is the transport layer. You can replace BullMQTransport with your own transport layer if you want. + +Queue Service +The QueueService provides methods to interact with the queue. You can inject it into your services or controllers: + +Here are the methods provided by the QueueService: + +connect(): Connects to the queue. +disconnect(): Disconnects from the queue. +sendMessage(queue: string, data: any): Sends a message to the specified queue. +receiveMessage(queue: string, callback: (data: any) => void): Receives a message from the specified queue and processes it with the provided callback function. +Queue Plugins +You can create custom plugins to modify the behavior of the queue. A plugin must implement the QueuePlugin interface from queue-plugin.interface.ts. The DefaultQueuePluginService from default-queue-plugin.service.ts is an example of a queue plugin. + +Transports +The transport layer is responsible for the actual sending and receiving of messages. The default transport is BullMQTransport from bull.transport.ts, but you can create your own by implementing the TransportInterface from transport.interface.ts. + +Testing +To run tests for the queue library, use the test script in the project.json file: + +This will run the Jest tests configured in jest.config.ts. + +Linting +To lint the queue library, use the lint script in the project.json file: + +This will run ESLint with the configuration specified in .eslintrc.json. diff --git a/libs/queue/jest.config.ts b/libs/queue/jest.config.ts new file mode 100644 index 0000000..240adf0 --- /dev/null +++ b/libs/queue/jest.config.ts @@ -0,0 +1,11 @@ +/* eslint-disable */ +export default { + displayName: 'queue', + preset: '../../jest.preset.js', + testEnvironment: 'node', + transform: { + '^.+\\.[tj]s$': ['ts-jest', { tsconfig: '/tsconfig.spec.json' }], + }, + moduleFileExtensions: ['ts', 'js', 'html'], + coverageDirectory: '../../coverage/libs/queue', +}; diff --git a/libs/queue/project.json b/libs/queue/project.json new file mode 100644 index 0000000..5c58182 --- /dev/null +++ b/libs/queue/project.json @@ -0,0 +1,20 @@ +{ + "name": "queue", + "$schema": "../../node_modules/nx/schemas/project-schema.json", + "sourceRoot": "libs/queue/src", + "projectType": "library", + "targets": { + "lint": { + "executor": "@nx/eslint:lint", + "outputs": ["{options.outputFile}"] + }, + "test": { + "executor": "@nx/jest:jest", + "outputs": ["{workspaceRoot}/coverage/{projectRoot}"], + "options": { + "jestConfig": "libs/queue/jest.config.ts" + } + } + }, + "tags": [] +} diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts new file mode 100644 index 0000000..caccd1a --- /dev/null +++ b/libs/queue/src/index.ts @@ -0,0 +1,3 @@ +export * from './lib/plugins'; +export * from './lib/queue.module'; +export * from './lib/queue.service'; diff --git a/libs/queue/src/lib/interface/broker.interface.ts b/libs/queue/src/lib/interface/broker.interface.ts new file mode 100644 index 0000000..9537c70 --- /dev/null +++ b/libs/queue/src/lib/interface/broker.interface.ts @@ -0,0 +1,9 @@ +export interface Broker { + publish(queueName: string, message: string): Promise; + consume( + queueName: string, + callback: (message: string) => void, + ): Promise; + + add(queueName: string, message: string): Promise; +} diff --git a/libs/queue/src/lib/interface/queue-config.interfaces.ts b/libs/queue/src/lib/interface/queue-config.interfaces.ts new file mode 100644 index 0000000..a5f64c0 --- /dev/null +++ b/libs/queue/src/lib/interface/queue-config.interfaces.ts @@ -0,0 +1,13 @@ +import { QueueOptions, WorkerOptions } from 'bullmq'; +import { TransportInterface } from './transport.interface'; + +export interface IQueueModuleOptions< + U extends { queueName: string } & WorkerOptions & QueueOptions = { + queueName: string; + } & WorkerOptions & + QueueOptions, + V extends TransportInterface = TransportInterface, +> { + config: U; + transport?: V; +} diff --git a/libs/queue/src/lib/interface/transport.interface.ts b/libs/queue/src/lib/interface/transport.interface.ts new file mode 100644 index 0000000..a5a642e --- /dev/null +++ b/libs/queue/src/lib/interface/transport.interface.ts @@ -0,0 +1,8 @@ +// transport.interface.ts + +export interface TransportInterface { + connect(): Promise; + sendMessage(queue: string, data: any): Promise; + receiveMessage(queue: string, callback: (data: any) => void): Promise; + disconnect(): Promise; +} diff --git a/libs/queue/src/lib/plugins/default-queue-plugin.service.ts b/libs/queue/src/lib/plugins/default-queue-plugin.service.ts new file mode 100644 index 0000000..b0b5404 --- /dev/null +++ b/libs/queue/src/lib/plugins/default-queue-plugin.service.ts @@ -0,0 +1,14 @@ +import { Injectable } from '@nestjs/common'; +import { QueuePlugin } from './queue-plugin.interface'; + +@Injectable() +export class DefaultQueuePluginService implements QueuePlugin { + beforeEnqueue(item: T): T { + // You can customize the behavior before enqueueing + return item; + } + + afterDequeue(item: T): void { + // You can customize the behavior after dequeuing + } +} diff --git a/libs/queue/src/lib/plugins/index.ts b/libs/queue/src/lib/plugins/index.ts new file mode 100644 index 0000000..b340090 --- /dev/null +++ b/libs/queue/src/lib/plugins/index.ts @@ -0,0 +1,2 @@ +export * from './default-queue-plugin.service'; +export * from './queue-plugin.interface'; diff --git a/libs/queue/src/lib/plugins/queue-plugin.interface.ts b/libs/queue/src/lib/plugins/queue-plugin.interface.ts new file mode 100644 index 0000000..9aa8273 --- /dev/null +++ b/libs/queue/src/lib/plugins/queue-plugin.interface.ts @@ -0,0 +1,4 @@ +export interface QueuePlugin { + beforeEnqueue(item: T): T; + afterDequeue(item: T): void; +} diff --git a/libs/queue/src/lib/queue.module.ts b/libs/queue/src/lib/queue.module.ts new file mode 100644 index 0000000..890bb0b --- /dev/null +++ b/libs/queue/src/lib/queue.module.ts @@ -0,0 +1,32 @@ +import { Global, Module } from '@nestjs/common'; +import { QueueOptions } from 'bullmq'; +import { IQueueModuleOptions } from './interface/queue-config.interfaces'; +import { QueueService } from './queue.service'; +import { BullMQTransport } from './transports/bull'; + +@Global() +@Module({}) +export class QueueModule { + static forRoot< + U = IQueueModuleOptions['config'], + V = IQueueModuleOptions['transport'], + >(rootConfig: IQueueModuleOptions) { + const Transport = + rootConfig.transport || + new BullMQTransport( + rootConfig.config as { queueName: string } & WorkerOptions & + QueueOptions, + ); + return { + module: QueueModule, + providers: [ + { + provide: 'TRANSPORT', + useFactory: () => Transport, + }, + QueueService, + ], + exports: ['TRANSPORT', QueueService], + }; + } +} diff --git a/libs/queue/src/lib/queue.service.ts b/libs/queue/src/lib/queue.service.ts new file mode 100644 index 0000000..bb0f614 --- /dev/null +++ b/libs/queue/src/lib/queue.service.ts @@ -0,0 +1,54 @@ +// queue.service.ts + +import { Inject, Injectable } from '@nestjs/common'; +import { IQueueModuleOptions } from './interface/queue-config.interfaces'; +import { TransportInterface } from './interface/transport.interface'; + +@Injectable() +export class QueueService { + private transport: TransportInterface; + + constructor( + @Inject('TRANSPORT') + private readonly transportV: IQueueModuleOptions['transport'], + ) { + this.transport = this.transportV as TransportInterface; + this.initializeTransport(); + } + + setTransport(transport: TransportInterface) { + this.transport = transport; + } + + async connect() { + if (!this.transport) { + this.initializeTransport(); + } + await this.transport.connect(); + } + + async sendMessage(queue: string, data: any) { + if (!this.transport) { + this.initializeTransport(); + } + await this.transport.sendMessage(queue, data); + } + + async receiveMessage(queue: string, callback: (data: any) => void) { + if (!this.transport) { + this.initializeTransport(); + } + return this.transport.receiveMessage(queue, callback); + } + + async disconnect() { + if (!this.transport) { + this.initializeTransport(); + } + await this.transport.disconnect(); + } + + private async initializeTransport() { + await this.transport.connect(); + } +} diff --git a/libs/queue/src/lib/transports/bull/bull.module.ts b/libs/queue/src/lib/transports/bull/bull.module.ts new file mode 100644 index 0000000..ed5fed5 --- /dev/null +++ b/libs/queue/src/lib/transports/bull/bull.module.ts @@ -0,0 +1,16 @@ +import { BullModuleOptions, BullModule as RootBullModule } from '@nestjs/bull'; +import { DynamicModule, Module } from '@nestjs/common'; + +@Module({ + imports: [], + controllers: [], + providers: [], +}) +export class BullModule extends RootBullModule { + static register(options: BullModuleOptions): DynamicModule { + return { + module: BullModule, + imports: [BullModule.forRoot(options)], + }; + } +} diff --git a/libs/queue/src/lib/transports/bull/bull.transport.ts b/libs/queue/src/lib/transports/bull/bull.transport.ts new file mode 100644 index 0000000..d525252 --- /dev/null +++ b/libs/queue/src/lib/transports/bull/bull.transport.ts @@ -0,0 +1,53 @@ +import { Queue, Worker, WorkerOptions } from 'bullmq'; +import { IQueueModuleOptions } from '../../interface/queue-config.interfaces'; +import { TransportInterface } from '../../interface/transport.interface'; + +export class BullMQTransport implements TransportInterface { + private queue: Queue; + private worker: Worker; + + constructor(private config: IQueueModuleOptions['config']) {} + + async connect() { + this.queue = new Queue(this.config.queueName, this.config); + this.worker = new Worker( + this.config.queueName as string, + async (job) => { + // This is where you process your jobs + console.log('processing job', job.name, job.data); + }, + this.config as WorkerOptions, + ); + } + + async sendMessage(queue: string, data: unknown) { + console.log('sending message to bullmq', queue, data); + await this.queue.add(queue, data); + } + + async receiveMessage( + queue: string, + callback: (data: unknown, j: unknown, k: unknown) => void, + ) { + console.log('receiving message from bullmq', queue); + const worker = new Worker( + queue, + async (job) => { + // This is where you process your jobs + console.log('processing job', job.name, job.data); + callback(job.data, job, worker); + }, + this.config as WorkerOptions, + ); + // console.log('worker', worker); + + worker.on('completed', (job) => { + console.log(`Job completed with result ${job.returnvalue}`); + }); + } + + async disconnect() { + await this.queue.close(); + await this.worker.close(); + } +} diff --git a/libs/queue/src/lib/transports/bull/index.ts b/libs/queue/src/lib/transports/bull/index.ts new file mode 100644 index 0000000..1207b54 --- /dev/null +++ b/libs/queue/src/lib/transports/bull/index.ts @@ -0,0 +1,2 @@ +export * from './bull.module'; +export * from './bull.transport'; diff --git a/libs/queue/tsconfig.json b/libs/queue/tsconfig.json new file mode 100644 index 0000000..f5b8565 --- /dev/null +++ b/libs/queue/tsconfig.json @@ -0,0 +1,22 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "module": "commonjs", + "forceConsistentCasingInFileNames": true, + "strict": true, + "noImplicitOverride": true, + "noPropertyAccessFromIndexSignature": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true + }, + "files": [], + "include": [], + "references": [ + { + "path": "./tsconfig.lib.json" + }, + { + "path": "./tsconfig.spec.json" + } + ] +} diff --git a/libs/queue/tsconfig.lib.json b/libs/queue/tsconfig.lib.json new file mode 100644 index 0000000..c297a24 --- /dev/null +++ b/libs/queue/tsconfig.lib.json @@ -0,0 +1,16 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "declaration": true, + "types": ["node"], + "target": "es2021", + "strictNullChecks": true, + "noImplicitAny": true, + "strictBindCallApply": true, + "forceConsistentCasingInFileNames": true, + "noFallthroughCasesInSwitch": true + }, + "include": ["src/**/*.ts"], + "exclude": ["jest.config.ts", "src/**/*.spec.ts", "src/**/*.test.ts"] +} diff --git a/libs/queue/tsconfig.spec.json b/libs/queue/tsconfig.spec.json new file mode 100644 index 0000000..9b2a121 --- /dev/null +++ b/libs/queue/tsconfig.spec.json @@ -0,0 +1,14 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "module": "commonjs", + "types": ["jest", "node"] + }, + "include": [ + "jest.config.ts", + "src/**/*.test.ts", + "src/**/*.spec.ts", + "src/**/*.d.ts" + ] +} diff --git a/libs/user/src/index.ts b/libs/user/src/index.ts index 0fea85f..bc8c668 100644 --- a/libs/user/src/index.ts +++ b/libs/user/src/index.ts @@ -1,6 +1,7 @@ export * from './lib/ability/ability.decorator'; export * from './lib/ability/ability.guard'; export * from './lib/auth/auth.module'; +export * from './lib/auth/auth.service'; export * from './lib/auth/guard'; export * as RUMSAN_USER_CONSTANTS from './lib/constants'; export * from './lib/rumsan-user.module'; diff --git a/package-lock.json b/package-lock.json index cac6ec2..4e7858c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,17 +9,21 @@ "version": "0.0.0", "license": "MIT", "dependencies": { + "@nestjs/bull": "^10.0.1", "@nestjs/common": "^10.0.2", "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.2", "@nestjs/event-emitter": "^2.0.3", "@nestjs/jwt": "^10.2.0", + "@nestjs/mapped-types": "*", "@nestjs/passport": "^10.0.3", "@nestjs/platform-express": "^10.0.2", "@nestjs/swagger": "^7.1.17", "@nodeteam/nestjs-prisma-pagination": "^1.0.6", "@prisma/client": "^5.7.1", "axios": "^1.0.0", + "bull": "^4.12.1", + "bullmq": "^5.1.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "ethers": "^6.9.1", @@ -2405,6 +2409,11 @@ "integrity": "sha512-dvuCeX5fC9dXgJn9t+X5atfmgQAzUOWqS1254Gh0m6i8wKd10ebXkfNKiRK+1GWi/yTvvLDHpoxLr0xxxeslWw==", "dev": true }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -2779,6 +2788,114 @@ "node": ">=8" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.2.tgz", + "integrity": "sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.2.tgz", + "integrity": "sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.2.tgz", + "integrity": "sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.2.tgz", + "integrity": "sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.2.tgz", + "integrity": "sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.2.tgz", + "integrity": "sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ] + }, + "node_modules/@nestjs/bull": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@nestjs/bull/-/bull-10.0.1.tgz", + "integrity": "sha512-1GcJ8BkHDgQdBMZ7SqAqgUHiFnISXmpGvewFeTc8wf87JLk2PweiKv9j9/KQKU+NI237pCe82XB0bXzTnsdxSw==", + "dependencies": { + "@nestjs/bull-shared": "^10.0.1", + "tslib": "2.6.0" + }, + "peerDependencies": { + "@nestjs/common": "^8.0.0 || ^9.0.0 || ^10.0.0", + "@nestjs/core": "^8.0.0 || ^9.0.0 || ^10.0.0", + "bull": "^3.3 || ^4.0.0" + } + }, + "node_modules/@nestjs/bull-shared": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@nestjs/bull-shared/-/bull-shared-10.0.1.tgz", + "integrity": "sha512-8Td36l2i5x9+iQWjPB5Bd5+6u5Eangb5DclNcwrdwKqvd28xE92MSW97P4JV52C2kxrTjZwx8ck/wObAwtpQPw==", + "dependencies": { + "tslib": "2.6.0" + }, + "peerDependencies": { + "@nestjs/common": "^8.0.0 || ^9.0.0 || ^10.0.0", + "@nestjs/core": "^8.0.0 || ^9.0.0 || ^10.0.0" + } + }, + "node_modules/@nestjs/bull-shared/node_modules/tslib": { + "version": "2.6.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.0.tgz", + "integrity": "sha512-7At1WUettjcSRHXCyYtTselblcHl9PJFFVKiCAy/bY97+BPZXSQ2wbq0P9s8tK2G7dFQfNnlJnPAiArVBVBsfA==" + }, + "node_modules/@nestjs/bull/node_modules/tslib": { + "version": "2.6.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.0.tgz", + "integrity": "sha512-7At1WUettjcSRHXCyYtTselblcHl9PJFFVKiCAy/bY97+BPZXSQ2wbq0P9s8tK2G7dFQfNnlJnPAiArVBVBsfA==" + }, "node_modules/@nestjs/common": { "version": "10.3.0", "resolved": "https://registry.npmjs.org/@nestjs/common/-/common-10.3.0.tgz", @@ -6984,6 +7101,114 @@ "semver": "^7.0.0" } }, + "node_modules/bull": { + "version": "4.12.1", + "resolved": "https://registry.npmjs.org/bull/-/bull-4.12.1.tgz", + "integrity": "sha512-ft4hTmex7WGSHt56mydw9uRKskkvgiNwqTYiV9b6q3ubhplglQmjo9OZrHlcUVNwBqSBhnzlsJQ9N/Wd7nhENA==", + "dependencies": { + "cron-parser": "^4.2.1", + "get-port": "^5.1.1", + "ioredis": "^5.3.2", + "lodash": "^4.17.21", + "msgpackr": "^1.5.2", + "semver": "^7.5.2", + "uuid": "^8.3.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/bull/node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "bin": { + "uuid": "dist/bin/uuid" + } + }, + "node_modules/bullmq": { + "version": "5.1.3", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.1.3.tgz", + "integrity": "sha512-safpGwiwKHsNPW01Wk8FPxdWbPS2mA0HZKqIhdQB10J4wWRSDWPeQE2p+YYnAmqEsk0FwJdZnzVcwCfn7w5cVA==", + "dependencies": { + "cron-parser": "^4.6.0", + "glob": "^8.0.3", + "ioredis": "^5.3.2", + "lodash": "^4.17.21", + "msgpackr": "^1.10.1", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", + "tslib": "^2.0.0", + "uuid": "^9.0.0" + } + }, + "node_modules/bullmq/node_modules/brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dependencies": { + "balanced-match": "^1.0.0" + } + }, + "node_modules/bullmq/node_modules/glob": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", + "integrity": "sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==", + "dependencies": { + "fs.realpath": "^1.0.0", + "inflight": "^1.0.4", + "inherits": "2", + "minimatch": "^5.0.1", + "once": "^1.3.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/isaacs" + } + }, + "node_modules/bullmq/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/bullmq/node_modules/minimatch": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", + "dependencies": { + "brace-expansion": "^2.0.1" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/bullmq/node_modules/semver": { + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", + "dependencies": { + "lru-cache": "^6.0.0" + }, + "bin": { + "semver": "bin/semver.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/bullmq/node_modules/yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", @@ -7220,6 +7445,14 @@ "node": ">=0.8" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -7732,6 +7965,17 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "devOptional": true }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -8156,6 +8400,14 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -9522,6 +9774,17 @@ "node": ">=8.0.0" } }, + "node_modules/get-port": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", + "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/get-stream": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz", @@ -10108,6 +10371,29 @@ "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==" }, + "node_modules/ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -11477,11 +11763,21 @@ "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -11587,6 +11883,14 @@ "yallist": "^3.0.2" } }, + "node_modules/luxon": { + "version": "3.4.4", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.4.4.tgz", + "integrity": "sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==", + "engines": { + "node": ">=12" + } + }, "node_modules/magic-string": { "version": "0.30.1", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.1.tgz", @@ -11842,6 +12146,35 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, + "node_modules/msgpackr": { + "version": "1.10.1", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.10.1.tgz", + "integrity": "sha512-r5VRLv9qouXuLiIBrLpl2d5ZvPt8svdQTl5/vMvE4nzDMyEX4sgW5yWhuBBj5UmgwOTWj8CIdSXn5sAfsHAWIQ==", + "optionalDependencies": { + "msgpackr-extract": "^3.0.2" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.2.tgz", + "integrity": "sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.0.7" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.2" + } + }, "node_modules/multer": { "version": "1.4.4-lts.1", "resolved": "https://registry.npmjs.org/multer/-/multer-1.4.4-lts.1.tgz", @@ -12009,8 +12342,7 @@ "node_modules/node-abort-controller": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", - "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", - "dev": true + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==" }, "node_modules/node-fetch": { "version": "2.7.0", @@ -12040,6 +12372,17 @@ "node": ">= 6.13.0" } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.0.7", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.7.tgz", + "integrity": "sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w==", + "optional": true, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -13781,6 +14124,25 @@ "node": ">= 12.13.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect-metadata": { "version": "0.1.14", "resolved": "https://registry.npmjs.org/reflect-metadata/-/reflect-metadata-0.1.14.tgz", @@ -14596,6 +14958,11 @@ "integrity": "sha512-oeVtt7eWQS+Na6F//S4kJ2K2VbRlS9D43mAlMyVpVWovy9o+jfgH8O9agzANzaiLjclA0oYzUXEM4PurhSUChw==", "dev": true }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/package.json b/package.json index 7da1015..dacdda4 100644 --- a/package.json +++ b/package.json @@ -7,17 +7,21 @@ }, "private": true, "dependencies": { + "@nestjs/bull": "^10.0.1", "@nestjs/common": "^10.0.2", "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.2", "@nestjs/event-emitter": "^2.0.3", "@nestjs/jwt": "^10.2.0", + "@nestjs/mapped-types": "*", "@nestjs/passport": "^10.0.3", "@nestjs/platform-express": "^10.0.2", "@nestjs/swagger": "^7.1.17", "@nodeteam/nestjs-prisma-pagination": "^1.0.6", "@prisma/client": "^5.7.1", "axios": "^1.0.0", + "bull": "^4.12.1", + "bullmq": "^5.1.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "ethers": "^6.9.1", diff --git a/tsconfig.base.json b/tsconfig.base.json index 93218ba..01a0550 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -16,8 +16,10 @@ "strictPropertyInitialization": false, "baseUrl": ".", "paths": { + "@rumsan/beneficiary": ["libs/beneficiary/src/index.ts"], "@rumsan/core": ["libs/core/src/index.ts"], "@rumsan/prisma": ["libs/prisma/src/index.ts"], + "@rumsan/queue": ["libs/queue/src/index.ts"], "@rumsan/settings": ["libs/settings/src/index.ts"], "@rumsan/user": ["libs/user/src/index.ts"] } From be374b723fdd76911570c9742bb5547f890bbab5 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Mon, 22 Jan 2024 17:47:38 +0545 Subject: [PATCH 02/10] Refactor queue library configuration and documentation --- libs/queue/README.md | 59 ++++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/libs/queue/README.md b/libs/queue/README.md index 136c408..d766f4a 100644 --- a/libs/queue/README.md +++ b/libs/queue/README.md @@ -1,37 +1,64 @@ -Queue Library Documentation +## Queue Library Documentation + The Queue library is a module that provides a way to handle message queues in your application. It uses the BullMQ library as the default transport layer for sending and receiving messages, but you can also implement your own transport layer. Setup First, import the QueueModule in your application module: -Configuration +```ts +import { QueueModule } from 'libs/queue/src/lib/queue.module'; +``` + +# Configuration + The QueueModule accepts a configuration object of type IQueueModuleOptions from queue-config.interfaces.ts. This configuration object includes the queue configuration and transport layer. Here is an example of how to use the forRoot method to configure the QueueModule: +```ts +@Module({ + imports: [ + QueueModule.forRoot({ + config: { + queueName: 'myQueue', + // other BullMQ options... + }, + transport: new BullMQTransport(/* transport options... */), + }), + ], +}) +export class AppModule {} +``` + In this example, myQueue is the name of the queue, and BullMQTransport is the transport layer. You can replace BullMQTransport with your own transport layer if you want. -Queue Service +# Queue Library + +This library provides a way to handle message queues in your application. It uses the BullMQ library as the default transport layer for sending and receiving messages, but you can also implement your own transport layer. + +## Queue Service + The QueueService provides methods to interact with the queue. You can inject it into your services or controllers: Here are the methods provided by the QueueService: -connect(): Connects to the queue. -disconnect(): Disconnects from the queue. -sendMessage(queue: string, data: any): Sends a message to the specified queue. -receiveMessage(queue: string, callback: (data: any) => void): Receives a message from the specified queue and processes it with the provided callback function. -Queue Plugins +- `connect()`: Connects to the queue. +- `disconnect()`: Disconnects from the queue. +- `sendMessage(queue: string, data: any)`: Sends a message to the specified queue. +- `receiveMessage(queue: string, callback: (data: any) => void)`: Receives a message from the specified queue and processes it with the provided callback function. + +## Queue Plugins + You can create custom plugins to modify the behavior of the queue. A plugin must implement the QueuePlugin interface from queue-plugin.interface.ts. The DefaultQueuePluginService from default-queue-plugin.service.ts is an example of a queue plugin. -Transports -The transport layer is responsible for the actual sending and receiving of messages. The default transport is BullMQTransport from bull.transport.ts, but you can create your own by implementing the TransportInterface from transport.interface.ts. +## Transports -Testing -To run tests for the queue library, use the test script in the project.json file: +The transport layer is responsible for the actual sending and receiving of messages. The default transport is BullMQTransport from bull.transport.ts, but you can create your own by implementing the TransportInterface from transport.interface.ts. -This will run the Jest tests configured in jest.config.ts. +## Testing -Linting -To lint the queue library, use the lint script in the project.json file: +To run tests for the queue library, use the test script in the project.json file: -This will run ESLint with the configuration specified in .eslintrc.json. +```sh +npm run test +``` From f479402e6844659dd8955f08c6b03e8d48fdd45d Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Thu, 25 Jan 2024 11:30:31 +0545 Subject: [PATCH 03/10] Add worker threads and database configuration to queue module --- apps/sample/src/app/app.module.ts | 4 +++- apps/sample/src/user/user.service.ts | 13 ++++++++++--- libs/queue/src/lib/queue.module.ts | 4 ++-- .../src/lib/transports/bull/bull.transport.ts | 17 +++++++++++------ 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/apps/sample/src/app/app.module.ts b/apps/sample/src/app/app.module.ts index 04ed453..00f0c9f 100644 --- a/apps/sample/src/app/app.module.ts +++ b/apps/sample/src/app/app.module.ts @@ -17,11 +17,13 @@ import { AppService } from './app.service'; QueueModule.forRoot({ config: { queueName: 'APP_TEST', - + useWorkerThreads: true, + connection: { host: 'localhost', port: 6379, password: 'raghav123', + db: 0, }, }, }), diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 363455d..7da0b7a 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '@rumsan/prisma'; import { QueueService } from '@rumsan/queue'; import { AuthService, UserService } from '@rumsan/user'; +import { Job } from 'bullmq'; @Injectable() export class AppUserService extends UserService { @@ -15,10 +16,16 @@ export class AppUserService extends UserService { async Test(dto: any) { // console.log('queueService', { s: this.queueService }); - // this.queueService.sendMessage('APP_TEST', dto); - // this.queueService.sendMessage('USER_@', { message: 'success' }); - this.queueService.receiveMessage('APP_TEST', (data) => { + this.queueService.sendMessage('USER_TEST', { + message: 'success', + data: { + name: 'test', + }, + }); + this.queueService.receiveMessage('USER_TEST', (data: Job) => { console.log('data', data); + console.log(data.getState()); + return data; }); return { message: 'success' }; diff --git a/libs/queue/src/lib/queue.module.ts b/libs/queue/src/lib/queue.module.ts index 890bb0b..9ea8bb9 100644 --- a/libs/queue/src/lib/queue.module.ts +++ b/libs/queue/src/lib/queue.module.ts @@ -8,9 +8,9 @@ import { BullMQTransport } from './transports/bull'; @Module({}) export class QueueModule { static forRoot< - U = IQueueModuleOptions['config'], + U, //= IQueueModuleOptions['config'], V = IQueueModuleOptions['transport'], - >(rootConfig: IQueueModuleOptions) { + >(rootConfig: IQueueModuleOptions) { const Transport = rootConfig.transport || new BullMQTransport( diff --git a/libs/queue/src/lib/transports/bull/bull.transport.ts b/libs/queue/src/lib/transports/bull/bull.transport.ts index d525252..e9bcc33 100644 --- a/libs/queue/src/lib/transports/bull/bull.transport.ts +++ b/libs/queue/src/lib/transports/bull/bull.transport.ts @@ -27,22 +27,27 @@ export class BullMQTransport implements TransportInterface { async receiveMessage( queue: string, - callback: (data: unknown, j: unknown, k: unknown) => void, + callback: (data: unknown, job: unknown, worker: Worker) => void, ) { console.log('receiving message from bullmq', queue); + + this.queue. + const worker = new Worker( queue, - async (job) => { - // This is where you process your jobs - console.log('processing job', job.name, job.data); + async (job: any) => { + console.log('processing job', job.name, job.data, job); callback(job.data, job, worker); }, this.config as WorkerOptions, ); - // console.log('worker', worker); + + worker.on('failed', (job, err) => { + console.log(`Job ${job.id} failed with ${err.message}`); + }); worker.on('completed', (job) => { - console.log(`Job completed with result ${job.returnvalue}`); + console.log(`Job ${job.id} completed with result ${job.returnvalue}`); }); } From 036d9e63447656e07d55c5e3c5dd57e62fa72440 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Mon, 29 Jan 2024 16:39:54 +0545 Subject: [PATCH 04/10] Remove unused code related to queue service --- apps/sample/src/user/user.service.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 7da0b7a..8216f42 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '@rumsan/prisma'; import { QueueService } from '@rumsan/queue'; import { AuthService, UserService } from '@rumsan/user'; -import { Job } from 'bullmq'; @Injectable() export class AppUserService extends UserService { @@ -22,11 +21,11 @@ export class AppUserService extends UserService { name: 'test', }, }); - this.queueService.receiveMessage('USER_TEST', (data: Job) => { - console.log('data', data); - console.log(data.getState()); - return data; - }); + // this.queueService.receiveMessage('USER_TEST', (data: Job) => { + // console.log('data', data); + // console.log(data.getState()); + // return data; + // }); return { message: 'success' }; } From db5d8e12b120e66ef0d7d0aa8702bf224abcbe3c Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Mon, 29 Jan 2024 16:40:20 +0545 Subject: [PATCH 05/10] Update queue service method name --- apps/sample/src/user/user.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 8216f42..0c399da 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -15,7 +15,7 @@ export class AppUserService extends UserService { async Test(dto: any) { // console.log('queueService', { s: this.queueService }); - this.queueService.sendMessage('USER_TEST', { + this.queueService.sendMessage('USER_Queue', { message: 'success', data: { name: 'test', From 6926c0cb946520fae593570468e972f20892014c Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Tue, 30 Jan 2024 01:06:40 +0545 Subject: [PATCH 06/10] Update project.json and remove console logs in BullMQTransport --- libs/queue/project.json | 20 ++++++++++++++++++- .../src/lib/transports/bull/bull.transport.ts | 10 +--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/libs/queue/project.json b/libs/queue/project.json index 5c58182..5c76923 100644 --- a/libs/queue/project.json +++ b/libs/queue/project.json @@ -4,9 +4,27 @@ "sourceRoot": "libs/queue/src", "projectType": "library", "targets": { + "build": { + "executor": "@nx/js:tsc", + "outputs": ["{options.outputPath}"], + "options": { + "outputPath": "dist/libs/queue", + "tsConfig": "libs/queue/tsconfig.lib.json", + "packageJson": "libs/queue/package.json", + "main": "libs/queue/src/index.ts", + "assets": ["libs/queue/*.md"] + } + }, + "publish": { + "command": "node tools/scripts/publish.mjs queue {args.ver} {args.tag}", + "dependsOn": ["build"] + }, "lint": { "executor": "@nx/eslint:lint", - "outputs": ["{options.outputFile}"] + "outputs": ["{options.outputFile}"], + "options": { + "lintFilePatterns": ["libs/queue/**/*.ts", "libs/queue/package.json"] + } }, "test": { "executor": "@nx/jest:jest", diff --git a/libs/queue/src/lib/transports/bull/bull.transport.ts b/libs/queue/src/lib/transports/bull/bull.transport.ts index e9bcc33..9144d7c 100644 --- a/libs/queue/src/lib/transports/bull/bull.transport.ts +++ b/libs/queue/src/lib/transports/bull/bull.transport.ts @@ -31,7 +31,7 @@ export class BullMQTransport implements TransportInterface { ) { console.log('receiving message from bullmq', queue); - this.queue. + // this.queue. const worker = new Worker( queue, @@ -41,14 +41,6 @@ export class BullMQTransport implements TransportInterface { }, this.config as WorkerOptions, ); - - worker.on('failed', (job, err) => { - console.log(`Job ${job.id} failed with ${err.message}`); - }); - - worker.on('completed', (job) => { - console.log(`Job ${job.id} completed with result ${job.returnvalue}`); - }); } async disconnect() { From a0a3f4946468a4d7fc7469703e7ce45d616133d8 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Tue, 30 Jan 2024 09:17:55 +0545 Subject: [PATCH 07/10] Add queue decorators and update user module --- apps/sample/src/app/app.module.ts | 30 +++--- apps/sample/src/user/user.module.ts | 14 ++- apps/sample/src/user/user.service.ts | 13 +-- .../listeners/on-error.decorator.ts | 5 + .../listeners/on-message.decorator.ts | 5 + libs/queue/src/lib/queue.module.ts | 59 +++++++++--- libs/queue/src/lib/queue.service.ts | 20 ++-- .../src/lib/transports/bull/bull.transport.ts | 94 ++++++++++++------- 8 files changed, 158 insertions(+), 82 deletions(-) create mode 100644 libs/queue/src/lib/decorators/listeners/on-error.decorator.ts create mode 100644 libs/queue/src/lib/decorators/listeners/on-message.decorator.ts diff --git a/apps/sample/src/app/app.module.ts b/apps/sample/src/app/app.module.ts index 00f0c9f..7ec74f0 100644 --- a/apps/sample/src/app/app.module.ts +++ b/apps/sample/src/app/app.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; // import { UserModule } from '../user/user.module'; import { PrismaModule } from '@rumsan/prisma'; -import { ConfigModule } from '@nestjs/config'; +import { ConfigModule, ConfigService } from '@nestjs/config'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { QueueModule } from '@rumsan/queue'; import { RumsanUserModule, SignupModule } from '@rumsan/user'; @@ -15,17 +15,25 @@ import { AppService } from './app.service'; @Module({ imports: [ QueueModule.forRoot({ - config: { - queueName: 'APP_TEST', - useWorkerThreads: true, - - connection: { - host: 'localhost', - port: 6379, - password: 'raghav123', - db: 0, + imports: [ConfigModule], + global: true, + useFactory: async (configService: ConfigService) => ({ + config: { + connection: { + name: 'default', + host: configService.get('REDIS_HOST'), + port: +configService.get('REDIS_PORT'), + password: configService.get('REDIS_PASSWORD'), + retryStrategy: (times) => { + // reconnect after + return Math.min(times * 50, 2000); + }, + // might need to change on producttion + maxRetriesPerRequest: 1000, + }, }, - }, + }), + inject: [ConfigService], }), ConfigModule.forRoot({ isGlobal: true }), EventEmitterModule.forRoot({ maxListeners: 10, ignoreErrors: false }), diff --git a/apps/sample/src/user/user.module.ts b/apps/sample/src/user/user.module.ts index fa19c92..3ee1cbe 100644 --- a/apps/sample/src/user/user.module.ts +++ b/apps/sample/src/user/user.module.ts @@ -1,15 +1,21 @@ import { Module } from '@nestjs/common'; import { PrismaModule } from '@rumsan/prisma'; -import { QueueModule } from '@rumsan/queue'; +import { QueueModule } from '@rumsan/queue'; // Ensure correct import path import { AuthModule } from '@rumsan/user'; import { AppUserController } from './user.controller'; import { UserProcessor } from './user.processor'; import { AppUserService } from './user.service'; @Module({ - imports: [AuthModule, PrismaModule], + imports: [ + AuthModule, + PrismaModule, + // QueueModule, + QueueModule.registerQueue({ + name: 'QUEUE_TEST', + }), + ], controllers: [AppUserController], - - providers: [UserProcessor, AppUserService, QueueModule], + providers: [UserProcessor, AppUserService], }) export class UserModule {} diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 0c399da..110238e 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '@rumsan/prisma'; import { QueueService } from '@rumsan/queue'; import { AuthService, UserService } from '@rumsan/user'; +import { Job } from 'bullmq'; @Injectable() export class AppUserService extends UserService { @@ -15,17 +16,17 @@ export class AppUserService extends UserService { async Test(dto: any) { // console.log('queueService', { s: this.queueService }); - this.queueService.sendMessage('USER_Queue', { + this.queueService.sendMessage('QUEUE_TEST', { message: 'success', data: { name: 'test', }, }); - // this.queueService.receiveMessage('USER_TEST', (data: Job) => { - // console.log('data', data); - // console.log(data.getState()); - // return data; - // }); + this.queueService.receiveMessage('QUEUE_TEST', (data: Job) => { + console.log('data', data); + console.log(data.getState()); + return data; + }); return { message: 'success' }; } diff --git a/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts b/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts new file mode 100644 index 0000000..e6e1320 --- /dev/null +++ b/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts @@ -0,0 +1,5 @@ +// decorators/on-error.decorator.ts +import { SetMetadata } from '@nestjs/common'; + +export const OnError = (): MethodDecorator => + SetMetadata('isErrorListener', true); diff --git a/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts b/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts new file mode 100644 index 0000000..4b8dc89 --- /dev/null +++ b/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts @@ -0,0 +1,5 @@ +// decorators/on-message.decorator.ts +import { SetMetadata } from '@nestjs/common'; + +export const OnMessage = (): MethodDecorator => + SetMetadata('isMessageListener', true); diff --git a/libs/queue/src/lib/queue.module.ts b/libs/queue/src/lib/queue.module.ts index 9ea8bb9..78fb9a7 100644 --- a/libs/queue/src/lib/queue.module.ts +++ b/libs/queue/src/lib/queue.module.ts @@ -1,32 +1,61 @@ -import { Global, Module } from '@nestjs/common'; -import { QueueOptions } from 'bullmq'; -import { IQueueModuleOptions } from './interface/queue-config.interfaces'; +// queue.module.ts + +import { DynamicModule, Global, Module } from '@nestjs/common'; import { QueueService } from './queue.service'; -import { BullMQTransport } from './transports/bull'; +import { BullMqService } from './transports/bull'; @Global() @Module({}) export class QueueModule { - static forRoot< - U, //= IQueueModuleOptions['config'], - V = IQueueModuleOptions['transport'], - >(rootConfig: IQueueModuleOptions) { - const Transport = - rootConfig.transport || - new BullMQTransport( - rootConfig.config as { queueName: string } & WorkerOptions & - QueueOptions, - ); + private static transport: any = BullMqService; + private static options: any = {}; + + static forRoot(options: any): DynamicModule { + const Transport = options.transport || QueueModule.transport; + return { module: QueueModule, + imports: [...options.imports], + global: options.isGlobal, providers: [ { provide: 'TRANSPORT', - useFactory: () => Transport, + useFactory: async (...args: any[]) => { + const config = await options.useFactory(...args); + this.options = config.config; + this.transport = Transport; + }, + inject: options.inject, }, QueueService, + ...(options.providers || []), ], exports: ['TRANSPORT', QueueService], }; } + + static registerQueue(options: Record): DynamicModule { + console.log('this.options', this.options); + return { + module: QueueModule, + providers: [ + { + provide: 'TRANSPORT', + useFactory(...args) { + console.log('args', args); + return new QueueModule.transport({ + ...options, + ...QueueModule.options, + }); + }, + // useValue: new QueueModule.transport({ + // ...options, + // ...QueueModule.options, + // }), + }, + QueueService, + ], + exports: [QueueService], + }; + } } diff --git a/libs/queue/src/lib/queue.service.ts b/libs/queue/src/lib/queue.service.ts index bb0f614..a540af5 100644 --- a/libs/queue/src/lib/queue.service.ts +++ b/libs/queue/src/lib/queue.service.ts @@ -1,30 +1,26 @@ // queue.service.ts import { Inject, Injectable } from '@nestjs/common'; -import { IQueueModuleOptions } from './interface/queue-config.interfaces'; -import { TransportInterface } from './interface/transport.interface'; @Injectable() export class QueueService { - private transport: TransportInterface; - constructor( @Inject('TRANSPORT') - private readonly transportV: IQueueModuleOptions['transport'], + private readonly transport: any, ) { - this.transport = this.transportV as TransportInterface; + console.log('transport', transport); this.initializeTransport(); } - setTransport(transport: TransportInterface) { - this.transport = transport; - } + // Uncomment the following lines if you want to allow changing the transport + // setTransport(transport: TransportInterface) { + // this.transport = transport; + // } async connect() { if (!this.transport) { this.initializeTransport(); } - await this.transport.connect(); } async sendMessage(queue: string, data: any) { @@ -48,7 +44,5 @@ export class QueueService { await this.transport.disconnect(); } - private async initializeTransport() { - await this.transport.connect(); - } + private async initializeTransport() {} } diff --git a/libs/queue/src/lib/transports/bull/bull.transport.ts b/libs/queue/src/lib/transports/bull/bull.transport.ts index 9144d7c..cbf36a4 100644 --- a/libs/queue/src/lib/transports/bull/bull.transport.ts +++ b/libs/queue/src/lib/transports/bull/bull.transport.ts @@ -1,50 +1,78 @@ -import { Queue, Worker, WorkerOptions } from 'bullmq'; -import { IQueueModuleOptions } from '../../interface/queue-config.interfaces'; +import { Injectable } from '@nestjs/common'; +import { Queue, Worker } from 'bullmq'; import { TransportInterface } from '../../interface/transport.interface'; -export class BullMQTransport implements TransportInterface { - private queue: Queue; - private worker: Worker; +@Injectable() +export class BullMqService implements TransportInterface { + private name: string = 'QUEUE_TEST'; + private queues: Record = {}; + private workers: Record = {}; - constructor(private config: IQueueModuleOptions['config']) {} + constructor(private readonly config: any) { + this.connect(); + if (this.config.name) { + this.name = this.config.name; + } + console.log('BullMqService -> constructor -> config', config); + } + + async connect(): Promise { + // Connect or perform any necessary setup + const queue = new Queue(this.name, { + // connection: this.config.connection, + connection: { + host: 'localhost', + port: 6379, + password: 'raghav123', + }, + }); + this.queues[this.name] = queue; - async connect() { - this.queue = new Queue(this.config.queueName, this.config); - this.worker = new Worker( - this.config.queueName as string, + const worker = new Worker( + this.name, async (job) => { - // This is where you process your jobs - console.log('processing job', job.name, job.data); + console.log('job', job); + }, + { + connection: { + host: 'localhost', + port: 6379, + password: 'raghav123', + }, }, - this.config as WorkerOptions, ); + this.workers[this.name] = worker; } - async sendMessage(queue: string, data: unknown) { - console.log('sending message to bullmq', queue, data); - await this.queue.add(queue, data); + async sendMessage(name: string, data: any): Promise { + if (!this.queues[name]) { + throw new Error(`Queue "${name}" not found.`); + } + + await this.queues[name].add(name, data); } async receiveMessage( - queue: string, - callback: (data: unknown, job: unknown, worker: Worker) => void, - ) { - console.log('receiving message from bullmq', queue); + name: string, + callback: (data: any) => void, + ): Promise { + if (!this.queues[name]) { + throw new Error(`Queue "${name}" not found.`); + } - // this.queue. - - const worker = new Worker( - queue, - async (job: any) => { - console.log('processing job', job.name, job.data, job); - callback(job.data, job, worker); - }, - this.config as WorkerOptions, - ); + if (!this.workers[name]) { + this.workers[name] = new Worker(name, async (job) => { + callback(job.data); + }); + } } - async disconnect() { - await this.queue.close(); - await this.worker.close(); + async disconnect(): Promise { + // Disconnect or perform any necessary cleanup + await Promise.all( + Object.keys(this.queues).map(async (name) => { + await this.queues[name].close(); + }), + ); } } From 01ac2590027d832c11ef37c7ac8183305b29f97e Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Tue, 30 Jan 2024 09:21:13 +0545 Subject: [PATCH 08/10] Add on-message decorator and bull transport --- apps/sample/src/user/user.service.ts | 1 + libs/queue/src/index.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index 110238e..c0cdc4d 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -14,6 +14,7 @@ export class AppUserService extends UserService { super(prisma, authService); } + // @OnMessage() async Test(dto: any) { // console.log('queueService', { s: this.queueService }); this.queueService.sendMessage('QUEUE_TEST', { diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index caccd1a..3afe637 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -1,3 +1,5 @@ +export * from './lib/decorators/listeners/on-message.decorator'; export * from './lib/plugins'; export * from './lib/queue.module'; export * from './lib/queue.service'; +export * from './lib/transports/bull'; From c6818ab2bfbf8922673ce932e2db5a811086ab77 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Tue, 30 Jan 2024 09:54:48 +0545 Subject: [PATCH 09/10] Refactor Types --- apps/sample/src/app/app.module.ts | 22 ++++++------- .../lib/interface/queue-config.interfaces.ts | 32 +++++++++++++------ libs/queue/src/lib/queue.module.ts | 18 ++++++++--- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/apps/sample/src/app/app.module.ts b/apps/sample/src/app/app.module.ts index 7ec74f0..0bf1fad 100644 --- a/apps/sample/src/app/app.module.ts +++ b/apps/sample/src/app/app.module.ts @@ -18,19 +18,17 @@ import { AppService } from './app.service'; imports: [ConfigModule], global: true, useFactory: async (configService: ConfigService) => ({ - config: { - connection: { - name: 'default', - host: configService.get('REDIS_HOST'), - port: +configService.get('REDIS_PORT'), - password: configService.get('REDIS_PASSWORD'), - retryStrategy: (times) => { - // reconnect after - return Math.min(times * 50, 2000); - }, - // might need to change on producttion - maxRetriesPerRequest: 1000, + connection: { + name: 'default', + host: configService.get('REDIS_HOST'), + port: +configService.get('REDIS_PORT'), + password: configService.get('REDIS_PASSWORD'), + retryStrategy: (times) => { + // reconnect after + return Math.min(times * 50, 2000); }, + // might need to change on producttion + maxRetriesPerRequest: 1000, }, }), inject: [ConfigService], diff --git a/libs/queue/src/lib/interface/queue-config.interfaces.ts b/libs/queue/src/lib/interface/queue-config.interfaces.ts index a5f64c0..d7a91f3 100644 --- a/libs/queue/src/lib/interface/queue-config.interfaces.ts +++ b/libs/queue/src/lib/interface/queue-config.interfaces.ts @@ -1,13 +1,25 @@ -import { QueueOptions, WorkerOptions } from 'bullmq'; -import { TransportInterface } from './transport.interface'; +import { + DynamicModule, + ForwardReference, + InjectionToken, + OptionalFactoryDependency, + Provider, + Type, +} from '@nestjs/common'; -export interface IQueueModuleOptions< - U extends { queueName: string } & WorkerOptions & QueueOptions = { - queueName: string; - } & WorkerOptions & - QueueOptions, - V extends TransportInterface = TransportInterface, -> { - config: U; +// export interface IQueueConfig { +// config: U; +// } + +export interface IQueueModuleOptions { + imports?: Array< + Type | DynamicModule | Promise | ForwardReference + >; + global: boolean; + providers?: Provider[]; + exports?: Provider[]; transport?: V; + useFactory?: (...args: any[]) => U | Promise; + inject?: Array; + config?: 'useFactory' extends keyof this ? U | undefined : U; // <-- make config optional when useFactory is defined } diff --git a/libs/queue/src/lib/queue.module.ts b/libs/queue/src/lib/queue.module.ts index 78fb9a7..bc081ca 100644 --- a/libs/queue/src/lib/queue.module.ts +++ b/libs/queue/src/lib/queue.module.ts @@ -1,6 +1,9 @@ // queue.module.ts import { DynamicModule, Global, Module } from '@nestjs/common'; +import { QueueOptions } from 'bull'; +import { IQueueModuleOptions } from './interface/queue-config.interfaces'; +import { TransportInterface } from './interface/transport.interface'; import { QueueService } from './queue.service'; import { BullMqService } from './transports/bull'; @@ -10,19 +13,24 @@ export class QueueModule { private static transport: any = BullMqService; private static options: any = {}; - static forRoot(options: any): DynamicModule { + static forRoot( + options: IQueueModuleOptions, + ): DynamicModule { const Transport = options.transport || QueueModule.transport; return { module: QueueModule, - imports: [...options.imports], - global: options.isGlobal, + imports: options.imports || [], + global: options.global, providers: [ { provide: 'TRANSPORT', useFactory: async (...args: any[]) => { - const config = await options.useFactory(...args); - this.options = config.config; + const config = options.useFactory + ? await options?.useFactory(...args) + : options.config; + this.options = config; + this.transport = Transport; }, inject: options.inject, From f3db218a1b7c0e560f660c08c0329ad51e132207 Mon Sep 17 00:00:00 2001 From: Raghav Kattel Date: Wed, 31 Jan 2024 11:53:24 +0545 Subject: [PATCH 10/10] Remove unused code related to queue and decorators --- apps/sample/src/app/app.controller.ts | 12 +++---- apps/sample/src/app/app.module.ts | 14 ++++---- apps/sample/src/app/app.service.ts | 3 +- apps/sample/src/user/user.module.ts | 8 ++--- apps/sample/src/user/user.processor.ts | 15 ++++---- apps/sample/src/user/user.service.ts | 17 +++++---- libs/queue/src/index.ts | 2 -- .../listeners/on-error.decorator.ts | 5 --- .../listeners/on-message.decorator.ts | 5 --- .../src/lib/interface/broker.interface.ts | 9 ----- .../plugins/default-queue-plugin.service.ts | 14 -------- libs/queue/src/lib/plugins/index.ts | 2 -- .../src/lib/plugins/queue-plugin.interface.ts | 4 --- libs/queue/src/lib/queue.module.ts | 35 +++++-------------- libs/queue/src/lib/queue.service.ts | 29 +++++---------- .../src/lib/transports/bull/bull.transport.ts | 12 ++----- 16 files changed, 52 insertions(+), 134 deletions(-) delete mode 100644 libs/queue/src/lib/decorators/listeners/on-error.decorator.ts delete mode 100644 libs/queue/src/lib/decorators/listeners/on-message.decorator.ts delete mode 100644 libs/queue/src/lib/interface/broker.interface.ts delete mode 100644 libs/queue/src/lib/plugins/default-queue-plugin.service.ts delete mode 100644 libs/queue/src/lib/plugins/index.ts delete mode 100644 libs/queue/src/lib/plugins/queue-plugin.interface.ts diff --git a/apps/sample/src/app/app.controller.ts b/apps/sample/src/app/app.controller.ts index 108373f..8543a08 100644 --- a/apps/sample/src/app/app.controller.ts +++ b/apps/sample/src/app/app.controller.ts @@ -1,18 +1,16 @@ -import { Controller, Get, UseGuards } from '@nestjs/common'; +import { Controller, Get } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; import { AppService } from './app.service'; -import { AbilitiesGuard, CheckAbilities, JwtGuard } from '@rumsan/user'; -import { ACTIONS, APP, SUBJECTS } from '../constants'; -import { ApiBearerAuth, ApiTags } from '@nestjs/swagger'; @Controller('app') @ApiTags('App') -@ApiBearerAuth(APP.JWT_BEARER) +//@ApiBearerAuth(APP.JWT_BEARER) export class AppController { constructor(private readonly appService: AppService) {} - @CheckAbilities({ action: ACTIONS.READ, subject: SUBJECTS.USER }) - @UseGuards(JwtGuard, AbilitiesGuard) + // @CheckAbilities({ action: ACTIONS.READ, subject: SUBJECTS.USER }) + // @UseGuards(JwtGuard, AbilitiesGuard) @Get() getData() { return this.appService.getData(); diff --git a/apps/sample/src/app/app.module.ts b/apps/sample/src/app/app.module.ts index 0bf1fad..2151250 100644 --- a/apps/sample/src/app/app.module.ts +++ b/apps/sample/src/app/app.module.ts @@ -16,19 +16,17 @@ import { AppService } from './app.service'; imports: [ QueueModule.forRoot({ imports: [ConfigModule], - global: true, useFactory: async (configService: ConfigService) => ({ connection: { - name: 'default', host: configService.get('REDIS_HOST'), port: +configService.get('REDIS_PORT'), password: configService.get('REDIS_PASSWORD'), - retryStrategy: (times) => { - // reconnect after - return Math.min(times * 50, 2000); - }, - // might need to change on producttion - maxRetriesPerRequest: 1000, + // retryStrategy: (times) => { + // // reconnect after + // return Math.min(times * 50, 2000); + // }, + // // might need to change on producttion + // maxRetriesPerRequest: 1000, }, }), inject: [ConfigService], diff --git a/apps/sample/src/app/app.service.ts b/apps/sample/src/app/app.service.ts index 01a14b2..def1158 100644 --- a/apps/sample/src/app/app.service.ts +++ b/apps/sample/src/app/app.service.ts @@ -1,11 +1,12 @@ -import { PrismaService } from '@rumsan/prisma'; import { Injectable } from '@nestjs/common'; +import { PrismaService } from '@rumsan/prisma'; @Injectable() export class AppService { constructor(private prisma: PrismaService) {} async getData() { const d = await this.prisma.user.findMany(); + // this.queue.sendMessage('APP_QUEUE', { message: 'Hello Santosh', data: d }); return { message: 'Hello API', data: d }; } } diff --git a/apps/sample/src/user/user.module.ts b/apps/sample/src/user/user.module.ts index 3ee1cbe..5462f9d 100644 --- a/apps/sample/src/user/user.module.ts +++ b/apps/sample/src/user/user.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; import { PrismaModule } from '@rumsan/prisma'; -import { QueueModule } from '@rumsan/queue'; // Ensure correct import path +import { QueueModule, QueueService } from '@rumsan/queue'; import { AuthModule } from '@rumsan/user'; import { AppUserController } from './user.controller'; import { UserProcessor } from './user.processor'; @@ -10,12 +10,12 @@ import { AppUserService } from './user.service'; imports: [ AuthModule, PrismaModule, - // QueueModule, + //QueueModule, QueueModule.registerQueue({ - name: 'QUEUE_TEST', + name: 'APP_QUEUE', }), ], controllers: [AppUserController], - providers: [UserProcessor, AppUserService], + providers: [UserProcessor, AppUserService, QueueService], }) export class UserModule {} diff --git a/apps/sample/src/user/user.processor.ts b/apps/sample/src/user/user.processor.ts index b2b0b7a..51e9013 100644 --- a/apps/sample/src/user/user.processor.ts +++ b/apps/sample/src/user/user.processor.ts @@ -1,15 +1,14 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { QueueService } from '@rumsan/queue'; +import { Injectable, Logger } from '@nestjs/common'; @Injectable() export class UserProcessor { private readonly _logger = new Logger('USER_TEST'); - constructor(@Inject('TRANSPORT') private readonly queue: QueueService) {} + // constructor(@Inject('TRANSPORT') private readonly queue: QueueService) {} - public async sendOTP(d): Promise { - this.queue.receiveMessage('USER_TEST', (data) => { - console.log('data', data); - }); - } + // public async sendOTP(d): Promise { + // this.queue.receiveMessage('USER_TEST', (data) => { + // console.log('data', data); + // }); + // } } diff --git a/apps/sample/src/user/user.service.ts b/apps/sample/src/user/user.service.ts index c0cdc4d..6a38d2d 100644 --- a/apps/sample/src/user/user.service.ts +++ b/apps/sample/src/user/user.service.ts @@ -2,14 +2,13 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '@rumsan/prisma'; import { QueueService } from '@rumsan/queue'; import { AuthService, UserService } from '@rumsan/user'; -import { Job } from 'bullmq'; @Injectable() export class AppUserService extends UserService { constructor( - private readonly queueService: QueueService, protected prisma: PrismaService, public authService: AuthService, + private queueService: QueueService, ) { super(prisma, authService); } @@ -17,17 +16,17 @@ export class AppUserService extends UserService { // @OnMessage() async Test(dto: any) { // console.log('queueService', { s: this.queueService }); - this.queueService.sendMessage('QUEUE_TEST', { - message: 'success', + this.queueService.sendMessage('APP_QUEUE', { + message: 'another santosh', data: { name: 'test', }, }); - this.queueService.receiveMessage('QUEUE_TEST', (data: Job) => { - console.log('data', data); - console.log(data.getState()); - return data; - }); + // this.queueService.receiveMessage('QUEUE_TEST', (data: Job) => { + // console.log('data', data); + // console.log(data.getState()); + // return data; + // }); return { message: 'success' }; } diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 3afe637..2e4cc44 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -1,5 +1,3 @@ -export * from './lib/decorators/listeners/on-message.decorator'; -export * from './lib/plugins'; export * from './lib/queue.module'; export * from './lib/queue.service'; export * from './lib/transports/bull'; diff --git a/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts b/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts deleted file mode 100644 index e6e1320..0000000 --- a/libs/queue/src/lib/decorators/listeners/on-error.decorator.ts +++ /dev/null @@ -1,5 +0,0 @@ -// decorators/on-error.decorator.ts -import { SetMetadata } from '@nestjs/common'; - -export const OnError = (): MethodDecorator => - SetMetadata('isErrorListener', true); diff --git a/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts b/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts deleted file mode 100644 index 4b8dc89..0000000 --- a/libs/queue/src/lib/decorators/listeners/on-message.decorator.ts +++ /dev/null @@ -1,5 +0,0 @@ -// decorators/on-message.decorator.ts -import { SetMetadata } from '@nestjs/common'; - -export const OnMessage = (): MethodDecorator => - SetMetadata('isMessageListener', true); diff --git a/libs/queue/src/lib/interface/broker.interface.ts b/libs/queue/src/lib/interface/broker.interface.ts deleted file mode 100644 index 9537c70..0000000 --- a/libs/queue/src/lib/interface/broker.interface.ts +++ /dev/null @@ -1,9 +0,0 @@ -export interface Broker { - publish(queueName: string, message: string): Promise; - consume( - queueName: string, - callback: (message: string) => void, - ): Promise; - - add(queueName: string, message: string): Promise; -} diff --git a/libs/queue/src/lib/plugins/default-queue-plugin.service.ts b/libs/queue/src/lib/plugins/default-queue-plugin.service.ts deleted file mode 100644 index b0b5404..0000000 --- a/libs/queue/src/lib/plugins/default-queue-plugin.service.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { QueuePlugin } from './queue-plugin.interface'; - -@Injectable() -export class DefaultQueuePluginService implements QueuePlugin { - beforeEnqueue(item: T): T { - // You can customize the behavior before enqueueing - return item; - } - - afterDequeue(item: T): void { - // You can customize the behavior after dequeuing - } -} diff --git a/libs/queue/src/lib/plugins/index.ts b/libs/queue/src/lib/plugins/index.ts deleted file mode 100644 index b340090..0000000 --- a/libs/queue/src/lib/plugins/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './default-queue-plugin.service'; -export * from './queue-plugin.interface'; diff --git a/libs/queue/src/lib/plugins/queue-plugin.interface.ts b/libs/queue/src/lib/plugins/queue-plugin.interface.ts deleted file mode 100644 index 9aa8273..0000000 --- a/libs/queue/src/lib/plugins/queue-plugin.interface.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface QueuePlugin { - beforeEnqueue(item: T): T; - afterDequeue(item: T): void; -} diff --git a/libs/queue/src/lib/queue.module.ts b/libs/queue/src/lib/queue.module.ts index bc081ca..727f670 100644 --- a/libs/queue/src/lib/queue.module.ts +++ b/libs/queue/src/lib/queue.module.ts @@ -1,9 +1,6 @@ // queue.module.ts import { DynamicModule, Global, Module } from '@nestjs/common'; -import { QueueOptions } from 'bull'; -import { IQueueModuleOptions } from './interface/queue-config.interfaces'; -import { TransportInterface } from './interface/transport.interface'; import { QueueService } from './queue.service'; import { BullMqService } from './transports/bull'; @@ -11,59 +8,43 @@ import { BullMqService } from './transports/bull'; @Module({}) export class QueueModule { private static transport: any = BullMqService; - private static options: any = {}; - - static forRoot( - options: IQueueModuleOptions, - ): DynamicModule { - const Transport = options.transport || QueueModule.transport; + static forRoot(options: any): DynamicModule { return { module: QueueModule, - imports: options.imports || [], - global: options.global, providers: [ { - provide: 'TRANSPORT', + provide: 'QUEUE_CONFIG', useFactory: async (...args: any[]) => { const config = options.useFactory ? await options?.useFactory(...args) : options.config; - this.options = config; - - this.transport = Transport; + return config; }, inject: options.inject, }, - QueueService, - ...(options.providers || []), ], - exports: ['TRANSPORT', QueueService], + exports: ['QUEUE_CONFIG'], }; } static registerQueue(options: Record): DynamicModule { - console.log('this.options', this.options); return { module: QueueModule, providers: [ { provide: 'TRANSPORT', - useFactory(...args) { - console.log('args', args); + useFactory: (config) => { return new QueueModule.transport({ ...options, - ...QueueModule.options, + ...config, }); }, - // useValue: new QueueModule.transport({ - // ...options, - // ...QueueModule.options, - // }), + inject: ['QUEUE_CONFIG'], }, QueueService, ], - exports: [QueueService], + exports: ['TRANSPORT', QueueService], }; } } diff --git a/libs/queue/src/lib/queue.service.ts b/libs/queue/src/lib/queue.service.ts index a540af5..0ec19a1 100644 --- a/libs/queue/src/lib/queue.service.ts +++ b/libs/queue/src/lib/queue.service.ts @@ -1,48 +1,37 @@ // queue.service.ts -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; @Injectable() -export class QueueService { +export class QueueService implements OnModuleInit { constructor( @Inject('TRANSPORT') private readonly transport: any, ) { - console.log('transport', transport); - this.initializeTransport(); + console.log('transport', this.transport); } - // Uncomment the following lines if you want to allow changing the transport - // setTransport(transport: TransportInterface) { - // this.transport = transport; - // } + onModuleInit() { + console.log('transport', this.transport); + this.connect(); + } async connect() { if (!this.transport) { - this.initializeTransport(); + throw new Error('Transport is not defined'); } + await this.transport.connect(); } async sendMessage(queue: string, data: any) { - if (!this.transport) { - this.initializeTransport(); - } await this.transport.sendMessage(queue, data); } async receiveMessage(queue: string, callback: (data: any) => void) { - if (!this.transport) { - this.initializeTransport(); - } return this.transport.receiveMessage(queue, callback); } async disconnect() { - if (!this.transport) { - this.initializeTransport(); - } await this.transport.disconnect(); } - - private async initializeTransport() {} } diff --git a/libs/queue/src/lib/transports/bull/bull.transport.ts b/libs/queue/src/lib/transports/bull/bull.transport.ts index cbf36a4..0a5e03e 100644 --- a/libs/queue/src/lib/transports/bull/bull.transport.ts +++ b/libs/queue/src/lib/transports/bull/bull.transport.ts @@ -4,27 +4,21 @@ import { TransportInterface } from '../../interface/transport.interface'; @Injectable() export class BullMqService implements TransportInterface { - private name: string = 'QUEUE_TEST'; + private name: string; private queues: Record = {}; private workers: Record = {}; constructor(private readonly config: any) { - this.connect(); + console.log('config', config); if (this.config.name) { this.name = this.config.name; } - console.log('BullMqService -> constructor -> config', config); } async connect(): Promise { // Connect or perform any necessary setup const queue = new Queue(this.name, { - // connection: this.config.connection, - connection: { - host: 'localhost', - port: 6379, - password: 'raghav123', - }, + connection: this.config.connection, }); this.queues[this.name] = queue;