Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 160 additions & 150 deletions packages/data-provider/src/services/submission/submissionProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,181 +281,191 @@ const submissionProcessor = (dependencies: BaseDependencies) => {
* @returns void
*/
const performCommitSubmissionAsync = async (params: CommitSubmissionParams): Promise<void> => {
const submissionRepo = createSubmissionRepository(dependencies);
const dataSubmittedRepo = submittedRepository(dependencies);
try {
const submissionRepo = createSubmissionRepository(dependencies);
const dataSubmittedRepo = submittedRepository(dependencies);

const { dictionary, dataToValidate, submissionId, username } = params;
const { dictionary, dataToValidate, submissionId, username } = params;

const submission = await submissionRepo.getSubmissionById(submissionId);
const submission = await submissionRepo.getSubmissionById(submissionId);

if (!submission) {
throw new Error(`Submission '${submissionId}' not found`);
}
if (!submission) {
throw new Error(`Submission '${submissionId}' not found`);
}

// Merge Submitted Data with items to be inserted, updated or deleted consist on 3 steps
// Step 1: Exclude items that are marked for deletion
const systemIdsToDelete = new Set<string>(dataToValidate?.deletes?.map((item) => item.systemId) || []);
logger.info(LOG_MODULE, `Found '${systemIdsToDelete.size}' Records to delete on Submission '${submission.id}'`);
const submittedData = dataToValidate.submittedData?.filter((item) => !systemIdsToDelete.has(item.systemId));
// Merge Submitted Data with items to be inserted, updated or deleted consist on 3 steps
// Step 1: Exclude items that are marked for deletion
const systemIdsToDelete = new Set<string>(dataToValidate?.deletes?.map((item) => item.systemId) || []);
logger.info(LOG_MODULE, `Found '${systemIdsToDelete.size}' Records to delete on Submission '${submission.id}'`);
const submittedData = dataToValidate.submittedData?.filter((item) => !systemIdsToDelete.has(item.systemId));

// Step 2: Modify items marked for update
const systemIdsToUpdate = new Set<string>(dataToValidate.updates ? Object.keys(dataToValidate.updates) : []);
logger.info(LOG_MODULE, `Found '${systemIdsToUpdate.size}' Records to update on Submission '${submission.id}'`);
const submittedDataToValidate = dataToValidate.updates
? updateSubmittedDataArray(submittedData, Object.values(dataToValidate.updates))
: submittedData;

// Step 3: Add items marked for insertion
logger.info(
LOG_MODULE,
`Found '${dataToValidate.inserts.length}' Records to insert on Submission '${submission.id}'`,
);
const schemasDataToValidate = groupSchemaDataByEntityName({
inserts: dataToValidate.inserts,
submittedData: submittedDataToValidate,
});

// Step 2: Modify items marked for update
const systemIdsToUpdate = new Set<string>(dataToValidate.updates ? Object.keys(dataToValidate.updates) : []);
logger.info(LOG_MODULE, `Found '${systemIdsToUpdate.size}' Records to update on Submission '${submission.id}'`);
const submittedDataToValidate = dataToValidate.updates
? updateSubmittedDataArray(submittedData, Object.values(dataToValidate.updates))
: submittedData;
const resultValidation = validateSchemas(dictionary, schemasDataToValidate.schemaDataByEntityName);

// Step 3: Add items marked for insertion
logger.info(
LOG_MODULE,
`Found '${dataToValidate.inserts.length}' Records to insert on Submission '${submission.id}'`,
);
const schemasDataToValidate = groupSchemaDataByEntityName({
inserts: dataToValidate.inserts,
submittedData: submittedDataToValidate,
});
const resultCommit: {
inserts: SubmittedDataResponse[];
updates: SubmittedDataResponse[];
deletes: SubmittedDataResponse[];
} = {
inserts: [],
updates: [],
deletes: [],
};

const resultValidation = validateSchemas(dictionary, schemasDataToValidate.schemaDataByEntityName);

const resultCommit: {
inserts: SubmittedDataResponse[];
updates: SubmittedDataResponse[];
deletes: SubmittedDataResponse[];
} = {
inserts: [],
updates: [],
deletes: [],
};

type UpdateSubmittedDataParams = {
submittedDataId: number;
dataDiff: DataDiff;
newData: Partial<SubmittedData>;
oldIsValid: boolean;
submissionId: number;
};

const insertsToSave: NewSubmittedData[] = [];
const updatesToSave: UpdateSubmittedDataParams[] = [];
const deletesToProcess: { diff: DataDiff; submissionId: number; systemId: string; username: string }[] = [];

Object.entries(schemasDataToValidate.submittedDataByEntityName).forEach(([entityName, dataArray], index) => {
dataArray.forEach((data) => {
const invalidRecordErrors = findInvalidRecordErrorsBySchemaName(resultValidation, entityName);
const hasErrorByIndex = groupErrorsByIndex(invalidRecordErrors);
const oldIsValid = data.isValid;
const newIsValid = !hasErrorsByIndex(hasErrorByIndex, index);
if (data.id) {
const inputUpdate: Partial<SubmittedData> = {};
const submisionUpdateData = dataToValidate.updates && dataToValidate.updates[data.systemId];
if (submisionUpdateData) {
logger.info(LOG_MODULE, `Updating submittedData system ID '${data.systemId}' in entity '${entityName}'`);
inputUpdate.data = data.data;
}
type UpdateSubmittedDataParams = {
submittedDataId: number;
dataDiff: DataDiff;
newData: Partial<SubmittedData>;
oldIsValid: boolean;
submissionId: number;
};

if (oldIsValid !== newIsValid) {
inputUpdate.isValid = newIsValid;
if (newIsValid) {
const insertsToSave: NewSubmittedData[] = [];
const updatesToSave: UpdateSubmittedDataParams[] = [];
const deletesToProcess: { diff: DataDiff; submissionId: number; systemId: string; username: string }[] = [];

Object.entries(schemasDataToValidate.submittedDataByEntityName).forEach(([entityName, dataArray], index) => {
dataArray.forEach((data) => {
const invalidRecordErrors = findInvalidRecordErrorsBySchemaName(resultValidation, entityName);
const hasErrorByIndex = groupErrorsByIndex(invalidRecordErrors);
const oldIsValid = data.isValid;
const newIsValid = !hasErrorsByIndex(hasErrorByIndex, index);
if (data.id) {
const inputUpdate: Partial<SubmittedData> = {};
const submisionUpdateData = dataToValidate.updates && dataToValidate.updates[data.systemId];
if (submisionUpdateData) {
logger.info(LOG_MODULE, `Updating submittedData system ID '${data.systemId}' in entity '${entityName}'`);
inputUpdate.data = data.data;
}

if (oldIsValid !== newIsValid) {
inputUpdate.isValid = newIsValid;
if (newIsValid) {
logger.info(
LOG_MODULE,
`Updating submittedData system ID '${data.systemId}' as Valid in entity '${entityName}'`,
);
inputUpdate.lastValidSchemaId = dictionary.id;
}
logger.info(
LOG_MODULE,
`Updating submittedData system ID '${data.systemId}' as Valid in entity '${entityName}'`,
`Updating submittedData system ID '${data.systemId}' as invalid in entity '${entityName}'`,
);
inputUpdate.lastValidSchemaId = dictionary.id;
}
logger.info(
LOG_MODULE,
`Updating submittedData system ID '${data.systemId}' as invalid in entity '${entityName}'`,
);
}

if (Object.values(inputUpdate)) {
inputUpdate.updatedBy = username;
if (Object.values(inputUpdate)) {
inputUpdate.updatedBy = username;
if (newIsValid) {
inputUpdate.lastValidSchemaId = dictionary.id;
}
updatesToSave.push({
submittedDataId: data.id,
newData: inputUpdate,
dataDiff: { old: submisionUpdateData?.old ?? {}, new: submisionUpdateData?.new ?? {} },
oldIsValid: oldIsValid,
submissionId: submission.id,
});

// Check if either 'data' or 'isValid' keys has been updated
if ('data' in inputUpdate || 'isValid' in inputUpdate) {
resultCommit.updates.push({
isValid: newIsValid,
entityName,
organization: data.organization,
data: data.data,
systemId: data.systemId,
});
}
}
} else {
data.isValid = newIsValid;
if (newIsValid) {
inputUpdate.lastValidSchemaId = dictionary.id;
data.lastValidSchemaId = dictionary.id;
}
updatesToSave.push({
submittedDataId: data.id,
newData: inputUpdate,
dataDiff: { old: submisionUpdateData?.old ?? {}, new: submisionUpdateData?.new ?? {} },
oldIsValid: oldIsValid,
submissionId: submission.id,
insertsToSave.push(data);

resultCommit.inserts.push({
isValid: newIsValid,
entityName,
organization: data.organization,
data: data.data,
systemId: data.systemId,
});

// Check if either 'data' or 'isValid' keys has been updated
if ('data' in inputUpdate || 'isValid' in inputUpdate) {
resultCommit.updates.push({
isValid: newIsValid,
entityName,
organization: data.organization,
data: data.data,
systemId: data.systemId,
});
}
}
} else {
data.isValid = newIsValid;
if (newIsValid) {
data.lastValidSchemaId = dictionary.id;
}
insertsToSave.push(data);

resultCommit.inserts.push({
isValid: newIsValid,
entityName,
organization: data.organization,
data: data.data,
systemId: data.systemId,
});
}
});
});
});

// iterate if there are any record to be deleted
dataToValidate?.deletes?.forEach((item) => {
deletesToProcess.push({
submissionId: submission.id,
systemId: item.systemId,
diff: computeDataDiff(item.data, null),
username,
// iterate if there are any record to be deleted
dataToValidate?.deletes?.forEach((item) => {
deletesToProcess.push({
submissionId: submission.id,
systemId: item.systemId,
diff: computeDataDiff(item.data, null),
username,
});

resultCommit.deletes.push({
isValid: item.isValid,
entityName: item.entityName,
organization: item.organization,
data: item.data,
systemId: item.systemId,
});
});

resultCommit.deletes.push({
isValid: item.isValid,
entityName: item.entityName,
organization: item.organization,
data: item.data,
systemId: item.systemId,
await dependencies.db.transaction(async (tx) => {
if (insertsToSave.length) {
await dataSubmittedRepo.save(insertsToSave, tx);
}
if (updatesToSave.length) {
await dataSubmittedRepo.update(updatesToSave, tx);
}
if (deletesToProcess.length) {
await dataSubmittedRepo.deleteBySystemId(deletesToProcess, tx);
}

await submissionRepo.update(
submission.id,
{
status: SUBMISSION_STATUS.COMMITTED,
updatedAt: new Date(),
},
tx,
);
});
});

await dependencies.db.transaction(async (tx) => {
if (insertsToSave.length) {
await dataSubmittedRepo.save(insertsToSave, tx);
}
if (updatesToSave.length) {
await dataSubmittedRepo.update(updatesToSave, tx);
}
if (deletesToProcess.length) {
await dataSubmittedRepo.deleteBySystemId(deletesToProcess, tx);
if (params.onFinishCommit) {
params.onFinishCommit({
submissionId: submission.id,
organization: submission.organization,
categoryId: submission.dictionaryCategory.id,
data: resultCommit,
});
}

await submissionRepo.update(
submission.id,
{
status: SUBMISSION_STATUS.COMMITTED,
updatedAt: new Date(),
},
tx,
} catch (error) {
const message = error instanceof Error ? error.message : error;
logger.info(
LOG_MODULE,
`Unable to complete performCommitSubmissionAsync for submission ${params.submissionId}, an error was thrown during execution`,
message,
);
});

if (params.onFinishCommit) {
params.onFinishCommit({
submissionId: submission.id,
organization: submission.organization,
categoryId: submission.dictionaryCategory.id,
data: resultCommit,
});
logger.error(error);
}
};

Expand Down