diff --git a/src/academicyears/academicyears.service.ts b/src/academicyears/academicyears.service.ts index 87f4e009..84a2196b 100644 --- a/src/academicyears/academicyears.service.ts +++ b/src/academicyears/academicyears.service.ts @@ -66,7 +66,10 @@ export class AcademicYearService { if (getCurrentActiveYear) { const updateStatus = await this.academicYearRespository.update( { id: getCurrentActiveYear.id }, - { isActive: false } + { + isActive: false, + updatedBy: academicYearDto.updatedBy + } ); } //save record diff --git a/src/cohortAcademicYear/cohortAcademicYear.module.ts b/src/cohortAcademicYear/cohortAcademicYear.module.ts index 114c6350..dafd3cfb 100644 --- a/src/cohortAcademicYear/cohortAcademicYear.module.ts +++ b/src/cohortAcademicYear/cohortAcademicYear.module.ts @@ -7,11 +7,13 @@ import { Cohort } from "src/cohort/entities/cohort.entity"; import { AcademicYear } from "src/academicyears/entities/academicyears-entity"; import { Tenants } from "src/userTenantMapping/entities/tenant.entity"; import { AcademicyearsModule } from "src/academicyears/academicyears.module"; +import { KafkaModule } from "src/kafka/kafka.module"; @Module({ imports: [ AcademicyearsModule, + KafkaModule, TypeOrmModule.forFeature([ CohortAcademicYear, Cohort, diff --git a/src/cohortAcademicYear/cohortAcademicYear.service.ts b/src/cohortAcademicYear/cohortAcademicYear.service.ts index 385be1de..4d6032cb 100644 --- a/src/cohortAcademicYear/cohortAcademicYear.service.ts +++ b/src/cohortAcademicYear/cohortAcademicYear.service.ts @@ -6,6 +6,8 @@ import { Request, Response } from "express"; import { CohortAcademicYearDto } from "src/cohortAcademicYear/dto/cohort-academicyear.dto"; import { APIID } from "@utils/api-id.config"; import APIResponse from "src/common/responses/response"; +import { KafkaService } from "src/kafka/kafka.service"; +import { LoggerUtil } from "src/common/logger/LoggerUtil"; import { API_RESPONSES } from "@utils/response.messages"; import { AcademicYearService } from "src/academicyears/academicyears.service"; import { Cohort } from "src/cohort/entities/cohort.entity"; @@ -19,6 +21,7 @@ export class CohortAcademicYearService { private readonly cohortRepository: Repository, @InjectRepository(CohortAcademicYear) private readonly cohortAcademicYearRepository: Repository, + private readonly kafkaService: KafkaService, ) { } async createCohortAcademicYear(tenantId: string, request: Request, cohortAcademicYearDto: CohortAcademicYearDto, response: Response) { @@ -68,13 +71,27 @@ export class CohortAcademicYearService { const createdAcademicYear = await this.insertCohortAcademicYear(cohortAcademicYearDto.cohortId, cohortAcademicYearDto.academicYearId, cohortAcademicYearDto.createdBy, cohortAcademicYearDto.updatedBy); if (createdAcademicYear) { - return APIResponse.success( + const apiResponse = APIResponse.success( response, apiId, createdAcademicYear, HttpStatus.OK, API_RESPONSES.ADD_COHORT_TO_ACADEMIC_YEAR ); + + const enrichedData = { + ...createdAcademicYear, + tenantId, + }; + // Publish cohort academic year created event to Kafka asynchronously - after response is sent to client + this.kafkaService.publishCohortAcademicYearEvent('created', enrichedData, enrichedData.cohortAcademicYearId) + .catch(error => LoggerUtil.error( + `Failed to publish cohort academic year created event to Kafka`, + `Error: ${error.message}`, + apiId + )); + + return apiResponse; } } catch (error) { diff --git a/src/cohortMembers/cohortMembers.service.ts b/src/cohortMembers/cohortMembers.service.ts index f7bc70d9..07d0df8f 100644 --- a/src/cohortMembers/cohortMembers.service.ts +++ b/src/cohortMembers/cohortMembers.service.ts @@ -494,7 +494,7 @@ ON CM."userId" = U."userId" ${whereCase}`; res: Response, tenantId: string, deviceId: string, - academicyearId: string + academicYearId: string ) { const apiId = APIID.COHORT_MEMBER_CREATE; try { @@ -514,7 +514,7 @@ ON CM."userId" = U."userId" ${whereCase}`; } // check year is live or not const academicYear = await this.academicyearService.getActiveAcademicYear( - academicyearId, + academicYearId, tenantId ); @@ -529,7 +529,7 @@ ON CM."userId" = U."userId" ${whereCase}`; } //check this cohort exist this year or not const isExistAcademicYear = await this.findCohortAcademicYearId( - academicyearId, + academicYearId, cohortMembers ); if (!isExistAcademicYear) { @@ -541,13 +541,13 @@ ON CM."userId" = U."userId" ${whereCase}`; HttpStatus.NOT_FOUND ); } - const cohortacAdemicyearId = isExistAcademicYear.cohortAcademicYearId; + const cohortAcademicYearId = isExistAcademicYear.cohortAcademicYearId; //check user is already exist in this cohort for this year or not const existrole = await this.cohortMembersRepository.find({ where: { userId: cohortMembers.userId, cohortId: cohortMembers.cohortId, - cohortAcademicYearId: cohortacAdemicyearId, + cohortAcademicYearId: cohortAcademicYearId, }, }); if (existrole.length > 0) { @@ -562,19 +562,35 @@ ON CM."userId" = U."userId" ${whereCase}`; cohortMembers.createdBy = loginUser; cohortMembers.updatedBy = loginUser; - cohortMembers.cohortAcademicYearId = cohortacAdemicyearId; + cohortMembers.cohortAcademicYearId = cohortAcademicYearId; // Create a new CohortMembers entity and populate it with cohortMembers data const savedCohortMember = await this.cohortMembersRepository.save( cohortMembers ); - return APIResponse.success( - res, - apiId, - savedCohortMember, - HttpStatus.OK, - API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY - ); + if (savedCohortMember) { + const apiResponse = APIResponse.success( + res, + apiId, + savedCohortMember, + HttpStatus.OK, + API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY + ); + + const enrichedData = { + ...savedCohortMember, + academicYearId, + }; + + this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortMembershipId).catch(error => { + LoggerUtil.error( + `Failed to publish cohort member created event to Kafka`, + `Error: ${error.message}`, + enrichedData.cohortAcademicYearId + ) + }) + return apiResponse; + } } catch (e) { LoggerUtil.error( `${API_RESPONSES.SERVER_ERROR}`, diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index 9e6f71d6..974f95f6 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -492,4 +492,50 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { await this.publishMessage(topic, payload, cohortMembershipId); this.logger.log(`Cohort member ${eventType} event published for cohortMembershipId ${cohortMembershipId}`); } + + /** + * Publish a cohort-academic-year-related event to Kafka + * + * @param eventType - The type of cohort academic year event (created, updated, deleted) + * @param cohortAcademicYearData - The cohort academic year data to include in the event + * @param cohortAcademicYearId - The ID of the cohort academic year (used as the message key) + */ + async publishCohortAcademicYearEvent( + eventType: 'created' | 'updated' | 'deleted', + cohortAcademicYearData: any, + cohortAcademicYearId: string + ): Promise { + if (!this.isKafkaEnabled) { + this.logger.warn('Kafka is disabled. Skipping cohort academic year event publish.'); + return; + } + + const topic = this.configService.get('KAFKA_TOPIC', 'user-topic'); + + let fullEventType = ''; + switch (eventType) { + case 'created': + fullEventType = 'COHORT_ACADEMIC_YEAR_CREATED'; + break; + case 'updated': + fullEventType = 'COHORT_ACADEMIC_YEAR_UPDATED'; + break; + case 'deleted': + fullEventType = 'COHORT_ACADEMIC_YEAR_DELETED'; + break; + default: + fullEventType = 'UNKNOWN_EVENT'; + break; + } + + const payload = { + eventType: fullEventType, + timestamp: new Date().toISOString(), + cohortAcademicYearId, + data: cohortAcademicYearData + }; + + await this.publishMessage(topic, payload, cohortAcademicYearId); + this.logger.log(`Cohort academic year ${eventType} event published for cohortAcademicYearId ${cohortAcademicYearId}`); + } }