From befb5c16b611841ed201b743d078d24fc27d81a2 Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 14:36:39 +0530 Subject: [PATCH 1/6] Added changes for new AcademicYear --- src/academicyears/academicyears.service.ts | 5 +- .../cohortAcademicYear.module.ts | 2 + .../cohortAcademicYear.service.ts | 19 +++++++- src/cohortMembers/cohortMembers.service.ts | 20 +++++++- src/kafka/kafka.service.ts | 46 +++++++++++++++++++ 5 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/academicyears/academicyears.service.ts b/src/academicyears/academicyears.service.ts index 87f4e009..84a2196b 100644 --- a/src/academicyears/academicyears.service.ts +++ b/src/academicyears/academicyears.service.ts @@ -66,7 +66,10 @@ export class AcademicYearService { if (getCurrentActiveYear) { const updateStatus = await this.academicYearRespository.update( { id: getCurrentActiveYear.id }, - { isActive: false } + { + isActive: false, + updatedBy: academicYearDto.updatedBy + } ); } //save record diff --git a/src/cohortAcademicYear/cohortAcademicYear.module.ts b/src/cohortAcademicYear/cohortAcademicYear.module.ts index 114c6350..dafd3cfb 100644 --- a/src/cohortAcademicYear/cohortAcademicYear.module.ts +++ b/src/cohortAcademicYear/cohortAcademicYear.module.ts @@ -7,11 +7,13 @@ import { Cohort } from "src/cohort/entities/cohort.entity"; import { AcademicYear } from "src/academicyears/entities/academicyears-entity"; import { Tenants } from "src/userTenantMapping/entities/tenant.entity"; import { AcademicyearsModule } from "src/academicyears/academicyears.module"; +import { KafkaModule } from "src/kafka/kafka.module"; @Module({ imports: [ AcademicyearsModule, + KafkaModule, TypeOrmModule.forFeature([ CohortAcademicYear, Cohort, diff --git a/src/cohortAcademicYear/cohortAcademicYear.service.ts b/src/cohortAcademicYear/cohortAcademicYear.service.ts index 385be1de..4d6032cb 100644 --- a/src/cohortAcademicYear/cohortAcademicYear.service.ts +++ b/src/cohortAcademicYear/cohortAcademicYear.service.ts @@ -6,6 +6,8 @@ import { Request, Response } from "express"; import { CohortAcademicYearDto } from "src/cohortAcademicYear/dto/cohort-academicyear.dto"; import { APIID } from "@utils/api-id.config"; import APIResponse from "src/common/responses/response"; +import { KafkaService } from "src/kafka/kafka.service"; +import { LoggerUtil } from "src/common/logger/LoggerUtil"; import { API_RESPONSES } from "@utils/response.messages"; import { AcademicYearService } from "src/academicyears/academicyears.service"; import { Cohort } from "src/cohort/entities/cohort.entity"; @@ -19,6 +21,7 @@ export class CohortAcademicYearService { private readonly cohortRepository: Repository, @InjectRepository(CohortAcademicYear) private readonly cohortAcademicYearRepository: Repository, + private readonly kafkaService: KafkaService, ) { } async createCohortAcademicYear(tenantId: string, request: Request, cohortAcademicYearDto: CohortAcademicYearDto, response: Response) { @@ -68,13 +71,27 @@ export class CohortAcademicYearService { const createdAcademicYear = await this.insertCohortAcademicYear(cohortAcademicYearDto.cohortId, cohortAcademicYearDto.academicYearId, cohortAcademicYearDto.createdBy, cohortAcademicYearDto.updatedBy); if (createdAcademicYear) { - return APIResponse.success( + const apiResponse = APIResponse.success( response, apiId, createdAcademicYear, HttpStatus.OK, API_RESPONSES.ADD_COHORT_TO_ACADEMIC_YEAR ); + + const enrichedData = { + ...createdAcademicYear, + tenantId, + }; + // Publish cohort academic year created event to Kafka asynchronously - after response is sent to client + this.kafkaService.publishCohortAcademicYearEvent('created', enrichedData, enrichedData.cohortAcademicYearId) + .catch(error => LoggerUtil.error( + `Failed to publish cohort academic year created event to Kafka`, + `Error: ${error.message}`, + apiId + )); + + return apiResponse; } } catch (error) { diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index f7bc70d9..261513dc 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -567,14 +567,30 @@ ON CM."userId" = U."userId" ${whereCase}`; const savedCohortMember = await this.cohortMembersRepository.save( cohortMembers ); - - return APIResponse.success( + +if (savedCohortMember){ + const apiResponse= APIResponse.success( res, apiId, savedCohortMember, HttpStatus.OK, API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY ); + + const enrichedData = { + ...savedCohortMember, + academicyearId, +}; + +this.kafkaService.publishCohortMemberEvent('created', enrichedData, apiId).catch(error => { + LoggerUtil.error( + `Failed to publish cohort member created event to Kafka`, + `Error: ${error.message}`, + apiId + ) +}) +return apiResponse; +} } catch (e) { LoggerUtil.error( `${API_RESPONSES.SERVER_ERROR}`, diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index 9e6f71d6..99541bbe 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -492,4 +492,50 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { await this.publishMessage(topic, payload, cohortMembershipId); this.logger.log(`Cohort member ${eventType} event published for cohortMembershipId ${cohortMembershipId}`); } + + /** + * Publish a cohort-academic-year-related event to Kafka + * + * @param eventType - The type of cohort academic year event (created, updated, deleted) + * @param cohortAcademicYearData - The cohort academic year data to include in the event + * @param cohortAcademicYearId - The ID of the cohort academic year (used as the message key) + */ + async publishCohortAcademicYearEvent( + eventType: 'created' | 'updated' | 'deleted', + cohortAcademicYearData: any, + cohortAcademicYearId: string + ): Promise { + if (!this.isKafkaEnabled) { + this.logger.warn('Kafka is disabled. Skipping cohort academic year event publish.'); + return; + } +console.log(cohortAcademicYearData) + const topic = this.configService.get('KAFKA_TOPIC', 'user-topic'); + + let fullEventType = ''; + switch (eventType) { + case 'created': + fullEventType = 'COHORT_ACADEMIC_YEAR_CREATED'; + break; + case 'updated': + fullEventType = 'COHORT_ACADEMIC_YEAR_UPDATED'; + break; + case 'deleted': + fullEventType = 'COHORT_ACADEMIC_YEAR_DELETED'; + break; + default: + fullEventType = 'UNKNOWN_EVENT'; + break; + } + + const payload = { + eventType: fullEventType, + timestamp: new Date().toISOString(), + cohortAcademicYearId, + data: cohortAcademicYearData + }; + + await this.publishMessage(topic, payload, cohortAcademicYearId); + this.logger.log(`Cohort academic year ${eventType} event published for cohortAcademicYearId ${cohortAcademicYearId}`); + } } From 075fa8be86f0b37dbbca113666e5aa2389689e23 Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 16:54:21 +0530 Subject: [PATCH 2/6] Resolved err --- src/cohortMembers/cohortMembers.service.ts | 58 +++++++++++----------- src/kafka/kafka.service.ts | 2 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index 261513dc..bb93370b 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -494,7 +494,7 @@ ON CM."userId" = U."userId" ${whereCase}`; res: Response, tenantId: string, deviceId: string, - academicyearId: string + academicYearId: string ) { const apiId = APIID.COHORT_MEMBER_CREATE; try { @@ -514,7 +514,7 @@ ON CM."userId" = U."userId" ${whereCase}`; } // check year is live or not const academicYear = await this.academicyearService.getActiveAcademicYear( - academicyearId, + academicYearId, tenantId ); @@ -529,7 +529,7 @@ ON CM."userId" = U."userId" ${whereCase}`; } //check this cohort exist this year or not const isExistAcademicYear = await this.findCohortAcademicYearId( - academicyearId, + academicYearId, cohortMembers ); if (!isExistAcademicYear) { @@ -541,13 +541,13 @@ ON CM."userId" = U."userId" ${whereCase}`; HttpStatus.NOT_FOUND ); } - const cohortacAdemicyearId = isExistAcademicYear.cohortAcademicYearId; + const cohortAcademicYearId = isExistAcademicYear.cohortAcademicYearId; //check user is already exist in this cohort for this year or not const existrole = await this.cohortMembersRepository.find({ where: { userId: cohortMembers.userId, cohortId: cohortMembers.cohortId, - cohortAcademicYearId: cohortacAdemicyearId, + cohortAcademicYearId: cohortAcademicYearId, }, }); if (existrole.length > 0) { @@ -562,35 +562,35 @@ ON CM."userId" = U."userId" ${whereCase}`; cohortMembers.createdBy = loginUser; cohortMembers.updatedBy = loginUser; - cohortMembers.cohortAcademicYearId = cohortacAdemicyearId; + cohortMembers.cohortAcademicYearId = cohortAcademicYearId; // Create a new CohortMembers entity and populate it with cohortMembers data const savedCohortMember = await this.cohortMembersRepository.save( cohortMembers ); - -if (savedCohortMember){ - const apiResponse= APIResponse.success( - res, - apiId, - savedCohortMember, - HttpStatus.OK, - API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY - ); - const enrichedData = { - ...savedCohortMember, - academicyearId, -}; - -this.kafkaService.publishCohortMemberEvent('created', enrichedData, apiId).catch(error => { - LoggerUtil.error( - `Failed to publish cohort member created event to Kafka`, - `Error: ${error.message}`, - apiId - ) -}) -return apiResponse; -} + if (savedCohortMember) { + const apiResponse = APIResponse.success( + res, + apiId, + savedCohortMember, + HttpStatus.OK, + API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY + ); + + const enrichedData = { + ...savedCohortMember, + academicYearId, + }; + + this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortAcademicYearId).catch(error => { + LoggerUtil.error( + `Failed to publish cohort member created event to Kafka`, + `Error: ${error.message}`, + enrichedData.cohortAcademicYearId + ) + }) + return apiResponse; + } } catch (e) { LoggerUtil.error( `${API_RESPONSES.SERVER_ERROR}`, diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index 99541bbe..974f95f6 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -509,7 +509,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { this.logger.warn('Kafka is disabled. Skipping cohort academic year event publish.'); return; } -console.log(cohortAcademicYearData) + const topic = this.configService.get('KAFKA_TOPIC', 'user-topic'); let fullEventType = ''; From 0538256f85954657095bb98a2471af1445c17ca9 Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 17:05:20 +0530 Subject: [PATCH 3/6] Resolved err --- src/cohortMembers/cohortMembers.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index bb93370b..fbeecc30 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -580,7 +580,7 @@ ON CM."userId" = U."userId" ${whereCase}`; const enrichedData = { ...savedCohortMember, academicYearId, - }; + }; this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortAcademicYearId).catch(error => { LoggerUtil.error( From 22da91b4e881d0c3a87416b6060a8943df3c46ec Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 17:09:50 +0530 Subject: [PATCH 4/6] Added changes for new AcademicYear --- src/cohortMembers/cohortMembers.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index fbeecc30..62914828 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -585,7 +585,7 @@ ON CM."userId" = U."userId" ${whereCase}`; this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortAcademicYearId).catch(error => { LoggerUtil.error( `Failed to publish cohort member created event to Kafka`, - `Error: ${error.message}`, + `Error: ${error.message}`, enrichedData.cohortAcademicYearId ) }) From fe773d4e3528b2a626c1bec127cb614d857ecf16 Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 17:42:14 +0530 Subject: [PATCH 5/6] Added changes for new AcademicYear --- src/cohortMembers/cohortMembers.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index 62914828..7bb620d7 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -582,7 +582,7 @@ ON CM."userId" = U."userId" ${whereCase}`; academicYearId, }; - this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortAcademicYearId).catch(error => { + this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortMembershipId).catch(error => { LoggerUtil.error( `Failed to publish cohort member created event to Kafka`, `Error: ${error.message}`, From ecef51e0fbdf897bac3f0f84a32f3caae4944356 Mon Sep 17 00:00:00 2001 From: abhaykalshetti-gif Date: Mon, 25 May 2026 18:06:42 +0530 Subject: [PATCH 6/6] Added the kafka event for cohortAcademicYear and updated for cohortMember --- src/cohortMembers/cohortMembers.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index 7bb620d7..07d0df8f 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -581,7 +581,7 @@ ON CM."userId" = U."userId" ${whereCase}`; ...savedCohortMember, academicYearId, }; - + this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortMembershipId).catch(error => { LoggerUtil.error( `Failed to publish cohort member created event to Kafka`,