Conversation
|
Go through this PR first for this to merge |
There was a problem hiding this comment.
Pull request overview
Adds a beneficiary CSV “import v2” flow that persists import records and stores uploaded CSVs in Cloudflare R2, with async processing via Bull.
Changes:
- Introduces
BeneficiaryImportmodel + migrations to track imports (status, source, extras). - Adds Imports API endpoints + Bull queue/processor to validate CSV and bulk-insert beneficiaries.
- Adds SDK client/constants and seed script for Cloudflare R2 settings.
Reviewed changes
Copilot reviewed 31 out of 33 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| prisma/seed.user.ts | Updates seeded demo user name. |
| prisma/seed.cloudflare-r2.ts | Adds seed script to upsert Cloudflare R2 settings. |
| prisma/schema.prisma | Adds ImportStatus enum and BeneficiaryImport model. |
| prisma/migrations/20260316103717_beneficairy_import/migration.sql | Creates import enum/table/index. |
| prisma/migrations/20260317071644_import_source/migration.sql | Adds source column to imports table. |
| package.json | Adds AWS S3 SDK + csv parsing deps and seed script. |
| libs/sdk/src/constants/index.ts | Adds Bull queue constant for imports. |
| libs/sdk/src/clients/index.ts | Exports new imports client. |
| libs/sdk/src/clients/imports.client.ts | Adds SDK client/types for imports endpoints. |
| libs/sdk/src/beneficiary/beneficiary.events.ts | Adds new import job name constant. |
| libs/sdk/package.json | Bumps SDK version. |
| apps/rahat/src/queue/queue.module.ts | Registers imports Bull queue. |
| apps/rahat/src/processors/processors.module.ts | Wires imports processor + module + prisma module. |
| apps/rahat/src/processors/import.processor.ts | Implements import job processor (parse/validate/import). |
| apps/rahat/src/imports/imports.service.ts | Implements R2 upload/download + import CRUD + job progress. |
| apps/rahat/src/imports/imports.module.ts | Adds Imports module composition. |
| apps/rahat/src/imports/imports.controller.ts | Adds REST + SSE endpoints for imports. |
| apps/rahat/src/imports/import-validator.util.ts | Adds CSV + DB validation and error CSV generation. |
| apps/rahat/src/imports/csv-parser.util.ts | Adds CSV parsing and row mapping utilities. |
| apps/rahat/src/beneficiary/beneficiary.controller.ts | Adds debug logs to an existing endpoint. |
| apps/rahat/src/app/app.module.ts | Registers ImportsModule in main app. |
| apps/beneficiary/src/beneficiary/beneficiary.service.ts | Minor const/for-of cleanups and removes stray statement. |
| bruno/* | Adds Bruno requests for imports/app and a small formatting fix. |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
Comments suppressed due to low confidence (1)
prisma/seed.cloudflare-r2.ts:1
- Seeding empty-string credentials into the DB can lead to hard-to-debug runtime failures (and may overwrite valid settings on update). Consider failing fast if required env vars are missing, or avoid updating existing settings unless values are provided.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // 1. Download CSV from the temporary URL | ||
| const response = await firstValueFrom( | ||
| this.httpService.get(data.fileUrl, { responseType: 'arraybuffer' }) | ||
| ); |
There was a problem hiding this comment.
Downloading a user-provided fileUrl server-side introduces an SSRF vector (e.g., hitting internal IPs/metadata endpoints). Add URL validation/allowlisting (scheme, hostname), and block private/loopback/link-local ranges (or proxy downloads through an approved storage/upload mechanism).
| const timestamp = Date.now(); | ||
| // ${ metadata.groupUUID }_${ timestamp } -${ metadata.groupName }.csv | ||
| const r2Key = `imports/${timestamp}_${data.groupUUID}_${data.groupName}.csv`; | ||
| await s3Client.send( |
There was a problem hiding this comment.
The R2 object key includes unsanitized groupName, which can contain slashes, control characters, very long strings, etc. This can create unexpected key layouts and operational issues. Sanitize/slugify groupName (and consider length limits) before constructing r2Key.
| const sortField = query?.sort || 'createdAt'; | ||
| const sortOrder = query?.order || 'desc'; | ||
|
|
||
| return paginate(this.prisma.beneficiaryImport, { | ||
| where, | ||
| orderBy: { [sortField]: sortOrder }, | ||
| }, { |
There was a problem hiding this comment.
Using a raw sort query string directly as a Prisma orderBy key can throw runtime errors for invalid/unknown fields and effectively exposes internal schema details. Prefer an allowlist mapping of permitted sort fields (e.g., createdAt, updatedAt, status, source) and fall back to createdAt when an invalid value is provided.
| const source = origin | ||
| ? new URL(Array.isArray(origin) ? origin[0] : origin).host | ||
| : req.ip; | ||
|
|
There was a problem hiding this comment.
new URL(...) will throw if origin/referer is present but not a valid absolute URL (e.g., unexpected header formats). Wrap parsing in a try/catch (or validate with a safe parser) and fall back to req.ip/a default when parsing fails to avoid 500s on create-import.
| const source = origin | |
| ? new URL(Array.isArray(origin) ? origin[0] : origin).host | |
| : req.ip; | |
| let source = req.ip; | |
| if (origin) { | |
| try { | |
| const originValue = Array.isArray(origin) ? origin[0] : origin; | |
| source = new URL(originValue).host; | |
| } catch { | |
| // If origin/referer is not a valid absolute URL, fall back to req.ip | |
| source = req.ip; | |
| } | |
| } |
| @Sse(':uuid/progress') | ||
| @ApiParam({ name: 'uuid', required: true }) | ||
| progress(@Param('uuid') uuid: string): Observable<MessageEvent> { | ||
| return interval(1500).pipe( |
There was a problem hiding this comment.
The SSE progress endpoint is currently unauthenticated/unguarded, which can leak import status and validation errors to anyone who guesses a UUID. Apply the same auth/ability guards as the other import read endpoints (or otherwise restrict access) to prevent information disclosure.
| console.log(req.body); | ||
| console.log('Received request to import beneficiaries from external tool'); |
There was a problem hiding this comment.
Logging full request bodies can expose PII and secrets in logs. Remove these console.log statements or replace with structured logging that redacts sensitive fields (and uses Nest Logger).
| case 'age': | ||
| return parseInt(value, 10) || undefined; | ||
| case 'latitude': | ||
| case 'longitude': | ||
| return parseFloat(value) || undefined; | ||
| case 'birthDate': | ||
| return new Date(value) || undefined; |
There was a problem hiding this comment.
The numeric conversions treat valid 0 values as undefined because of || undefined. Also, new Date(value) is always truthy even when invalid, so Invalid Date can slip through. Use explicit Number.isNaN(...) checks for numbers, and validate dates via const d = new Date(value); return isNaN(d.getTime()) ? undefined : d;.
| case 'age': | |
| return parseInt(value, 10) || undefined; | |
| case 'latitude': | |
| case 'longitude': | |
| return parseFloat(value) || undefined; | |
| case 'birthDate': | |
| return new Date(value) || undefined; | |
| case 'age': { | |
| const age = parseInt(value, 10); | |
| return Number.isNaN(age) ? undefined : age; | |
| } | |
| case 'latitude': | |
| case 'longitude': { | |
| const coord = parseFloat(value); | |
| return Number.isNaN(coord) ? undefined : coord; | |
| } | |
| case 'birthDate': { | |
| const d = new Date(value); | |
| return isNaN(d.getTime()) ? undefined : d; | |
| } |
| import { UUID } from 'crypto'; | ||
|
|
||
| type ImportStatus = 'NEW' | 'PROCESSING' | 'IMPORTED' | 'FAILED'; |
There was a problem hiding this comment.
Importing UUID from Node's crypto makes this client Node-specific and can break browser builds of the SDK. Prefer string (or a project-defined UUID type) for UUIDs. Also, ImportStatus here omits REJECTED even though the Prisma enum includes it—align the SDK union with the backend enum to avoid type mismatches.
|
|
||
| constructor( | ||
| private readonly prisma: PrismaService, | ||
| private readonly httpService: HttpService, | ||
| @InjectQueue(BQUEUE.RAHAT_IMPORT) private readonly importQueue: Queue, | ||
| ) {} | ||
|
|
||
| private async getR2Setting(key: string) { | ||
| const settings = new SettingsService(this.prisma); | ||
| const r2Settings: any = await settings.getSettingsByName('CLOUDFLARE_R2'); | ||
| return r2Settings.value[key]; |
There was a problem hiding this comment.
This constructs SettingsService and fetches settings on every call (and getS3Client calls it multiple times). Cache the settings/service instance (or fetch the whole CLOUDFLARE_R2 config once and reuse it) to reduce repeated DB reads and improve throughput under concurrent imports.
| constructor( | |
| private readonly prisma: PrismaService, | |
| private readonly httpService: HttpService, | |
| @InjectQueue(BQUEUE.RAHAT_IMPORT) private readonly importQueue: Queue, | |
| ) {} | |
| private async getR2Setting(key: string) { | |
| const settings = new SettingsService(this.prisma); | |
| const r2Settings: any = await settings.getSettingsByName('CLOUDFLARE_R2'); | |
| return r2Settings.value[key]; | |
| private readonly settingsService: SettingsService; | |
| private r2SettingsCache: any; | |
| constructor( | |
| private readonly prisma: PrismaService, | |
| private readonly httpService: HttpService, | |
| @InjectQueue(BQUEUE.RAHAT_IMPORT) private readonly importQueue: Queue, | |
| ) { | |
| this.settingsService = new SettingsService(this.prisma); | |
| } | |
| private async getR2Setting(key: string) { | |
| if (!this.r2SettingsCache) { | |
| const r2Settings: any = await this.settingsService.getSettingsByName('CLOUDFLARE_R2'); | |
| this.r2SettingsCache = r2Settings?.value ?? {}; | |
| } | |
| return this.r2SettingsCache[key]; |
| CREATE TYPE "ImportStatus" AS ENUM ('NEW', 'PROCESSING', 'IMPORTED', 'REJECTED', 'FAILED'); | ||
|
|
||
| -- CreateTable | ||
| CREATE TABLE "tbl_beneficiary_imports" ( |
There was a problem hiding this comment.
The migration directory name contains a typo: beneficairy_import → beneficiary_import. While Prisma migrations can be sensitive to renames, consider correcting this before it’s widely adopted to avoid long-term confusion in migration history.
The beneficiary import service now uses Cloudflare R2 as the object storage backend for CSV file uploads. When a beneficiary CSV is imported, it is downloaded from the source URL, uploaded to the R2 bucket, and the import record is persisted for tracking.