Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/academicyears/academicyears.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ export class AcademicYearService {
if (getCurrentActiveYear) {
const updateStatus = await this.academicYearRespository.update(
{ id: getCurrentActiveYear.id },
{ isActive: false }
{
isActive: false,
updatedBy: academicYearDto.updatedBy
}
);
}
//save record
Expand Down
2 changes: 2 additions & 0 deletions src/cohortAcademicYear/cohortAcademicYear.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import { Cohort } from "src/cohort/entities/cohort.entity";
import { AcademicYear } from "src/academicyears/entities/academicyears-entity";
import { Tenants } from "src/userTenantMapping/entities/tenant.entity";
import { AcademicyearsModule } from "src/academicyears/academicyears.module";
import { KafkaModule } from "src/kafka/kafka.module";


@Module({
imports: [
AcademicyearsModule,
KafkaModule,
TypeOrmModule.forFeature([
CohortAcademicYear,
Cohort,
Expand Down
19 changes: 18 additions & 1 deletion src/cohortAcademicYear/cohortAcademicYear.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { Request, Response } from "express";
import { CohortAcademicYearDto } from "src/cohortAcademicYear/dto/cohort-academicyear.dto";
import { APIID } from "@utils/api-id.config";
import APIResponse from "src/common/responses/response";
import { KafkaService } from "src/kafka/kafka.service";
import { LoggerUtil } from "src/common/logger/LoggerUtil";
import { API_RESPONSES } from "@utils/response.messages";
import { AcademicYearService } from "src/academicyears/academicyears.service";
import { Cohort } from "src/cohort/entities/cohort.entity";
Expand All @@ -19,6 +21,7 @@ export class CohortAcademicYearService {
private readonly cohortRepository: Repository<Cohort>,
@InjectRepository(CohortAcademicYear)
private readonly cohortAcademicYearRepository: Repository<CohortAcademicYear>,
private readonly kafkaService: KafkaService,
) { }

async createCohortAcademicYear(tenantId: string, request: Request, cohortAcademicYearDto: CohortAcademicYearDto, response: Response) {
Expand Down Expand Up @@ -68,13 +71,27 @@ export class CohortAcademicYearService {
const createdAcademicYear = await this.insertCohortAcademicYear(cohortAcademicYearDto.cohortId, cohortAcademicYearDto.academicYearId, cohortAcademicYearDto.createdBy, cohortAcademicYearDto.updatedBy);

if (createdAcademicYear) {
return APIResponse.success(
const apiResponse = APIResponse.success(
response,
apiId,
createdAcademicYear,
HttpStatus.OK,
API_RESPONSES.ADD_COHORT_TO_ACADEMIC_YEAR
);

const enrichedData = {
...createdAcademicYear,
tenantId,
};
// Publish cohort academic year created event to Kafka asynchronously - after response is sent to client
this.kafkaService.publishCohortAcademicYearEvent('created', enrichedData, enrichedData.cohortAcademicYearId)
.catch(error => LoggerUtil.error(
`Failed to publish cohort academic year created event to Kafka`,
`Error: ${error.message}`,
apiId
));

return apiResponse;
}

} catch (error) {
Expand Down
42 changes: 29 additions & 13 deletions src/cohortMembers/cohortMembers.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ ON CM."userId" = U."userId" ${whereCase}`;
res: Response,
tenantId: string,
deviceId: string,
academicyearId: string
academicYearId: string
) {
const apiId = APIID.COHORT_MEMBER_CREATE;
try {
Expand All @@ -514,7 +514,7 @@ ON CM."userId" = U."userId" ${whereCase}`;
}
// check year is live or not
const academicYear = await this.academicyearService.getActiveAcademicYear(
academicyearId,
academicYearId,
tenantId
);

Expand All @@ -529,7 +529,7 @@ ON CM."userId" = U."userId" ${whereCase}`;
}
//check this cohort exist this year or not
const isExistAcademicYear = await this.findCohortAcademicYearId(
academicyearId,
academicYearId,
cohortMembers
);
if (!isExistAcademicYear) {
Expand All @@ -541,13 +541,13 @@ ON CM."userId" = U."userId" ${whereCase}`;
HttpStatus.NOT_FOUND
);
}
const cohortacAdemicyearId = isExistAcademicYear.cohortAcademicYearId;
const cohortAcademicYearId = isExistAcademicYear.cohortAcademicYearId;
//check user is already exist in this cohort for this year or not
const existrole = await this.cohortMembersRepository.find({
where: {
userId: cohortMembers.userId,
cohortId: cohortMembers.cohortId,
cohortAcademicYearId: cohortacAdemicyearId,
cohortAcademicYearId: cohortAcademicYearId,
},
});
if (existrole.length > 0) {
Expand All @@ -562,19 +562,35 @@ ON CM."userId" = U."userId" ${whereCase}`;

cohortMembers.createdBy = loginUser;
cohortMembers.updatedBy = loginUser;
cohortMembers.cohortAcademicYearId = cohortacAdemicyearId;
cohortMembers.cohortAcademicYearId = cohortAcademicYearId;
// Create a new CohortMembers entity and populate it with cohortMembers data
const savedCohortMember = await this.cohortMembersRepository.save(
cohortMembers
);

return APIResponse.success(
res,
apiId,
savedCohortMember,
HttpStatus.OK,
API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY
);
if (savedCohortMember) {
const apiResponse = APIResponse.success(
res,
apiId,
savedCohortMember,
HttpStatus.OK,
API_RESPONSES.COHORTMEMBER_CREATED_SUCCESSFULLY
);

const enrichedData = {
...savedCohortMember,
academicYearId,
};

this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortMembershipId).catch(error => {
LoggerUtil.error(
`Failed to publish cohort member created event to Kafka`,
`Error: ${error.message}`,
enrichedData.cohortAcademicYearId
)
Comment on lines +586 to +590
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The LoggerUtil.error call in the catch block is inconsistent with the rest of the file. Usually, apiId is passed as the third argument (context) to allow for better log tracing and filtering. Using enrichedData.cohortAcademicYearId as the context here deviates from that pattern.

})
Comment on lines +585 to +591
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Inconsistent Kafka event publishing. In createCohortMembers, you are calling this.kafkaService.publishCohortMemberEvent directly, whereas in updateCohortMembers (line 792), you use the internal wrapper this.publishCohortMemberEvent. The internal wrapper is designed to enrich the event payload with custom fields (like subject, fees, etc.). Bypassing it here means the 'created' event will have a different schema and less data than the 'updated' event, which can cause issues for downstream consumers.

Suggested change
this.kafkaService.publishCohortMemberEvent('created', enrichedData, enrichedData.cohortMembershipId).catch(error => {
LoggerUtil.error(
`Failed to publish cohort member created event to Kafka`,
`Error: ${error.message}`,
enrichedData.cohortAcademicYearId
)
})
this.publishCohortMemberEvent('created', savedCohortMember, apiId).catch(error => {
LoggerUtil.error(
`Failed to publish cohort member created event to Kafka`,
`Error: ${error.message}`,
apiId
)
})

return apiResponse;
}
} catch (e) {
LoggerUtil.error(
`${API_RESPONSES.SERVER_ERROR}`,
Expand Down
46 changes: 46 additions & 0 deletions src/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@
* @param cohortData - The cohort data to include in the event
* @param cohortId - The ID of the cohort (used as the message key)
*/
async publishCohortEvent(eventType: 'created' | 'updated' | 'deleted', cohortData: any, cohortId: string): Promise<void> {

Check warning on line 377 in src/kafka/kafka.service.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this union type with a type alias.

See more on https://sonarcloud.io/project/issues?id=tekdi_user-microservice&issues=AZ5fJsS0kyB13zfF7iyS&open=AZ5fJsS0kyB13zfF7iyS&pullRequest=749
if (!this.isKafkaEnabled) {
this.logger.warn('Kafka is disabled. Skipping cohort event publish.');
return; // Do nothing if Kafka is disabled
Expand Down Expand Up @@ -492,4 +492,50 @@
await this.publishMessage(topic, payload, cohortMembershipId);
this.logger.log(`Cohort member ${eventType} event published for cohortMembershipId ${cohortMembershipId}`);
}

/**
* Publish a cohort-academic-year-related event to Kafka
*
* @param eventType - The type of cohort academic year event (created, updated, deleted)
* @param cohortAcademicYearData - The cohort academic year data to include in the event
* @param cohortAcademicYearId - The ID of the cohort academic year (used as the message key)
*/
async publishCohortAcademicYearEvent(
eventType: 'created' | 'updated' | 'deleted',
cohortAcademicYearData: any,
cohortAcademicYearId: string
): Promise<void> {
if (!this.isKafkaEnabled) {
this.logger.warn('Kafka is disabled. Skipping cohort academic year event publish.');
return;
}

const topic = this.configService.get<string>('KAFKA_TOPIC', 'user-topic');

let fullEventType = '';
switch (eventType) {
case 'created':
fullEventType = 'COHORT_ACADEMIC_YEAR_CREATED';
break;
case 'updated':
fullEventType = 'COHORT_ACADEMIC_YEAR_UPDATED';
break;
case 'deleted':
fullEventType = 'COHORT_ACADEMIC_YEAR_DELETED';
break;
default:
fullEventType = 'UNKNOWN_EVENT';
break;
}

const payload = {
eventType: fullEventType,
timestamp: new Date().toISOString(),
cohortAcademicYearId,
data: cohortAcademicYearData
};

await this.publishMessage(topic, payload, cohortAcademicYearId);
this.logger.log(`Cohort academic year ${eventType} event published for cohortAcademicYearId ${cohortAcademicYearId}`);
}
}