Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion handwritten/storage/src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import * as http from 'http';
import * as path from 'path';
import {promisify} from 'util';
import AsyncRetry from 'async-retry';
import {randomUUID} from 'crypto';
import {convertObjKeysToSnakeCase, handleContextValidation} from './util.js';

import {Acl, AclMetadata} from './acl.js';
Expand Down Expand Up @@ -4436,6 +4437,7 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
optionsOrCallback?: UploadOptions | UploadCallback,
callback?: UploadCallback,
): Promise<UploadResponse> | void {
const persistentInvocationId = randomUUID();
const upload = (numberOfRetries: number | undefined) => {
const returnValue = AsyncRetry(
async (bail: (err: GaxiosError | Error) => void) => {
Expand All @@ -4446,7 +4448,10 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
) {
newFile.storage.retryOptions.autoRetry = false;
}
const writable = newFile.createWriteStream(options);
const writable = newFile.createWriteStream({
...options,
invocationId: persistentInvocationId,
});
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
Expand Down
10 changes: 9 additions & 1 deletion handwritten/storage/src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import * as resumableUpload from './resumable-upload.js';
import {Writable, Readable, pipeline, Transform, PipelineSource} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';
import {randomUUID} from 'crypto';

import {
ExceptionMessages,
Expand Down Expand Up @@ -248,6 +249,7 @@ export interface CreateResumableUploadOptions
* @see {@link CRC32C.from} for possible values.
*/
resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0];
invocationId?: string;
preconditionOpts?: PreconditionOptions;
[GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY];
}
Expand Down Expand Up @@ -4172,13 +4174,17 @@ class File extends ServiceObject<File, FileMetadata> {
) {
maxRetries = 0;
}
const persistentInvocationId = randomUUID();
const returnValue = AsyncRetry(
async (bail: (err: Error) => void) => {
return new Promise<void>((resolve, reject) => {
if (maxRetries === 0) {
this.storage.retryOptions.autoRetry = false;
}
const writable = this.createWriteStream(options);
const writable = this.createWriteStream({
...options,
invocationId: persistentInvocationId,
});

if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
Expand Down Expand Up @@ -4440,6 +4446,7 @@ class File extends ServiceObject<File, FileMetadata> {
chunkSize: options?.chunkSize,
highWaterMark: options?.highWaterMark,
universeDomain: this.bucket.storage.universeDomain,
invocationId: options.invocationId,
[GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY],
};

Expand Down Expand Up @@ -4499,6 +4506,7 @@ class File extends ServiceObject<File, FileMetadata> {
uploadType: 'multipart',
},
url,
invocationId: options.invocationId,
[GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY],
method: 'POST',
responseType: 'json',
Expand Down
171 changes: 107 additions & 64 deletions handwritten/storage/src/storage-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import {
getModuleFormat,
getRuntimeTrackingString,
getUserAgentString,
} from './util';
} from './util.js';
import {randomUUID} from 'crypto';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import {getPackageJSON} from './package-json-helper.cjs';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util';
import {RetryOptions} from './storage';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util.js';
import {RETRYABLE_ERR_FN_DEFAULT, RetryOptions} from './storage.js';

export interface StandardStorageQueryParams {
alt?: 'json' | 'media';
Expand All @@ -49,6 +49,7 @@ export interface StorageQueryParameters extends StandardStorageQueryParams {

export interface StorageRequestOptions extends GaxiosOptions {
[GCCL_GCS_CMD_KEY]?: string;
invocationId?: string;
interceptors?: GaxiosInterceptor<GaxiosOptionsPrepared>[];
autoPaginate?: boolean;
autoPaginateVal?: boolean;
Expand Down Expand Up @@ -87,7 +88,6 @@ export interface StorageTransportCallback<T> {
fullResponse?: GaxiosResponse,
): void;
}
let projectId: string;

export class StorageTransport {
authClient: GoogleAuth<AuthClient>;
Expand All @@ -113,7 +113,11 @@ export class StorageTransport {
}
this.providedUserAgent = options.userAgent;
this.packageJson = getPackageJSON();
this.retryOptions = options.retryOptions;
this.retryOptions = {
...options.retryOptions,
retryableErrorFn:
options.retryOptions?.retryableErrorFn || RETRYABLE_ERR_FN_DEFAULT,
};
this.baseUrl = options.baseUrl;
this.timeout = options.timeout;
this.projectId = options.projectId;
Expand All @@ -124,76 +128,108 @@ export class StorageTransport {
reqOpts: StorageRequestOptions,
callback?: StorageTransportCallback<T>,
): Promise<void | T> {
const headers = this.#buildRequestHeaders(reqOpts.headers);
if (reqOpts[GCCL_GCS_CMD_KEY]) {
headers.set(
'x-goog-api-client',
`${headers.get('x-goog-api-client')} gccl-gcs-cmd/${reqOpts[GCCL_GCS_CMD_KEY]}`,
);
const resolvedProjectId =
reqOpts.projectId ||
this.projectId ||
(await this.authClient.getProjectId());

if (!this.projectId) {
this.projectId = resolvedProjectId;
}

const queryParameters = {
project: resolvedProjectId,
...reqOpts.queryParameters,
};

// Header Construction
const headers = this.#prepareHeaders(reqOpts);

// Interceptor Management
this.gaxiosInstance.interceptors.request.clear();
if (reqOpts.interceptors) {
this.gaxiosInstance.interceptors.request.clear();
for (const inter of reqOpts.interceptors) {
this.gaxiosInstance.interceptors.request.add(inter);
}
}
Comment thread
thiyaguk09 marked this conversation as resolved.

try {
const getProjectId = async () => {
if (reqOpts.projectId) return reqOpts.projectId;
projectId = await this.authClient.getProjectId();
return projectId;
};
const _projectId = await getProjectId();
if (_projectId) {
projectId = _projectId;
this.projectId = projectId;
}
const urlString = reqOpts.url?.toString() || '';
const isAbsolute = this.#isValidUrl(urlString);

// Determine the base URL for the request
const requestUrl = isAbsolute
? urlString
: new URL(urlString, this.baseUrl).toString();

try {
const requestPromise = this.authClient.request<T>({
retryConfig: {
retry: this.retryOptions.maxRetries,
noResponseRetries: this.retryOptions.maxRetries,
maxRetryDelay: this.retryOptions.maxRetryDelay,
retryDelayMultiplier: this.retryOptions.retryDelayMultiplier,
shouldRetry: this.retryOptions.retryableErrorFn,
totalTimeout: this.retryOptions.totalTimeout,
shouldRetry: err => !!this.retryOptions.retryableErrorFn?.(err),
},
...reqOpts,
params: queryParameters,
paramsSerializer: this.#paramsSerializer,
headers,
url: this.#buildUrl(reqOpts.url?.toString(), reqOpts.queryParameters),
url: requestUrl,
timeout: this.timeout,
validateStatus: (status: number): boolean => {
const isResumable = !!(
reqOpts.queryParameters?.uploadType === 'resumable' ||
reqOpts.url?.toString().includes('uploadType=resumable')
);
return (
(status >= 200 && status < 300) || (isResumable && status === 308)
);
},
});

return callback
? requestPromise
.then(resp => callback(null, resp.data, resp))
.catch(err => callback(err, null, err.response))
: (requestPromise.then(resp => resp.data) as Promise<T>);
// Response Handling
const responseHandler = (resp: GaxiosResponse<T>) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const data = resp.data as any;
if (data !== null && typeof data === 'object') {
data.headers = resp.headers;
data.status = resp.status;
return data;
}
return resp;
};
Comment thread
thiyaguk09 marked this conversation as resolved.

if (callback) {
requestPromise
.then(resp => callback(null, responseHandler(resp), resp))
.catch(err => callback(err, null, err.response));
return;
}

return requestPromise.then(responseHandler);
} catch (e) {
if (callback) return callback(e as GaxiosError);
throw e;
}
}

#buildUrl(pathUri = '', queryParameters: StorageQueryParameters = {}): URL {
if (
'project' in queryParameters &&
(queryParameters.project !== this.projectId ||
queryParameters.project !== projectId)
) {
queryParameters.project = this.projectId;
}
const qp = this.#buildRequestQueryParams(queryParameters);
let url: URL;
if (this.#isValidUrl(pathUri)) {
url = new URL(pathUri);
} else {
url = new URL(`${this.baseUrl}${pathUri}`);
#prepareHeaders(reqOpts: StorageRequestOptions): Record<string, string> {
const headersObj = this.#buildRequestHeaders(reqOpts);

if (reqOpts[GCCL_GCS_CMD_KEY]) {
const current = headersObj.get('x-goog-api-client') || '';
headersObj.set(
'x-goog-api-client',
`${current} gccl-gcs-cmd/${reqOpts[GCCL_GCS_CMD_KEY]}`,
);
}
url.search = qp;

return url;
const finalHeaders: Record<string, string> = {};
headersObj.forEach((v, k) => {
finalHeaders[k] = v;
});
return finalHeaders;
}

#isValidUrl(url: string): boolean {
Expand All @@ -204,32 +240,39 @@ export class StorageTransport {
}
}

#buildRequestHeaders(requestHeaders = {}) {
const headers = new Headers(requestHeaders);
/**
* Serializes query parameters into a string.
* Specifically handles arrays by appending each value individually
* to satisfy GCS "repeated key" requirements (e.g., for IAM permissions).
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
#paramsSerializer = (params: Record<string, any>): string => {
const searchParams = new URLSearchParams();
for (const [key, value] of Object.entries(params)) {
if (value === undefined) continue;

if (Array.isArray(value)) {
value.forEach(v => searchParams.append(key, String(v)));
} else {
searchParams.set(key, String(value));
}
}
return searchParams.toString();
};

#buildRequestHeaders(reqOpts: StorageRequestOptions) {
const headers = new Headers(reqOpts.headers);
headers.set('User-Agent', this.#getUserAgentString());
const invocationId = reqOpts.invocationId || randomUUID();
headers.set(
'x-goog-api-client',
`${getRuntimeTrackingString()} gccl/${this.packageJson.version}-${getModuleFormat()} gccl-invocation-id/${randomUUID()}`,
`${getRuntimeTrackingString()} gccl/${this.packageJson.version}-${getModuleFormat()} gccl-invocation-id/${invocationId}`,
);

return headers;
}

#buildRequestQueryParams(queryParameters: StorageQueryParameters): string {
const qp = new URLSearchParams(
queryParameters as unknown as Record<string, string>,
);

return qp.toString();
}

#getUserAgentString(): string {
let userAgent = getUserAgentString();
if (this.providedUserAgent) {
userAgent = `${this.providedUserAgent} ${userAgent}`;
}

return userAgent;
const base = getUserAgentString();
return this.providedUserAgent ? `${this.providedUserAgent} ${base}` : base;
}
}
36 changes: 36 additions & 0 deletions handwritten/storage/system-test/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3248,6 +3248,42 @@ describe('storage', function () {

assert.strictEqual(called, true);
});

it('should maintain the same invocationId across the upload lifecycle', async () => {
const invocationIds: string[] = [];

const originalRequest = bucket.storageTransport.authClient.request.bind(
bucket.storageTransport.authClient,
);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
bucket.storageTransport.authClient.request = async (config: any) => {
const headers = config.headers || {};
const apiHeaderKey = Object.keys(headers).find(
key => key.toLowerCase() === 'x-goog-api-client',
);

if (apiHeaderKey) {
const val = headers[apiHeaderKey];
const match = val.match(/gccl-invocation-id\/([a-f0-9-]+)/);
if (match) {
invocationIds.push(match[1]);
}
}
return originalRequest(config);
};

try {
const destination = `test-id-${Date.now()}.txt`;
await bucket.upload(FILES.big.path, {destination, resumable: false});

assert.ok(invocationIds.length >= 1);
const uniqueIds = [...new Set(invocationIds)];
assert.strictEqual(uniqueIds.length, 1);
} finally {
bucket.storageTransport.authClient.request = originalRequest;
}
});
});

describe('channels', () => {
Expand Down
Loading
Loading