diff --git a/.env.schema b/.env.schema
index e9163250..3f37a956 100644
--- a/.env.schema
+++ b/.env.schema
@@ -6,6 +6,7 @@ DB_NAME=
DB_PASSWORD=
DB_PORT=
DB_USER=
+FILES_LIMIT_SIZE=
ID_CUSTOM_ALPHABET=
ID_CUSTOM_SIZE=
ID_USELOCAL=
diff --git a/README.md b/README.md
index e6be60b1..a37fdbed 100644
--- a/README.md
+++ b/README.md
@@ -56,23 +56,24 @@ A model-agnostic, tabular data submission system designed to manage and validate
### Environment Variables
-| Name | Description | Default |
-| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------- |
-| `ALLOWED_ORIGINS` | Specifies a list of permitted origins for Cross-Origin Resource Sharing (CORS). These origins, separated by commas, are allowed to make requests to the server, ensuring only trusted domains can access resources. (Example: https://www.example.com,https://subdomain.example.com) | |
-| `AUDIT_ENABLED` | Ensures that any modifications to the submitted data are logged, providing a way to identify who made changes and when they were made. | true |
-| `CORS_ENABLED` | Controls whether the CORS functionality is enabled or disabled. | false |
-| `DB_HOST` | Database Hostname | |
-| `DB_NAME` | Database Name | |
-| `DB_PASSWORD` | Database Password | |
-| `DB_PORT` | Database Port | |
-| `DB_USER` | Database User | |
-| `ID_CUSTOM_ALPHABET` | Custom Alphabet for local ID generation | '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' |
-| `ID_CUSTOM_SIZE` | Custom size of ID for local ID generation | 21 |
-| `ID_USELOCAL` | Generate ID locally | true |
-| `LECTERN_URL` | Schema Service (Lectern) URL | |
-| `LOG_LEVEL` | Log Level | 'info' |
-| `PLURALIZE_SCHEMAS_ENABLED` | This feature automatically convert schema names to their plural forms when handling compound documents. Pluralization assumes the words are in English | true |
-| `PORT` | Server Port | 3030 |
+| Name | Description | Default |
+| --------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------- |
+| `ALLOWED_ORIGINS` | Specifies a list of permitted origins for Cross-Origin Resource Sharing (CORS). These origins, separated by commas, are allowed to make requests to the server, ensuring only trusted domains can access resources. (Example: https://www.example.com,https://subdomain.example.com) | |
+| `AUDIT_ENABLED` | Ensures that any modifications to the submitted data are logged, providing a way to identify who made changes and when they were made. | true |
+| `CORS_ENABLED` | Controls whether the CORS functionality is enabled or disabled. | false |
+| `DB_HOST` | Database Hostname | |
+| `DB_NAME` | Database Name | |
+| `DB_PASSWORD` | Database Password | |
+| `DB_PORT` | Database Port | |
+| `DB_USER` | Database User | |
+| `FILES_LIMIT_SIZE`          | Limit upload file size as a string or number. Supported units and abbreviations are case-insensitive: b (bytes), kb (kilobytes), mb (megabytes), gb (gigabytes), tb (terabytes), pb (petabytes). Any other text is treated as bytes.                                                                  | '10mb'                                 |
+| `ID_CUSTOM_ALPHABET` | Custom Alphabet for local ID generation | '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' |
+| `ID_CUSTOM_SIZE` | Custom size of ID for local ID generation | 21 |
+| `ID_USELOCAL` | Generate ID locally | true |
+| `LECTERN_URL` | Schema Service (Lectern) URL | |
+| `LOG_LEVEL` | Log Level | 'info' |
+| `PLURALIZE_SCHEMAS_ENABLED` | This feature automatically converts schema names to their plural forms when handling compound documents. Pluralization assumes the words are in English.                                                                                                                                              | true                                   |
+| `PORT` | Server Port | 3030 |
### Script Commands
diff --git a/apps/server/src/config/app.ts b/apps/server/src/config/app.ts
index d101b05c..6be2b32d 100644
--- a/apps/server/src/config/app.ts
+++ b/apps/server/src/config/app.ts
@@ -48,6 +48,9 @@ export const appConfig: AppConfig = {
pluralizeSchemasName: getBoolean(process.env.PLURALIZE_SCHEMAS_ENABLED, true),
},
},
+ files: {
+ limitSize: process.env.FILES_LIMIT_SIZE || '10mb',
+ },
idService: {
useLocal: getBoolean(process.env.ID_USELOCAL, true),
customAlphabet: process.env.ID_CUSTOM_ALPHABET || '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ',
diff --git a/apps/server/swagger/submission-api.yml b/apps/server/swagger/submission-api.yml
index 045f67b6..4fef9aa3 100644
--- a/apps/server/swagger/submission-api.yml
+++ b/apps/server/swagger/submission-api.yml
@@ -168,19 +168,26 @@
summary: Add new data to a submission for the specified category. Returns an Active Submission containing the newly created records
tags:
- Submission
+ consumes:
+ - multipart/form-data
parameters:
- $ref: '#/components/parameters/path/CategoryId'
- - $ref: '#/components/parameters/query/EntityName'
- $ref: '#/components/parameters/query/Organization'
requestBody:
- description: The JSON payload containing the data to be added to the submission
+ description: Files to be submitted. Accepted file extensions are '.tsv' and '.csv'.
required: true
content:
- application/json:
+ multipart/form-data:
schema:
- type: array
- items:
- type: object
+ type: object
+ properties:
+ files:
+ type: array
+ items:
+ type: string
+ format: binary
+ required:
+ - files
responses:
200:
description: Submission accepted
@@ -200,19 +207,26 @@
summary: Modifies existing data for a submission. Returns an Active Submission containing the records that will be updated
tags:
- Submission
+ consumes:
+ - multipart/form-data
parameters:
- $ref: '#/components/parameters/path/CategoryId'
- - $ref: '#/components/parameters/query/EntityName'
- $ref: '#/components/parameters/query/Organization'
requestBody:
- description: The JSON payload containing the data to be added to the submission
+ description: Files to be submitted. Accepted file extensions are '.tsv' and '.csv'.
required: true
content:
- application/json:
+ multipart/form-data:
schema:
- type: array
- items:
- type: object
+ type: object
+ properties:
+ files:
+ type: array
+ items:
+ type: string
+ format: binary
+ required:
+ - files
responses:
200:
description: Edit Data request accepted
diff --git a/packages/data-provider/package.json b/packages/data-provider/package.json
index 345a499a..c7aec650 100644
--- a/packages/data-provider/package.json
+++ b/packages/data-provider/package.json
@@ -37,10 +37,14 @@
"@overture-stack/lectern-client": "2.0.0-beta.4",
"@overture-stack/lyric-data-model": "workspace:^",
"@overture-stack/sqon-builder": "^1.1.0",
+ "bytes": "^3.1.2",
+ "csv-parse": "^5.6.0",
"dotenv": "^16.4.5",
"drizzle-orm": "^0.29.5",
"express": "^4.19.2",
+ "firstline": "^2.0.2",
"lodash-es": "^4.17.21",
+ "multer": "^2.0.1",
"nanoid": "^5.0.7",
"pg": "^8.12.0",
"plur": "^5.1.0",
@@ -48,12 +52,15 @@
"zod": "^3.23.8"
},
"devDependencies": {
+ "@types/bytes": "^3.1.5",
"@types/chai-as-promised": "^8.0.1",
"@types/deep-freeze": "^0.1.5",
"@types/express": "^4.17.21",
"@types/express-serve-static-core": "^4.19.5",
+ "@types/firstline": "^2.0.4",
"@types/lodash": "^4.17.7",
"@types/lodash-es": "^4.17.12",
+ "@types/multer": "^1.4.13",
"@types/pg": "^8.11.6",
"@types/qs": "^6.9.15",
"chai-as-promised": "^8.0.0",
diff --git a/packages/data-provider/src/config/config.ts b/packages/data-provider/src/config/config.ts
index bbeb0113..f488054b 100644
--- a/packages/data-provider/src/config/config.ts
+++ b/packages/data-provider/src/config/config.ts
@@ -20,6 +20,10 @@ export type FeaturesConfig = {
recordHierarchy: RecordHierarchyConfig;
};
+export type FilesConfig = {
+ limitSize: string;
+};
+
export type SchemaServiceConfig = {
url: string;
};
@@ -43,6 +47,7 @@ export type AppConfig = {
auth: AuthConfig;
db: DbConfig;
features?: FeaturesConfig;
+ files: FilesConfig;
idService: IdServiceConfig;
logger: LoggerConfig;
onFinishCommit?: (resultOnCommit: ResultOnCommit) => void;
diff --git a/packages/data-provider/src/controllers/submissionController.ts b/packages/data-provider/src/controllers/submissionController.ts
index 424a29be..62206a80 100644
--- a/packages/data-provider/src/controllers/submissionController.ts
+++ b/packages/data-provider/src/controllers/submissionController.ts
@@ -2,10 +2,13 @@ import { isEmpty } from 'lodash-es';
import { BaseDependencies } from '../config/config.js';
import { type AuthConfig, shouldBypassAuth } from '../middleware/auth.js';
+import categoryRepository from '../repository/categoryRepository.js';
import submissionService from '../services/submission/submission.js';
import submittedDataService from '../services/submittedData/submmittedData.js';
import { hasUserWriteAccess } from '../utils/authUtils.js';
+import { getSchemaByName } from '../utils/dictionaryUtils.js';
import { BadRequest, Forbidden, NotFound } from '../utils/errors.js';
+import { extractEntityNameFromFileName, parseFileToRecords, prevalidateDataFile } from '../utils/files.js';
import { validateRequest } from '../utils/requestValidation.js';
import {
dataDeleteBySystemIdRequestSchema,
@@ -18,7 +21,13 @@ import {
submissionsByCategoryRequestSchema,
uploadSubmissionRequestSchema,
} from '../utils/schemas.js';
-import { SUBMISSION_ACTION_TYPE } from '../utils/types.js';
+import {
+ BATCH_ERROR_TYPE,
+ type BatchError,
+ CREATE_SUBMISSION_STATUS,
+ type EntityData,
+ SUBMISSION_ACTION_TYPE,
+} from '../utils/types.js';
const controller = ({
baseDependencies,
@@ -99,7 +108,7 @@ const controller = ({
logger.info(
LOG_MODULE,
- `Request Delete '${entityName ? entityName : 'all'}' records on '{${actionType}}' Active Submission '${submissionId}'`,
+ `Request Delete '${entityName ? entityName : 'all'}' records on '${actionType}' Active Submission '${submissionId}'`,
);
const submission = await service.getSubmissionById(submissionId);
@@ -164,28 +173,75 @@ const controller = ({
editSubmittedData: validateRequest(dataEditRequestSchema, async (req, res, next) => {
try {
const categoryId = Number(req.params.categoryId);
- const entityName = req.query.entityName;
const organization = req.query.organization;
- const payload = req.body;
+ const files = Array.isArray(req.files) ? req.files : [];
const user = req.user;
+ const username = user?.username || '';
- logger.info(LOG_MODULE, `Request Edit Submitted Data`);
-
- if (!payload || payload.length == 0) {
- throw new BadRequest(
- 'The "payload" parameter is missing or empty. Please include the records in the request for processing.',
- );
- }
+ logger.info(
+ LOG_MODULE,
+ `Request Edit Submitted Data: categoryId '${categoryId}'`,
+ ` organization '${organization}'`,
+ ` files '${files.length}'`,
+ );
if (!shouldBypassAuth(req, authConfig) && !hasUserWriteAccess(organization, user)) {
throw new Forbidden(`User is not authorized to edit data from '${organization}'`);
}
- const username = user?.username || '';
+ const { getActiveDictionaryByCategory } = categoryRepository(baseDependencies);
+ const currentDictionary = await getActiveDictionaryByCategory(categoryId);
+ if (!currentDictionary) {
+ throw new BadRequest(`Dictionary in category '${categoryId}' not found`);
+ }
+
+ const fileErrors: BatchError[] = [];
+ const entityData: EntityData = {};
+
+ for (const file of files) {
+ try {
+ // Step 1 prevalidation: validate filename matches the schema name in the dictionary
+ const entityName = extractEntityNameFromFileName(file.originalname);
+ const schema = getSchemaByName(entityName, currentDictionary);
+ if (!schema || !entityName) {
+ fileErrors.push({
+ type: BATCH_ERROR_TYPE.INVALID_FILE_NAME,
+ message: `Invalid entity name for submission`,
+ batchName: file.originalname,
+ });
+ continue;
+ }
+
+ // Step 2 prevalidation: validate file extension is accepted and check required column names based on schema
+ const { error } = await prevalidateDataFile(file, schema, true);
+ if (error) {
+ fileErrors.push(error);
+ continue;
+ }
+
+ // Converts TSV/CSV file into a JSON object format
+ const extractedData = await parseFileToRecords(file, schema);
+ entityData[entityName] = extractedData;
+ } catch (error) {
+ logger.error(LOG_MODULE, `Error processing file`, error);
+ }
+ }
+
+ if (fileErrors.length > 0) {
+ logger.info(
+ LOG_MODULE,
+ 'Submission could not be processed because some files contain errors',
+ JSON.stringify(fileErrors),
+ );
+ return res.status(200).send({
+ status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
+ batchErrors: fileErrors,
+ inProcessEntities: [],
+ });
+ }
const editSubmittedDataResult = await dataService.editSubmittedData({
- records: payload,
- entityName,
+ data: entityData,
categoryId,
organization,
username,
@@ -287,40 +343,83 @@ const controller = ({
submit: validateRequest(uploadSubmissionRequestSchema, async (req, res, next) => {
try {
const categoryId = Number(req.params.categoryId);
- const entityName = req.query.entityName;
const organization = req.query.organization;
- const payload = req.body;
+ const files = Array.isArray(req.files) ? req.files : [];
const user = req.user;
+ const username = user?.username || '';
logger.info(
LOG_MODULE,
`Submission Request: categoryId '${categoryId}'`,
` organization '${organization}'`,
- ` entityName '${entityName}'`,
+ ` files '${files.length}'`,
);
- if (!payload || payload.length == 0) {
- throw new BadRequest(
- 'The "payload" parameter is missing or empty. Please include the records in the request for processing.',
- );
- }
-
if (!shouldBypassAuth(req, authConfig) && !hasUserWriteAccess(organization, user)) {
throw new Forbidden(`User is not authorized to submit data to '${organization}'`);
}
- const username = user?.username || '';
+ const { getActiveDictionaryByCategory } = categoryRepository(baseDependencies);
+ const currentDictionary = await getActiveDictionaryByCategory(categoryId);
+ if (!currentDictionary) {
+ throw new BadRequest(`Dictionary in category '${categoryId}' not found`);
+ }
+
+ const fileErrors: BatchError[] = [];
+ const entityData: EntityData = {};
+
+ for (const file of files) {
+ try {
+ // Step 1 prevalidation: validate filename matches the schema name in the dictionary
+ const entityName = extractEntityNameFromFileName(file.originalname);
+ const schema = getSchemaByName(entityName, currentDictionary);
+ if (!schema || !entityName) {
+ fileErrors.push({
+ type: BATCH_ERROR_TYPE.INVALID_FILE_NAME,
+ message: `Invalid entity name for submission`,
+ batchName: file.originalname,
+ });
+ continue;
+ }
+
+ // Step 2 prevalidation: validate file extension is accepted and check required column names based on schema
+ const { error } = await prevalidateDataFile(file, schema);
+ if (error) {
+ fileErrors.push(error);
+ continue;
+ }
+
+ // Converts TSV/CSV file into a JSON object format
+ const extractedData = await parseFileToRecords(file, schema);
+ entityData[entityName] = extractedData;
+ } catch (error) {
+ logger.error(LOG_MODULE, `Error processing file`, error);
+ }
+ }
+
+ if (fileErrors.length > 0) {
+ logger.info(LOG_MODULE, 'Submission could not be processed because some files contain errors', fileErrors);
+ return res.status(200).send({
+ status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
+ batchErrors: fileErrors,
+ inProcessEntities: [],
+ });
+ }
+ // Send submission data, organized by entity.
const resultSubmission = await service.submit({
- records: payload,
- entityName,
+ data: entityData,
categoryId,
organization,
username,
});
- // This response provides the details of data Submission
- return res.status(200).send(resultSubmission);
+ // This response provides the details of file Submission
+ return res.status(200).send({
+ submissionId: resultSubmission.submissionId,
+ status: resultSubmission.status,
+ inProcessEntities: Object.keys(entityData),
+ });
} catch (error) {
next(error);
}
diff --git a/packages/data-provider/src/core/provider.ts b/packages/data-provider/src/core/provider.ts
index eea65e23..9189c747 100644
--- a/packages/data-provider/src/core/provider.ts
+++ b/packages/data-provider/src/core/provider.ts
@@ -52,7 +52,11 @@ const provider = (configData: AppConfig) => {
audit: auditRouter({ baseDependencies: baseDeps, authConfig: configData.auth }),
category: categoryRouter({ baseDependencies: baseDeps, authConfig: configData.auth }),
dictionary: dictionaryRouter({ baseDependencies: baseDeps, authConfig: configData.auth }),
- submission: submissionRouter({ baseDependencies: baseDeps, authConfig: configData.auth }),
+ submission: submissionRouter({
+ baseDependencies: baseDeps,
+ authConfig: configData.auth,
+ filesConfig: configData.files,
+ }),
submittedData: submittedDataRouter({ baseDependencies: baseDeps, authConfig: configData.auth }),
},
controllers: {
diff --git a/packages/data-provider/src/middleware/fileFilter.ts b/packages/data-provider/src/middleware/fileFilter.ts
new file mode 100644
index 00000000..0edadb17
--- /dev/null
+++ b/packages/data-provider/src/middleware/fileFilter.ts
@@ -0,0 +1,37 @@
+import { Request } from 'express';
+import type { FileFilterCallback } from 'multer';
+
+import { BadRequest } from '../utils/errors.js';
+import { getValidFileExtension, SUPPORTED_FILE_EXTENSIONS } from '../utils/files.js';
+
+/**
+ * Middleware function for filtering uploaded files in a Multer-based file upload.
+ *
+ * Validates that the request contains files and that each file has a valid extension.
+ * If the validation fails, it invokes the callback with a `BadRequest` error.
+ * Otherwise, it allows the file to be processed.
+ *
+ * @param req - The Express request object, expected to contain uploaded files.
+ * @param file - The file object provided by Multer for the current file being processed.
+ * @param cb - The callback function to signal acceptance or rejection of the file.
+ *
+ * @throws {BadRequest} If the "files" parameter is missing or empty, or if the file extension is invalid.
+ */
+export const fileFilter = (req: Request, file: Express.Multer.File, cb: FileFilterCallback) => {
+ const files = Array.isArray(req.files) ? req.files : [];
+ if (!files || files.length === 0) {
+ return cb(
+ new BadRequest('The "files" parameter is missing or empty. Please include files in the request for processing.'),
+ );
+ }
+
+ if (!getValidFileExtension(file.originalname)) {
+ return cb(
+ new BadRequest(
+ `File '${file.originalname}' has invalid file extension. File extension must be '${SUPPORTED_FILE_EXTENSIONS.options}'.`,
+ ),
+ );
+ }
+
+ cb(null, true);
+};
diff --git a/packages/data-provider/src/repository/categoryRepository.ts b/packages/data-provider/src/repository/categoryRepository.ts
index 85b9d492..9bb58a20 100644
--- a/packages/data-provider/src/repository/categoryRepository.ts
+++ b/packages/data-provider/src/repository/categoryRepository.ts
@@ -1,11 +1,11 @@
import { eq } from 'drizzle-orm/sql';
-import { ListAllCategoriesResponse } from 'src/utils/types.js';
import { Dictionary as SchemasDictionary } from '@overture-stack/lectern-client';
import { Category, Dictionary, dictionaryCategories, NewCategory } from '@overture-stack/lyric-data-model/models';
import { BaseDependencies } from '../config/config.js';
import { ServiceUnavailable } from '../utils/errors.js';
+import { ListAllCategoriesResponse } from '../utils/types.js';
const repository = (dependencies: BaseDependencies) => {
const LOG_MODULE = 'CATEGORY_REPOSITORY';
diff --git a/packages/data-provider/src/routers/submissionRouter.ts b/packages/data-provider/src/routers/submissionRouter.ts
index 7878550e..8a05c0a6 100644
--- a/packages/data-provider/src/routers/submissionRouter.ts
+++ b/packages/data-provider/src/routers/submissionRouter.ts
@@ -1,16 +1,27 @@
import { json, Router, urlencoded } from 'express';
+import multer from 'multer';
-import { BaseDependencies } from '../config/config.js';
+import { BaseDependencies, FilesConfig } from '../config/config.js';
import submissionController from '../controllers/submissionController.js';
import { type AuthConfig, authMiddleware } from '../middleware/auth.js';
+import { fileFilter } from '../middleware/fileFilter.js';
+import { getSizeInBytes } from '../utils/files.js';
const router = ({
baseDependencies,
authConfig,
+ filesConfig,
}: {
baseDependencies: BaseDependencies;
authConfig: AuthConfig;
+ filesConfig: FilesConfig;
}): Router => {
+ const fileSizeLimit = getSizeInBytes(filesConfig.limitSize);
+ const upload = multer({
+ dest: '/tmp',
+ limits: { fileSize: fileSizeLimit },
+ fileFilter,
+ });
const router = Router();
router.use(urlencoded({ extended: false }));
router.use(json());
@@ -59,6 +70,7 @@ const router = ({
router.post(
'/category/:categoryId/data',
+ upload.array('files'),
submissionController({
baseDependencies,
authConfig,
@@ -75,6 +87,7 @@ const router = ({
router.put(
`/category/:categoryId/data`,
+ upload.array('files'),
submissionController({
baseDependencies,
authConfig,
diff --git a/packages/data-provider/src/services/submission/processor.ts b/packages/data-provider/src/services/submission/processor.ts
index e7e5464b..b92ffff1 100644
--- a/packages/data-provider/src/services/submission/processor.ts
+++ b/packages/data-provider/src/services/submission/processor.ts
@@ -1,6 +1,6 @@
import * as _ from 'lodash-es';
-import { type DataRecord, DictionaryValidationRecordErrorDetails, type Schema } from '@overture-stack/lectern-client';
+import { type DataRecord, DictionaryValidationRecordErrorDetails } from '@overture-stack/lectern-client';
import {
Submission,
SubmissionData,
@@ -13,11 +13,12 @@ import {
import { BaseDependencies } from '../../config/config.js';
import submissionRepository from '../../repository/activeSubmissionRepository.js';
import categoryRepository from '../../repository/categoryRepository.js';
-import dictionaryRepository from '../../repository/dictionaryRepository.js';
import submittedRepository from '../../repository/submittedRepository.js';
import { getDictionarySchemaRelations, type SchemaChildNode } from '../../utils/dictionarySchemaRelations.js';
+import { validateSchemas } from '../../utils/dictionaryUtils.js';
import { BadRequest } from '../../utils/errors.js';
-import { convertRecordToString } from '../../utils/formatUtils.js';
+import { mergeDeleteRecords, mergeInsertsRecords, mergeUpdatesBySystemId } from '../../utils/mergeRecords.js';
+import { parseRecordsToEdit, parseRecordsToInsert } from '../../utils/recordsParser.js';
import {
extractSchemaDataFromMergedDataRecords,
filterDeletesFromUpdates,
@@ -26,12 +27,7 @@ import {
groupSchemaErrorsByEntity,
mapGroupedUpdateSubmissionData,
mergeAndReferenceEntityData,
- mergeDeleteRecords,
- mergeInsertsRecords,
- mergeUpdatesBySystemId,
- parseToSchema,
segregateFieldChangeRecords,
- validateSchemas,
} from '../../utils/submissionUtils.js';
import {
computeDataDiff,
@@ -44,9 +40,10 @@ import {
} from '../../utils/submittedDataUtils.js';
import {
CommitSubmissionParams,
+ type EntityData,
+ type SchemasDictionary,
SUBMISSION_STATUS,
type SubmittedDataResponse,
- type ValidateFilesParams,
} from '../../utils/types.js';
import searchDataRelations from '../submittedData/searchDataRelations.js';
@@ -431,14 +428,9 @@ const processor = (dependencies: BaseDependencies) => {
logger.info(LOG_MODULE, `Errors detected in data submission:${errorMessage}`);
}
- // Update Active Submission
- return await updateActiveSubmission({
+ // Update validation results for the active submission
+ return await updateValidationResultForSubmission({
idActiveSubmission: originalSubmission.id,
- submissionData: {
- inserts: submissionData.inserts,
- deletes: submissionData.deletes,
- updates: submissionData.updates,
- },
schemaErrors: submissionSchemaErrors,
dictionaryId: currentDictionary.id,
username,
@@ -448,44 +440,55 @@ const processor = (dependencies: BaseDependencies) => {
/**
* Void function to process and validate uploaded records on an Active Submission.
* Performs the schema data validation of data to be edited combined with all Submitted Data.
- * @param records Records to be processed
+ * @param records A map of entity names to arrays of raw records to be processed.
* @param params
- * @param params.schema Schema to parse data with
- * @param params.submission A `Submission` object representing the Active Submission
+ * @param params.schemasDictionary A dictionary of schema definitions used for record validation.
+ * @param params.submissionId Submission ID
* @param params.username User who performs the action
*/
const processEditRecordsAsync = async (
- records: Record[],
+ records: EntityData,
{
- schema,
- submission,
+ schemasDictionary,
+ submissionId,
username,
}: {
- schema: Schema;
- submission: Submission;
+ schemasDictionary: SchemasDictionary;
+ submissionId: number;
username: string;
},
): Promise => {
- const { getDictionaryById } = dictionaryRepository(dependencies);
+ const { getSubmissionById, update } = submissionRepository(dependencies);
try {
- // Parse file data
- const recordsParsed = records.map(convertRecordToString).map(parseToSchema(schema));
+ const mapRecordsParsed = parseRecordsToEdit(records, schemasDictionary);
- const filesDataProcessed = await compareUpdatedData(recordsParsed);
-
- const currentDictionary = await getDictionaryById(submission.dictionaryId);
- if (!currentDictionary) {
- throw new BadRequest(`Dictionary in category '${submission.dictionaryCategoryId}' not found`);
+ if (Object.keys(mapRecordsParsed).length === 0) {
+ throw new Error('No entities to edit on this submission');
}
+ const mapDataProcessed = Object.fromEntries(
+ await Promise.all(
+ Object.entries(mapRecordsParsed).map(async ([schemaName, dataRecords]) => {
+ const filesDataProcessed = await compareUpdatedData(dataRecords);
+ return [schemaName, filesDataProcessed];
+ }),
+ ),
+ );
+
// get dictionary relations
- const dictionaryRelations = getDictionarySchemaRelations(currentDictionary.dictionary);
+ const dictionaryRelations = getDictionarySchemaRelations(schemasDictionary.schemas);
+
+ // Get Active Submission from database
+ const activeSubmission = await getSubmissionById(submissionId);
+ if (!activeSubmission) {
+ throw new Error(`Submission '${submissionId}' not found`);
+ }
const foundDependentUpdates = await findUpdateDependents({
dictionaryRelations,
- organization: submission.organization,
- submissionUpdateData: { [schema.name]: filesDataProcessed },
+ organization: activeSubmission.organization,
+ submissionUpdateData: mapDataProcessed,
});
const systemIdsWithDependents: string[] = [];
@@ -511,14 +514,14 @@ const processor = (dependencies: BaseDependencies) => {
// Identify what requested updates involves ID and nonID field changes
const { idFieldChangeRecord, nonIdFieldChangeRecord } = segregateFieldChangeRecords(
- { [schema.name]: filesDataProcessed },
+ mapDataProcessed,
dictionaryRelations,
);
// Aggegates all Update changes on Submission
// Note: We do not include records involving primary ID fields changes in here. We would rather do a DELETE and an INSERT
const updatedActiveSubmissionData: Record = mergeUpdatesBySystemId(
- submission.data.updates ?? {},
+ activeSubmission.data.updates ?? {},
totalDependants,
nonIdFieldChangeRecord,
);
@@ -527,30 +530,34 @@ const processor = (dependencies: BaseDependencies) => {
const additions = await handleIdFieldChanges(idFieldChangeRecord);
// Merge Active Submission Inserts with Edit generated new Inserts
- const mergedInserts = mergeInsertsRecords(submission.data.inserts ?? {}, additions.inserts);
+ const mergedInserts = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, additions.inserts);
// Merge Active Submission Deletes with Edit generated new Deletes
- const mergedDeletes = mergeDeleteRecords(submission.data.deletes ?? {}, additions.deletes);
+ const mergedDeletes = mergeDeleteRecords(activeSubmission.data.deletes ?? {}, additions.deletes);
// filter out delete records found on update records
const filteredDeletes = filterDeletesFromUpdates(mergedDeletes, updatedActiveSubmissionData);
+ // Result merge Active Submission data with incoming TSV file data processed
+ const mergedSubmissionData: SubmissionData = {
+ inserts: mergedInserts,
+ deletes: filteredDeletes,
+ updates: updatedActiveSubmissionData,
+ };
+
+ await update(activeSubmission.id, {
+ data: mergedSubmissionData,
+ updatedBy: username,
+ });
+
// Perform Schema Data validation Async.
performDataValidation({
- originalSubmission: submission,
- submissionData: {
- inserts: mergedInserts,
- deletes: filteredDeletes,
- updates: updatedActiveSubmissionData,
- },
+ originalSubmission: activeSubmission,
+ submissionData: mergedSubmissionData,
username,
});
} catch (error) {
- logger.error(
- LOG_MODULE,
- `There was an error processing records on entity '${schema.name}'`,
- JSON.stringify(error),
- );
+ logger.error(LOG_MODULE, `There was an error processing records on this submission`, JSON.stringify(error));
}
logger.info(LOG_MODULE, `Finished validating files`);
};
@@ -594,29 +601,27 @@ const processor = (dependencies: BaseDependencies) => {
};
/**
- * Update Active Submission in database
+ * Store validation results for the active submission in the database.
+ * IMPORTANT: Submission data is not updated
* @param {Object} input
* @param {number} input.dictionaryId The Dictionary ID of the Submission
- * @param {SubmissionData} input.submissionData Data to be submitted grouped on inserts, updates and deletes
* @param {number} input.idActiveSubmission ID of the Active Submission
* @param {Record>} input.schemaErrors Array of schemaErrors
* @param {string} input.username User updating the active submission
* @returns {Promise} An Active Submission updated
*/
- const updateActiveSubmission = async (input: {
+ const updateValidationResultForSubmission = async (input: {
dictionaryId: number;
- submissionData: SubmissionData;
idActiveSubmission: number;
schemaErrors: Record>;
username: string;
}): Promise => {
- const { dictionaryId, submissionData, idActiveSubmission, schemaErrors, username } = input;
+ const { dictionaryId, idActiveSubmission, schemaErrors, username } = input;
const { update } = submissionRepository(dependencies);
const newStatusSubmission =
Object.keys(schemaErrors).length > 0 ? SUBMISSION_STATUS.INVALID : SUBMISSION_STATUS.VALID;
- // Update with new data
+ // Update validation results only — submission data has already been updated.
const updatedActiveSubmission = await update(idActiveSubmission, {
- data: submissionData,
status: newStatusSubmission,
dictionaryId: dictionaryId,
updatedBy: username,
@@ -631,54 +636,63 @@ const processor = (dependencies: BaseDependencies) => {
};
/**
- * Void function to process and validate records on an Active Submission.
- * Performs the schema data validation combined with all Submitted Data.
- * @param {Record} records Records to be processed
- * @param {Object} params
- * @param {number} params.categoryId Category Identifier
- * @param {string} params.organization Organization name
- * @param {Schema} params.schema Schema to validate records with
- * @param {string} params.username User who performs the action
- * @returns {void}
+ * Processes and validates a batch of incoming records for an active submission.
+ * This function updates the submission merging the new records with existing submission data.
+ * Performs a full schema data validation against the combined dataset
+ * @param params
+ * @param params.records A map of entity names to arrays of raw records to be processed.
+ * @param params.schemasDictionary A dictionary of schema definitions used for record validation.
+ * @param params.submissionId Submission ID
+ * @param params.username User who performs the action
+ * @returns
*/
- const validateRecordsAsync = async (records: Record[], params: ValidateFilesParams) => {
- const { getActiveSubmission } = submissionRepository(dependencies);
-
- const { categoryId, organization, username, schema } = params;
+ const processInsertRecordsAsync = async ({
+ records,
+ schemasDictionary,
+ submissionId,
+ username,
+ }: {
+ records: EntityData;
+ schemasDictionary: SchemasDictionary;
+ submissionId: number;
+ username: string;
+ }) => {
+ const { getSubmissionById, update } = submissionRepository(dependencies);
try {
// Get Active Submission from database
- const activeSubmission = await getActiveSubmission({ categoryId, username, organization });
+ const activeSubmission = await getSubmissionById(submissionId);
if (!activeSubmission) {
- throw new BadRequest(`Submission '${activeSubmission}' not found`);
+ throw new Error(`Submission '${activeSubmission}' not found`);
}
- const recordsParsed = records.map(convertRecordToString).map(parseToSchema(schema));
+ const insertRecords = parseRecordsToInsert(records, schemasDictionary);
- const insertRecords: Record = {
- [schema.name]: {
- batchName: schema.name,
- records: recordsParsed,
- },
+ // Merge Active Submission insert records with incoming TSV file data processed
+ const insertActiveSubmissionData = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, insertRecords);
+
+ // Result merged submission Data
+ const mergedSubmissionData: SubmissionData = {
+ inserts: insertActiveSubmissionData,
+ deletes: activeSubmission.data.deletes,
+ updates: activeSubmission.data.updates,
};
- // Merge Active Submission data with incoming TSV file data processed
- const insertActiveSubmissionData = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, insertRecords);
+ await update(activeSubmission.id, {
+ data: mergedSubmissionData,
+ updatedBy: username,
+ });
// Perform Schema Data validation Async.
await performDataValidation({
originalSubmission: activeSubmission,
- submissionData: {
- inserts: insertActiveSubmissionData,
- deletes: activeSubmission.data.deletes,
- updates: activeSubmission.data.updates,
- },
+ submissionData: mergedSubmissionData,
username,
});
} catch (error) {
logger.error(
LOG_MODULE,
- `There was an error processing records on entity '${schema.name}'`,
+ `There was an error processing records on submission '${submissionId}'`,
JSON.stringify(error),
);
}
@@ -689,8 +703,8 @@ const processor = (dependencies: BaseDependencies) => {
processEditRecordsAsync,
performCommitSubmissionAsync,
performDataValidation,
- updateActiveSubmission,
- validateRecordsAsync,
+ updateValidationResultForSubmission,
+ processInsertRecordsAsync,
};
};
diff --git a/packages/data-provider/src/services/submission/submission.ts b/packages/data-provider/src/services/submission/submission.ts
index 71c6e636..8c00bf05 100644
--- a/packages/data-provider/src/services/submission/submission.ts
+++ b/packages/data-provider/src/services/submission/submission.ts
@@ -8,17 +8,18 @@ import systemIdGenerator from '../../external/systemIdGenerator.js';
import submissionRepository from '../../repository/activeSubmissionRepository.js';
import categoryRepository from '../../repository/categoryRepository.js';
import submittedRepository from '../../repository/submittedRepository.js';
+import { getSchemaByName } from '../../utils/dictionaryUtils.js';
import { BadRequest, InternalServerError, StatusConflict } from '../../utils/errors.js';
import {
- canTransitionToClosed,
- parseSubmissionResponse,
+ parseSubmissionDetailsResponse,
parseSubmissionSummaryResponse,
- removeItemsFromSubmission,
-} from '../../utils/submissionUtils.js';
+} from '../../utils/submissionResponseParser.js';
+import { canTransitionToClosed, removeItemsFromSubmission } from '../../utils/submissionUtils.js';
import {
CommitSubmissionResult,
CREATE_SUBMISSION_STATUS,
type CreateSubmissionResult,
+ type EntityData,
type PaginationOptions,
SUBMISSION_ACTION_TYPE,
SUBMISSION_STATUS,
@@ -187,7 +188,7 @@ const service = (dependencies: BaseDependencies) => {
index: number | null;
},
): Promise => {
- const { getSubmissionById } = submissionRepository(dependencies);
+ const { getSubmissionById, update } = submissionRepository(dependencies);
const submission = await getSubmissionById(submissionId);
if (!submission) {
@@ -220,6 +221,11 @@ const service = (dependencies: BaseDependencies) => {
...filter,
});
+ await update(submission.id, {
+ data: updatedActiveSubmissionData,
+ updatedBy: username,
+ });
+
const updatedRecord = await performDataValidation({
originalSubmission: submission,
submissionData: updatedActiveSubmissionData,
@@ -227,7 +233,6 @@ const service = (dependencies: BaseDependencies) => {
});
logger.info(LOG_MODULE, `Submission '${updatedRecord.id}' updated with new status '${updatedRecord.status}'`);
-
return updatedRecord;
};
@@ -289,7 +294,7 @@ const service = (dependencies: BaseDependencies) => {
return;
}
- return parseSubmissionResponse(submission);
+ return parseSubmissionDetailsResponse(submission);
};
/**
@@ -363,37 +368,35 @@ const service = (dependencies: BaseDependencies) => {
/**
* Validates and Creates the Entities Schemas of the Active Submission and stores it in the database
* @param {object} params
- * @param {Record[]} params.records An array of records
- * @param {string} params.entityName Entity Name of the Records
+ * @param {EntityData} params.data Data to be processed
* @param {number} params.categoryId Category ID of the Submission
* @param {string} params.organization Organization name
* @param {string} params.username User name creating the Submission
* @returns The Active Submission created or Updated
*/
const submit = async ({
- records,
- entityName,
+ data,
categoryId,
organization,
username,
}: {
- records: Record[];
- entityName: string;
+ data: EntityData;
categoryId: number;
organization: string;
username: string;
}): Promise => {
+ const entityNames = Object.keys(data);
logger.info(
LOG_MODULE,
- `Processing '${records.length}' records on category id '${categoryId}' organization '${organization}'`,
+ `Processing '${entityNames.length}' entities on category id '${categoryId}' organization '${organization}'`,
);
const { getActiveDictionaryByCategory } = categoryRepository(dependencies);
- const { validateRecordsAsync } = processor(dependencies);
+ const { processInsertRecordsAsync } = processor(dependencies);
- if (records.length === 0) {
+ if (entityNames.length === 0) {
return {
status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
- description: 'No valid records for submission',
+ description: 'No valid data for submission',
};
}
@@ -413,23 +416,23 @@ const service = (dependencies: BaseDependencies) => {
};
// Validate entity name
- const entitySchema = schemasDictionary.schemas.find((item) => item.name === entityName);
- if (!entitySchema) {
+ const invalidEntities = entityNames.filter((name) => !getSchemaByName(name, schemasDictionary));
+ if (invalidEntities.length) {
return {
status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
- description: `Invalid entity name ${entityName} for submission`,
+ description: `Invalid entity name '${invalidEntities}' for submission`,
};
}
// Get Active Submission or Open a new one
const activeSubmission = await getOrCreateActiveSubmission({ categoryId, username, organization });
- // Running Schema validation in the background do not need to wait
- // Result of validations will be stored in database
- validateRecordsAsync(records, {
- categoryId,
- organization,
- schema: entitySchema,
+ // Schema validation runs asynchronously and does not block execution.
+ // The results will be saved to the database.
+ processInsertRecordsAsync({
+ records: data,
+ submissionId: activeSubmission.id,
+ schemasDictionary,
username,
});
diff --git a/packages/data-provider/src/services/submittedData/submmittedData.ts b/packages/data-provider/src/services/submittedData/submmittedData.ts
index f0981634..45eade31 100644
--- a/packages/data-provider/src/services/submittedData/submmittedData.ts
+++ b/packages/data-provider/src/services/submittedData/submmittedData.ts
@@ -1,14 +1,18 @@
import * as _ from 'lodash-es';
import type { Dictionary as SchemasDictionary } from '@overture-stack/lectern-client';
+import type { SubmissionData } from '@overture-stack/lyric-data-model/models';
import { SQON } from '@overture-stack/sqon-builder';
import { BaseDependencies } from '../../config/config.js';
+import submissionRepository from '../../repository/activeSubmissionRepository.js';
import categoryRepository from '../../repository/categoryRepository.js';
import submittedRepository from '../../repository/submittedRepository.js';
import { convertSqonToQuery } from '../../utils/convertSqonToQuery.js';
import { getDictionarySchemaRelations } from '../../utils/dictionarySchemaRelations.js';
-import { filterUpdatesFromDeletes, mergeDeleteRecords } from '../../utils/submissionUtils.js';
+import { getSchemaByName } from '../../utils/dictionaryUtils.js';
+import { mergeDeleteRecords } from '../../utils/mergeRecords.js';
+import { filterUpdatesFromDeletes } from '../../utils/submissionUtils.js';
import {
fetchDataErrorResponse,
getEntityNamesFromFilterOptions,
@@ -17,6 +21,7 @@ import {
import {
CREATE_SUBMISSION_STATUS,
type CreateSubmissionStatus,
+ type EntityData,
PaginationOptions,
SubmittedDataResponse,
VIEW_TYPE,
@@ -52,6 +57,7 @@ const submittedData = (dependencies: BaseDependencies) => {
const { getSubmittedDataBySystemId } = submittedDataRepo;
const { getActiveDictionaryByCategory } = categoryRepository(dependencies);
const { getOrCreateActiveSubmission } = submissionService(dependencies);
+ const { update } = submissionRepository(dependencies);
const { performDataValidation } = processor(dependencies);
// get SubmittedData by SystemId
@@ -116,14 +122,22 @@ const submittedData = (dependencies: BaseDependencies) => {
// filter out update records found matching systemID on delete records
const filteredUpdates = filterUpdatesFromDeletes(activeSubmission.data.updates ?? {}, mergedSubmissionDeletes);
+ // Result merged submissionData
+ const mergedSubmissionData: SubmissionData = {
+ inserts: activeSubmission.data.inserts,
+ updates: filteredUpdates,
+ deletes: mergedSubmissionDeletes,
+ };
+
+ await update(activeSubmission.id, {
+ data: mergedSubmissionData,
+ updatedBy: username,
+ });
+
// Validate and update Active Submission
performDataValidation({
originalSubmission: activeSubmission,
- submissionData: {
- inserts: activeSubmission.data.inserts,
- updates: filteredUpdates,
- deletes: mergedSubmissionDeletes,
- },
+ submissionData: mergedSubmissionData,
username,
});
@@ -139,33 +153,32 @@ const submittedData = (dependencies: BaseDependencies) => {
const editSubmittedData = async ({
categoryId,
- entityName,
+ data,
organization,
- records,
username,
}: {
categoryId: number;
- entityName: string;
+ data: EntityData;
organization: string;
- records: Record[];
username: string;
}): Promise<{
description?: string;
submissionId?: number;
status: string;
}> => {
+ const entityNames = Object.keys(data);
logger.info(
LOG_MODULE,
- `Processing '${records.length}' records on category id '${categoryId}' organization '${organization}'`,
+ `Processing '${entityNames.length}' entities on category id '${categoryId}' organization '${organization}'`,
);
const { getActiveDictionaryByCategory } = categoryRepository(dependencies);
const { getOrCreateActiveSubmission } = submissionService(dependencies);
const { processEditRecordsAsync } = processor(dependencies);
- if (records.length === 0) {
+ if (entityNames.length === 0) {
return {
status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
- description: 'No valid records for submission',
+ description: 'No valid data for submission',
};
}
@@ -185,11 +198,11 @@ const submittedData = (dependencies: BaseDependencies) => {
};
// Validate entity name
- const entitySchema = schemasDictionary.schemas.find((item) => item.name === entityName);
- if (!entitySchema) {
+ const invalidEntities = entityNames.filter((name) => !getSchemaByName(name, schemasDictionary));
+ if (invalidEntities.length) {
return {
status: CREATE_SUBMISSION_STATUS.INVALID_SUBMISSION,
- description: `Invalid entity name ${entityName} for submission`,
+ description: `Invalid entity name '${invalidEntities}' for submission`,
};
}
@@ -198,9 +211,9 @@ const submittedData = (dependencies: BaseDependencies) => {
// Running Schema validation in the background do not need to wait
// Result of validations will be stored in database
- processEditRecordsAsync(records, {
- submission: activeSubmission,
- schema: entitySchema,
+ processEditRecordsAsync(data, {
+ submissionId: activeSubmission.id,
+ schemasDictionary,
username,
});
diff --git a/packages/data-provider/src/services/submittedData/viewMode.ts b/packages/data-provider/src/services/submittedData/viewMode.ts
index 4cf34a5c..2c26cff9 100644
--- a/packages/data-provider/src/services/submittedData/viewMode.ts
+++ b/packages/data-provider/src/services/submittedData/viewMode.ts
@@ -3,8 +3,8 @@ import type { Schema } from '@overture-stack/lectern-client';
import type { BaseDependencies } from '../../config/config.js';
import submittedRepository from '../../repository/submittedRepository.js';
import { generateHierarchy, type TreeNode } from '../../utils/dictionarySchemaRelations.js';
+import { pluralizeSchemaName } from '../../utils/dictionaryUtils.js';
import { InternalServerError } from '../../utils/errors.js';
-import { pluralizeSchemaName } from '../../utils/submissionUtils.js';
import { groupByEntityName } from '../../utils/submittedDataUtils.js';
import { type DataRecordNested, ORDER_TYPE, type SubmittedDataResponse } from '../../utils/types.js';
diff --git a/packages/data-provider/src/utils/dictionaryUtils.ts b/packages/data-provider/src/utils/dictionaryUtils.ts
index e1005050..4ac02a15 100644
--- a/packages/data-provider/src/utils/dictionaryUtils.ts
+++ b/packages/data-provider/src/utils/dictionaryUtils.ts
@@ -1,7 +1,24 @@
-import { Dictionary as SchemasDictionary, type Schema } from '@overture-stack/lectern-client';
+import plur from 'plur';
+
+import {
+ type DataRecord,
+ Dictionary as SchemasDictionary,
+ type Schema,
+ validate,
+} from '@overture-stack/lectern-client';
import type { FieldNamesByPriorityMap } from './types.js';
+/**
+ * Retrieves a schema definition by its name from the provided schemas dictionary.
+ * @param schemaName The name of the schema to look up.
+ * @param schemasDictionary The dictionary containing all available schemas.
+ * @returns The matching schema if found, otherwise `undefined`
+ */
+export const getSchemaByName = (schemaName: string, schemasDictionary: SchemasDictionary): Schema | undefined => {
+ return schemasDictionary.schemas.find((schema) => schema.name === schemaName);
+};
+
/**
* Get Fields from Schema
* @param {Schema} schema Schema object
@@ -25,4 +42,31 @@ export const getSchemaFieldNames = (schema: Schema): FieldNamesByPriorityMap =>
);
};
+export const pluralizeSchemaName = (schemaName: string) => {
+ return plur(schemaName);
+};
+
+/**
+ * Validate a full set of Schema Data using a Dictionary
+ * @param {SchemasDictionary & {id: number }} dictionary
+ * @param {Record} schemasData
+ * @returns A TestResult object representing the outcome of a test applied to some data.
+ * If a test is valid, no additional data is added to the result. If it is invalid, then the
+ * reason (or array of reasons) for why the test failed should be given.
+ */
+export const validateSchemas = (
+ dictionary: SchemasDictionary & {
+ id: number;
+ },
+ schemasData: Record,
+) => {
+ const schemasDictionary: SchemasDictionary = {
+ name: dictionary.name,
+ version: dictionary.version,
+ schemas: dictionary.schemas,
+ };
+
+ return validate.validateDictionary(schemasData, schemasDictionary);
+};
+
export { SchemasDictionary };
diff --git a/packages/data-provider/src/utils/files.ts b/packages/data-provider/src/utils/files.ts
new file mode 100644
index 00000000..ef4eff0a
--- /dev/null
+++ b/packages/data-provider/src/utils/files.ts
@@ -0,0 +1,244 @@
+import bytes, { type Unit } from 'bytes';
+import { parse as csvParse } from 'csv-parse';
+import firstline from 'firstline';
+import fs from 'fs';
+import z from 'zod';
+
+import { type Schema, type UnprocessedDataRecord } from '@overture-stack/lectern-client';
+
+import { BATCH_ERROR_TYPE, type BatchError } from './types.js';
+
+export const SUPPORTED_FILE_EXTENSIONS = z.enum(['tsv', 'csv']);
+export type SupportedFileExtensions = z.infer;
+
+const systemIdColumn = 'systemId' as const;
+
+export const columnSeparatorValue = {
+ tsv: '\t',
+ csv: ',',
+} as const satisfies Record;
+
+export const extractEntityNameFromFileName = (filename: string) => {
+ return filename.split('.')[0]?.toLowerCase();
+};
+
+/**
+ * Formats a file size from bytes to a specified unit with a defined precision.
+ *
+ * @param sizeInBytes - The file size in bytes to be formatted.
+ * @param unit - The unit to which the size should be converted (e.g., 'MB', 'GB').
+ * @param precision - The number of decimal places to include in the formatted output.
+ * @returns The file size formatted as a string in the specified unit with the given precision.
+ *
+ */
+export const formatByteSize = (sizeInBytes: number, unit: Unit, precision: number) => {
+ return bytes.format(sizeInBytes, { unit, decimalPlaces: precision });
+};
+
+/**
+ * tsv exported from excel might add double quotations to indicate string and escape double quotes
+ * this function removes those extra double quatations from a given string
+ * @param data
+ * @returns
+ */
+export const formatForExcelCompatibility = (data: string) => {
+ return data
+ .trim()
+ .replace(/^"/, '') // excel might add a beginning double quotes to indicate string
+ .replace(/"$/, '') // excel might add a trailing double quote to indicate string
+ .replace(/""/g, '"') // excel might've used a second double quote to escape a double quote in a string
+ .trim();
+};
+
+/**
+ * Extracts the file extension from a given file name.
+ * @param {string} fileName
+ * @returns {string | undefined}
+ */
+export const getFileExtension = (fileName: string): string | undefined => {
+ return fileName.split('.').pop()?.toLowerCase();
+};
+
+export const getSizeInBytes = (size: string | number): number => {
+ // Parse the string value into an integer in bytes.
+ // If the value is a number, it is assumed to be in bytes.
+ return bytes.parse(size) || 0;
+};
+
+/**
+ * Determines the separator character for a given file based on its extension.
+ * @param fileName The name of the file whose extension determines the separator character.
+ * @returns The separator character associated with the file extension, or `undefined` if
+ * the file extension is invalid or unrecognized.
+ */
+export const getSeparatorCharacter = (fileName: string) => {
+ const fileExtension = getValidFileExtension(fileName);
+ if (fileExtension) {
+ return columnSeparatorValue[fileExtension];
+ }
+ return;
+};
+
+/**
+ * Extracts and validates the file extension from the filename.
+ * @param {string} fileName
+ * @returns {SupportedFileExtensions | undefined}
+ */
+export const getValidFileExtension = (fileName: string): SupportedFileExtensions | undefined => {
+ const extension = getFileExtension(fileName);
+ return extension ? validateFileExtension(extension) : undefined;
+};
+
+/**
+ * Maps a record array to an object with keys from headers, formatting each value for compatibility.
+ * @param headers An array of header names, used as keys for the returned object.
+ * @param record An array of values corresponding to each header, to be formatted and mapped.
+ * @returns An `UnprocessedDataRecord` object where each header in `headers` is a key,
+ * and each value is the corresponding entry in `record` formatted for compatibility.
+ */
+export const mapRecordToHeaders = (headers: string[], record: string[]) => {
+ return headers.reduce((obj: UnprocessedDataRecord, nextKey, index) => {
+ const dataStr = record[index] || '';
+ const formattedData = formatForExcelCompatibility(dataStr);
+ obj[nextKey] = formattedData;
+ return obj;
+ }, {});
+};
+
+/**
+ * Read a file and parse field names based on schema definition
+ * Supported files: .tsv or .csv
+ * @param {Express.Multer.File} file A file to read
+ * @param {Schema} schema Schema to parse field names
+ * @returns an array of records where each record is a key-value pair object representing
+ * a row in the file.
+ */
+export const parseFileToRecords = async (
+ file: Express.Multer.File,
+ schema: Schema,
+): Promise[]> => {
+ const returnRecords: Record[] = [];
+ const separatorCharacter = getSeparatorCharacter(file.originalname);
+ if (!separatorCharacter) {
+ throw new Error('Invalid file Extension');
+ }
+
+ let headers: string[] = [];
+
+ const schemaDisplayNames = schema.fields.reduce>((acc, field) => {
+ acc[field.meta?.displayName?.toString() || field.name] = field.name;
+ return acc;
+ }, {});
+
+ return new Promise((resolve) => {
+ const stream = fs.createReadStream(file.path).pipe(csvParse({ delimiter: separatorCharacter }));
+
+ stream.on('data', (record: string[]) => {
+ if (!headers.length) {
+ headers = record
+ .map((value) => schemaDisplayNames[value] ?? value)
+ .filter((value) => value)
+ .map((str) => str.trim());
+ } else {
+ const mappedRecord = mapRecordToHeaders(headers, record);
+
+ returnRecords.push(mappedRecord);
+ }
+ });
+
+ stream.on('end', () => {
+ resolve(returnRecords);
+ });
+
+ stream.on('close', () => {
+ stream.destroy();
+ fs.unlink(file.path, () => {});
+ });
+ });
+};
+
+/**
+ * Pre-validates a data file before submission.
+ *
+ * This function performs a series of checks on the provided file to ensure it meets the necessary criteria before it can be submitted for data processing.
+ * The following checks are performed:
+ * - Verifies that the file has a supported extension and format.
+ * - Verifies that the file contains the required column names as per the provided schema.
+ * - Verifies that, if the file is intended for editing data, it contains the systemId column
+ *
+ * If any of these checks fail, an error is returned
+ * @param file The file to be validated
+ * @param schema The schema against which the file will be validated
+ * @returns
+ */
+export const prevalidateDataFile = async (
+ file: Express.Multer.File,
+ schema: Schema,
+ isEditFile: boolean = false,
+): Promise<{ error?: BatchError }> => {
+ // check if extension is supported
+ const separatorCharacter = getSeparatorCharacter(file.originalname);
+ if (!separatorCharacter) {
+ const message = `Invalid file extension ${file.originalname.split('.')[1]}`;
+ return {
+ error: {
+ type: BATCH_ERROR_TYPE.INVALID_FILE_EXTENSION,
+ message,
+ batchName: file.originalname,
+ },
+ };
+ }
+
+ const firstLine = await readHeaders(file);
+ const fileHeaders = firstLine.split(separatorCharacter).map((str) => str.trim());
+
+ if (isEditFile && !fileHeaders.includes(systemIdColumn)) {
+ const message = `File is missing the column '${systemIdColumn}'`;
+ return {
+ error: {
+ type: BATCH_ERROR_TYPE.MISSING_REQUIRED_HEADER,
+ message,
+ batchName: file.originalname,
+ },
+ };
+ }
+
+ const missingRequiredFields = schema.fields
+ .filter((field) => field.restrictions && 'required' in field.restrictions) // filter required fields
+ .map((field) => field.meta?.displayName?.toString() || field.name) // map displayName if exists
+ .filter((fieldName) => !fileHeaders.includes(fieldName));
+ if (missingRequiredFields.length > 0) {
+ const message = `Missing required fields '${JSON.stringify(missingRequiredFields)}'`;
+ return {
+ error: {
+ type: BATCH_ERROR_TYPE.MISSING_REQUIRED_HEADER,
+ message,
+ batchName: file.originalname,
+ },
+ };
+ }
+ return {};
+};
+
+/**
+ * Reads only first line of the file
+ * Useful when the file is too large and we're only interested in column names
+ * @param file A file we want to read
+ * @returns a string with the content of the first line of the file
+ */
+export const readHeaders = async (file: Express.Multer.File) => {
+ return firstline(file.path);
+};
+
+/**
+ * Validates if the file extension is supported.
+ * @param {string} extension
+ * @returns {SupportedFileExtensions | undefined}
+ */
+export const validateFileExtension = (extension: string): SupportedFileExtensions | undefined => {
+ try {
+ return SUPPORTED_FILE_EXTENSIONS.parse(extension);
+ } catch {
+ return;
+ }
+};
diff --git a/packages/data-provider/src/utils/formatUtils.ts b/packages/data-provider/src/utils/formatUtils.ts
index c6d20eb8..a9d0bc1c 100644
--- a/packages/data-provider/src/utils/formatUtils.ts
+++ b/packages/data-provider/src/utils/formatUtils.ts
@@ -115,6 +115,10 @@ export const deepCompare = (obj1: unknown, obj2: unknown): boolean => {
return true;
};
+export const isNotNull = (value: T): value is Exclude => {
+ return value !== null;
+};
+
// Helper function to check if an object is a plain object
function isObject(obj: unknown): obj is Record {
return typeof obj === 'object' && obj !== null && !Array.isArray(obj);
diff --git a/packages/data-provider/src/utils/mergeRecords.ts b/packages/data-provider/src/utils/mergeRecords.ts
new file mode 100644
index 00000000..afedd4f9
--- /dev/null
+++ b/packages/data-provider/src/utils/mergeRecords.ts
@@ -0,0 +1,127 @@
+import type { DataRecord } from '@overture-stack/lectern-client';
+import type {
+ SubmissionDeleteData,
+ SubmissionInsertData,
+ SubmissionUpdateData,
+} from '@overture-stack/lyric-data-model/models';
+
+import { deepCompare } from './formatUtils.js';
+
+/**
+ * Merges multiple `Record` objects into a single object.
+ * If there are duplicate keys between the objects, the `records` arrays of `SubmissionInsertData`
+ * are concatenated for the matching keys, ensuring no duplicates.
+ *
+ * @param objects An array of objects where each object is a `Record`.
+ * Each key represents the entityName, and the value is an object of type `SubmissionInsertData`.
+ *
+ * @returns A new `Record` where:
+ * - If a key is unique across all objects, its value is directly included.
+ * - If a key appears in multiple objects, the `records` arrays are concatenated for that key, avoiding duplicates.
+ */
+export const mergeInsertsRecords = (
+ ...objects: Record[]
+): Record => {
+ const result: Record = {};
+
+ let seen: DataRecord[] = [];
+ // Iterate over all objects
+ objects.forEach((obj) => {
+ // Iterate over each key in the current object
+ Object.entries(obj).forEach(([key, value]) => {
+ if (result[key]) {
+ // The key already exists in the result, concatenate the `records` arrays, avoiding duplicates
+ let uniqueData: DataRecord[] = [];
+
+ result[key].records.concat(value.records).forEach((item) => {
+ if (!seen.some((existingItem) => deepCompare(existingItem, item))) {
+ uniqueData = uniqueData.concat(item);
+ seen = seen.concat(item);
+ }
+ });
+
+ result[key].records = uniqueData;
+ return;
+ } else {
+ // The key doesn't exist in the result; add it as-is
+ result[key] = value;
+ return;
+ }
+ });
+ });
+
+ return result;
+};
+
+/**
+ * Merges multiple `Record` objects into a single object.
+ * For each key, the `SubmissionDeleteData[]` arrays are concatenated, ensuring no duplicate
+ * `SubmissionDeleteData` objects based on the `systemId` field.
+ *
+ * @param objects Multiple `Record` objects to be merged.
+ * Each key represents an identifier, and the value is an array of `SubmissionDeleteData`.
+ *
+ * @returns
+ */
+export const mergeDeleteRecords = (
+ ...objects: Record[]
+): Record => {
+ const result: Record = {};
+
+ // Iterate over all objects
+ objects.forEach((obj) => {
+ // Iterate over each key in the current object
+ Object.entries(obj).forEach(([key, value]) => {
+ if (!result[key]) {
+ result[key] = [];
+ }
+ const uniqueRecords = new Map();
+
+ // Add existing records to the map
+ result[key].forEach((record) => uniqueRecords.set(record.systemId, record));
+
+ // Add new records, overriding duplicates based on systemId
+ value.forEach((record) => uniqueRecords.set(record.systemId, record));
+
+ // Convert the map back to an array
+ result[key] = Array.from(uniqueRecords.values());
+ });
+ });
+
+ return result;
+};
+
+/**
+ * Merge Active Submission data with incoming TSV file data processed
+ *
+ * @param objects An arbitrary number of `Record` objects to merge
+ * @returns A merged `Record` where entries sharing a `systemId` are deduplicated, keeping the last occurrence
+ */
+export const mergeUpdatesBySystemId = (
+ ...objects: Record[]
+): Record => {
+ const result: Record = {};
+
+ // Iterate over all objects
+ objects.forEach((obj) => {
+ // Iterate over each key in the current object
+ Object.entries(obj).forEach(([key, value]) => {
+ // Initialize a map to track unique systemIds for this key
+ if (!result[key]) {
+ result[key] = [];
+ }
+
+ const existingIds = new Map(result[key].map((item) => [item.systemId, item]));
+
+ // Add or update entries based on systemId uniqueness
+ value.forEach((item) => {
+ existingIds.set(item.systemId, item);
+ });
+
+ // Convert the map back to an array and store it in the result
+ result[key] = Array.from(existingIds.values());
+ });
+ });
+
+ return result;
+};
diff --git a/packages/data-provider/src/utils/recordsParser.ts b/packages/data-provider/src/utils/recordsParser.ts
new file mode 100644
index 00000000..c744c925
--- /dev/null
+++ b/packages/data-provider/src/utils/recordsParser.ts
@@ -0,0 +1,92 @@
+import { type DataRecord, parse, type Schema } from '@overture-stack/lectern-client';
+import type { SubmissionInsertData } from '@overture-stack/lyric-data-model/models';
+
+import { getSchemaByName } from './dictionaryUtils.js';
+import { convertRecordToString, isNotNull, notEmpty } from './formatUtils.js';
+import { createBatchResponse } from './submissionResponseParser.js';
+import type { EntityData, SchemasDictionary } from './types.js';
+
+/**
+ * Creates a parser function that converts raw string-based records into typed values using the given schema.
+ * Uses Lectern client parsing function
+ * @param schema The schema definition used to interpret and convert field values.
+ * @returns A function that takes a record with string values and returns a typed data record based on the schema.
+ */
+export const getSchemaParser = (schema: Schema) => (record: Record) => {
+ const parsedRecord = parse.parseRecordValues(record, schema);
+ return parsedRecord.data.record;
+};
+
+/**
+ * Parses raw records into typed data records based on the provided schema.
+ * @param dataRecords An array of unprocessed records with unknown value types.
+ * @param schema The schema definition used to convert and validate each record's fields.
+ * @returns An array of valid typed data records.
+ */
+export const convertToTypedRecords = (dataRecords: Record[], schema: Schema): DataRecord[] => {
+ return Object.values(dataRecords).map(convertRecordToString).map(getSchemaParser(schema)).filter(notEmpty);
+};
+
+/**
+ * Converts a collection of raw entity records into typed records
+ * using schema definitions to validate and transform the data.
+ * @param records A map of entity names to arrays of raw records. Each record is untyped and unvalidated.
+ * @param schemasDictionary A dictionary of schema definitions used to validate and convert each entity's records.
+ * @returns A map of entity names to `DataRecord[]` containing typed records.
+ */
+export const parseRecordsToEdit = (
+ records: EntityData,
+ schemasDictionary: SchemasDictionary,
+): Record => {
+ return Object.fromEntries(
+ Object.entries(records)
+ .map(([schemaName, dataRecords]) => {
+ const entitySchema = getSchemaByName(schemaName, schemasDictionary);
+ if (!entitySchema) {
+ // Entity name not found
+ return null;
+ }
+
+ const parsedRecords = convertToTypedRecords(dataRecords, entitySchema);
+ if (parsedRecords.length === 0) {
+ // No records for this entity
+ return null;
+ }
+
+ return [schemaName, parsedRecords];
+ })
+ .filter(isNotNull),
+ );
+};
+
+/**
+ * Converts a collection of raw entity records into typed batches ready for insertion,
+ * using schema definitions to validate and transform the data.
+ * @param records A map of entity names to arrays of raw records. Each record is untyped and unvalidated.
+ * @param schemasDictionary A dictionary of schema definitions used to validate and convert each entity's records.
+ * @returns A map of entity names to `SubmissionInsertData` batches containing typed records.
+ */
+export const parseRecordsToInsert = (
+ records: EntityData,
+ schemasDictionary: SchemasDictionary,
+): Record => {
+ return Object.fromEntries(
+ Object.entries(records)
+ .map(([schemaName, dataRecords]) => {
+ const entitySchema = getSchemaByName(schemaName, schemasDictionary);
+ if (!entitySchema) {
+ // Entity name not found
+ return null;
+ }
+
+ const parsedRecords = convertToTypedRecords(dataRecords, entitySchema);
+ if (parsedRecords.length === 0) {
+ // No records for this entity
+ return null;
+ }
+
+ return [schemaName, createBatchResponse(schemaName, parsedRecords)];
+ })
+ .filter(isNotNull),
+ );
+};
diff --git a/packages/data-provider/src/utils/schemas.ts b/packages/data-provider/src/utils/schemas.ts
index 4aa2e4a1..4410780d 100644
--- a/packages/data-provider/src/utils/schemas.ts
+++ b/packages/data-provider/src/utils/schemas.ts
@@ -300,19 +300,16 @@ export const submissionDeleteEntityNameRequestSchema: RequestValidation<
};
export interface uploadSubmissionRequestQueryParams extends ParsedQs {
- entityName: string;
organization: string;
}
export const uploadSubmissionRequestSchema: RequestValidation<
- Array>,
+ object,
uploadSubmissionRequestQueryParams,
categoryPathParams
> = {
- body: z.record(z.unknown()).array(),
pathParams: categoryPathParamsSchema,
query: z.object({
- entityName: entityNameSchema,
organization: organizationSchema,
}),
};
@@ -332,19 +329,12 @@ export const dataDeleteBySystemIdRequestSchema: RequestValidation