33 HttpException ,
44 HttpStatus ,
55 Injectable ,
6+ RequestTimeoutException ,
67 UnprocessableEntityException ,
78} from '@nestjs/common' ;
89import { ModuleRef } from '@nestjs/core' ;
@@ -12,6 +13,7 @@ import { IdentityState } from '~/management/identities/_enums/states.enum';
1213import { Identities } from '~/management/identities/_schemas/identities.schema' ;
1314import { IdentitiesService } from '~/management/identities/identities.service' ;
1415import { JobState } from '../jobs/_enums/state.enum' ;
16+ import { JobState as StateOfJob } from 'bullmq' ;
1517import { Jobs } from '../jobs/_schemas/jobs.schema' ;
1618import { JobsService } from '../jobs/jobs.service' ;
1719import { Tasks } from '../tasks/_schemas/tasks.schema' ;
@@ -218,7 +220,10 @@ export class BackendsService extends AbstractQueueProcessor {
218220 concernedTo,
219221 payload,
220222 } ,
221- options ?. job ,
223+ {
224+ ...options ?. job ,
225+ attempts : 1 ,
226+ } ,
222227 ) ;
223228 const optionals = { } ;
224229 if ( ! options ?. async ) {
@@ -255,8 +260,17 @@ export class BackendsService extends AbstractQueueProcessor {
255260
256261 if ( ! options ?. async ) {
257262 let error : Error ;
263+
258264 try {
259265 const response = await job . waitUntilFinished ( this . queueEvents , options . syncTimeout || DEFAULT_SYNC_TIMEOUT ) ;
266+
267+ if ( response ?. status > 0 ) {
268+ const jobError : Error & { response : any } = new Error ( ) as unknown as Error & { response : any } ;
269+ jobError . response = response ;
270+
271+ throw jobError ;
272+ }
273+
260274 let jobStoreUpdated : ModifyResult < Query < Jobs , Jobs > > = null ;
261275
262276 if ( ! options ?. disableLogs ) {
@@ -280,9 +294,12 @@ export class BackendsService extends AbstractQueueProcessor {
280294
281295 return [ jobStoreUpdated as unknown as Jobs , response ] ;
282296 } catch ( err ) {
297+ this . logger . error ( `Error while executing job ${ job . id } ` , err ) ;
283298 error = err ;
284299 }
285300
301+ const stateOfJob = await job . getState ( ) ;
302+
286303 let jobFailed : ModifyResult < Query < Jobs , Jobs > > = null ;
287304 if ( ! options ?. disableLogs ) {
288305 jobFailed = await this . jobsService . update < Jobs > ( jobStore . _id , {
@@ -297,29 +314,42 @@ export class BackendsService extends AbstractQueueProcessor {
297314 } ,
298315 } ) ;
299316 }
317+
300318 if ( concernedTo && ! ! options ?. updateStatus ) {
301319 await this . identitiesService . model . findByIdAndUpdate ( concernedTo , {
302320 $set : {
303321 state : IdentityState . ON_ERROR ,
304322 } ,
305323 } ) ;
306324 }
307- if ( options ?. timeoutDiscard !== false ) {
325+
326+ if ( options ?. timeoutDiscard && stateOfJob !== 'completed' && stateOfJob !== 'failed' ) {
308327 job . discard ( ) ;
309- throw new BadRequestException ( {
310- status : HttpStatus . BAD_REQUEST ,
328+
329+ throw new RequestTimeoutException ( {
330+ status : HttpStatus . REQUEST_TIMEOUT ,
311331 message : `Sync job ${ job . id } failed to finish in time` ,
312332 error,
313333 job : jobFailed as unknown as Jobs ,
314334 } ) ;
315335 }
316- throw new UnprocessableEntityException ( {
317- status : HttpStatus . UNPROCESSABLE_ENTITY ,
318- message : `Job now continue to run in background ${ job . id } , timeout wait until finish reached` ,
336+
337+ if ( error && stateOfJob !== 'completed' && stateOfJob !== 'failed' ) {
338+ throw new UnprocessableEntityException ( {
339+ status : HttpStatus . UNPROCESSABLE_ENTITY ,
340+ message : `Job now continue to run in background ${ job . id } , timeout wait until finish reached` ,
341+ error,
342+ job : jobFailed as unknown as Jobs ,
343+ } ) ;
344+ }
345+
346+ throw new BadRequestException ( {
347+ status : HttpStatus . BAD_REQUEST ,
319348 error,
320- job : jobFailed as unknown as Jobs ,
349+ job : ( error as any ) . response ,
321350 } ) ;
322351 }
352+
323353 return [ jobStore ?. toObject ( ) || null , null ] ;
324354 }
325355}
0 commit comments