@@ -39,8 +39,10 @@ const defaultConfig: CrateDBConfig = {
3939 date : 'date' ,
4040 } ,
4141 rowMode : 'array' ,
42- enableCompression : true ,
43- compressionThreshold : 1024 , // Default to 1KB
42+ compression : {
43+ request : 'gzip' ,
44+ response : 'none' ,
45+ } ,
4446} ;
4547
4648export class CrateDBClient {
@@ -84,6 +86,15 @@ export class CrateDBClient {
8486 } ;
8587 }
8688
89+ const additionalHeaders : Record < string , string > = { } ;
90+
91+ if ( cfg . compression . request == 'gzip' ) {
92+ additionalHeaders [ 'Content-Encoding' ] = 'gzip' ;
93+ }
94+ if ( cfg . compression . response == 'gzip' ) {
95+ additionalHeaders [ 'Accept-Encoding' ] = 'gzip' ;
96+ }
97+
8798 this . httpOptions = {
8899 hostname : cfg . host ,
89100 port : cfg . port ,
@@ -94,6 +105,7 @@ export class CrateDBClient {
94105 'Content-Type' : 'application/json' ,
95106 Accept : 'application/json' ,
96107 ...authHeader ,
108+ ...additionalHeaders ,
97109 ...( cfg . defaultSchema ? { 'Default-Schema' : cfg . defaultSchema } : { } ) ,
98110 } ,
99111 auth : cfg . jwt ? undefined : cfg . user && cfg . password ? `${ cfg . user } :${ cfg . password } ` : undefined ,
@@ -367,125 +379,85 @@ export class CrateDBClient {
367379 }
368380
369381 async _makeRequest ( options : http . RequestOptions & { body ?: string | Buffer } ) : Promise < CrateDBBaseResponse > {
370- return new Promise ( ( resolve , reject ) => {
382+ let requestBody = options . body ;
383+ const headers = { ...options . headers } ;
384+ const requestBodySize = requestBody ? Buffer . byteLength ( requestBody ) : 0 ;
385+ let encodedSize = requestBodySize ;
386+ let encodingDuration = 0 ;
387+ let requestDuration = 0 ;
388+ let deserializationDuration = 0 ;
389+
390+ // Compress the request body if needed.
391+ if ( this . cfg . compression . request === 'gzip' && requestBody ) {
392+ const startEncodingTime = Date . now ( ) ;
393+ requestBody = await promisify ( zlib . gzip ) ( requestBody ) ;
394+ encodingDuration = Date . now ( ) - startEncodingTime ;
395+ encodedSize = Buffer . byteLength ( requestBody ) ;
396+ }
397+
398+ // Wrap the HTTP request in a promise.
399+ const { response, data } = await new Promise < {
400+ response : http . IncomingMessage ;
401+ data : Buffer [ ] ;
402+ } > ( ( resolve , reject ) => {
403+ const protocolHandler = this . protocol === 'https' ? https : http ;
404+ const req = protocolHandler . request ( { ...options , headers } , ( response ) => {
405+ const data : Buffer [ ] = [ ] ;
406+ response . on ( 'data' , ( chunk : Buffer ) => data . push ( chunk ) ) ;
407+ response . on ( 'end' , ( ) => resolve ( { response, data } ) ) ;
408+ } ) ;
409+ req . on ( 'error' , ( err ) => reject ( new Error ( `Request failed: ${ err . message } \nStack trace: ${ err . stack } ` ) ) ) ;
410+ req . end ( requestBody ) ;
411+ } ) ;
412+
413+ requestDuration = Date . now ( ) - ( response . headers . date ? new Date ( response . headers . date ) . getTime ( ) : Date . now ( ) ) ;
414+ let rawResponse = Buffer . concat ( data ) ;
415+ const responseBodySize = rawResponse . length ;
416+
417+ // Decompress the response if gzip encoded.
418+ if ( response . headers [ 'content-encoding' ] === 'gzip' ) {
371419 try {
372- let requestBody = options . body ;
373- const headers = { ...options . headers } ;
374- const requestBodySize = requestBody ? Buffer . byteLength ( requestBody ) : 0 ;
375- let encodedSize = requestBodySize ;
376- let encodingDuration = 0 ;
377- let requestDuration = 0 ;
378- let deserializationDuration = 0 ;
379-
380- const startEncodingTime = Date . now ( ) ;
381-
382- if ( this . cfg . enableCompression && requestBody && requestBodySize > ( this . cfg . compressionThreshold ?? 1024 ) ) {
383- promisify ( zlib . gzip ) ( requestBody )
384- . then ( ( compressed ) => {
385- const startRequestTime = Date . now ( ) ;
386- encodingDuration = startRequestTime - startEncodingTime ;
387- requestBody = compressed ;
388- encodedSize = Buffer . byteLength ( requestBody ) ;
389- headers [ 'Content-Encoding' ] = 'gzip' ;
390-
391- const reqStartTime = Date . now ( ) ;
392- const req = ( this . protocol === 'https' ? https : http ) . request ( { ...options , headers } , ( response ) => {
393- const data : Buffer [ ] = [ ] ;
394- response . on ( 'data' , ( chunk : Buffer ) => data . push ( chunk ) ) ;
395- response . on ( 'end' , ( ) => {
396- const startDeserializationTime = Date . now ( ) ;
397- requestDuration = startDeserializationTime - reqStartTime ;
398- const rawResponse = Buffer . concat ( data ) ;
399- const responseBodySize = rawResponse . length ;
400-
401- try {
402- const parsedResponse = Serializer . deserialize ( rawResponse . toString ( ) , this . cfg . deserialization ) ;
403- deserializationDuration = Date . now ( ) - startDeserializationTime ;
404- if ( response . statusCode === 200 ) {
405- resolve ( {
406- ...parsedResponse ,
407- sizes : {
408- response : responseBodySize ,
409- request : encodedSize ,
410- requestUncompressed : requestBodySize ,
411- } ,
412- durations : {
413- encoding : encodingDuration ,
414- request : requestDuration ,
415- deserialization : deserializationDuration ,
416- } ,
417- } ) ;
418- } else {
419- reject ( CrateDBError . fromResponse ( parsedResponse as CrateDBErrorResponse , response . statusCode ) ) ;
420- }
421- } catch ( parseErr : unknown ) {
422- if ( parseErr instanceof Error ) {
423- reject (
424- new Error (
425- `Failed to parse response: ${ parseErr . message } . Raw response: ${ rawResponse . toString ( ) } `
426- )
427- ) ;
428- } else {
429- reject ( new Error ( `Failed to parse response. Raw response: ${ rawResponse . toString ( ) } ` ) ) ;
430- }
431- }
432- } ) ;
433- } ) ;
434-
435- req . on ( 'error' , ( err ) => reject ( new Error ( `Request failed: ${ err . message } ` ) ) ) ;
436- req . end ( requestBody ) ;
437- } )
438- . catch ( ( error ) => reject ( error ) ) ;
420+ rawResponse = await promisify ( zlib . gunzip ) ( rawResponse ) ;
421+ } catch ( error ) {
422+ if ( error instanceof Error ) {
423+ throw new Error ( `Failed to decompress response: ${ error . message } ` ) ;
439424 } else {
440- const startRequestTime = Date . now ( ) ;
441- const req = ( this . protocol === 'https' ? https : http ) . request ( { ...options , headers } , ( response ) => {
442- const data : Buffer [ ] = [ ] ;
443- response . on ( 'data' , ( chunk : Buffer ) => data . push ( chunk ) ) ;
444- response . on ( 'end' , ( ) => {
445- const startDeserializationTime = Date . now ( ) ;
446- const requestDuration = startDeserializationTime - startRequestTime ;
447- const rawResponse = Buffer . concat ( data ) ;
448- const responseBodySize = rawResponse . length ;
449-
450- try {
451- const parsedResponse = Serializer . deserialize ( rawResponse . toString ( ) , this . cfg . deserialization ) ;
452- const deserializationDuration = Date . now ( ) - startDeserializationTime ;
453- if ( response . statusCode === 200 ) {
454- resolve ( {
455- ...parsedResponse ,
456- sizes : {
457- response : responseBodySize ,
458- request : encodedSize ,
459- requestUncompressed : requestBodySize ,
460- } ,
461- durations : {
462- encoding : 0 ,
463- request : requestDuration ,
464- deserialization : deserializationDuration ,
465- } ,
466- } ) ;
467- } else {
468- reject ( CrateDBError . fromResponse ( parsedResponse as CrateDBErrorResponse , response . statusCode ) ) ;
469- }
470- } catch ( parseErr : unknown ) {
471- if ( parseErr instanceof Error ) {
472- reject (
473- new Error ( `Failed to parse response: ${ parseErr . message } . Raw response: ${ rawResponse . toString ( ) } ` )
474- ) ;
475- } else {
476- reject ( new Error ( `Failed to parse response. Raw response: ${ rawResponse . toString ( ) } ` ) ) ;
477- }
478- }
479- } ) ;
480- } ) ;
481-
482- req . on ( 'error' , ( err ) => reject ( new Error ( `Request failed: ${ err . message } ` ) ) ) ;
483- req . end ( requestBody ) ;
425+ throw new Error ( `Failed to decompress response: ${ String ( error ) } ` ) ;
484426 }
485- } catch ( error ) {
486- reject ( error ) ;
487427 }
488- } ) ;
428+ }
429+
430+ // Deserialize the response.
431+ const startDeserializationTime = Date . now ( ) ;
432+ let parsedResponse ;
433+ try {
434+ parsedResponse = Serializer . deserialize ( rawResponse . toString ( ) , this . cfg . deserialization ) ;
435+ } catch ( err ) {
436+ if ( err instanceof Error ) {
437+ throw new Error ( `Failed to parse response: ${ err . message } . Raw response: ${ rawResponse . toString ( ) } ` ) ;
438+ } else {
439+ throw new Error ( `Failed to parse response: ${ String ( err ) } . Raw response: ${ rawResponse . toString ( ) } ` ) ;
440+ }
441+ }
442+ deserializationDuration = Date . now ( ) - startDeserializationTime ;
443+
444+ if ( response . statusCode === 200 ) {
445+ return {
446+ ...parsedResponse ,
447+ sizes : {
448+ response : responseBodySize ,
449+ request : encodedSize ,
450+ requestUncompressed : requestBodySize ,
451+ } ,
452+ durations : {
453+ encoding : encodingDuration ,
454+ request : requestDuration ,
455+ deserialization : deserializationDuration ,
456+ } ,
457+ } ;
458+ } else {
459+ throw CrateDBError . fromResponse ( parsedResponse as CrateDBErrorResponse , response . statusCode ! ) ;
460+ }
489461 }
490462
491463 protected _transformResponse (
0 commit comments