diff --git a/packages/transformers/docs/source/_toctree.yml b/packages/transformers/docs/source/_toctree.yml index bfbff8134..84beea532 100644 --- a/packages/transformers/docs/source/_toctree.yml +++ b/packages/transformers/docs/source/_toctree.yml @@ -84,4 +84,13 @@ title: Data Structures title: Utilities isExpanded: false + - sections: + - local: api/worker + title: Overview + - local: api/worker/worker_pipeline + title: Worker Pipeline + - local: api/worker/worker_pipeline_handler + title: Worker Pipeline Handler + title: Web Workers + isExpanded: false title: API Reference diff --git a/packages/transformers/src/transformers.js b/packages/transformers/src/transformers.js index 66a3b5385..26a8a89f1 100644 --- a/packages/transformers/src/transformers.js +++ b/packages/transformers/src/transformers.js @@ -41,6 +41,9 @@ export * from './models/auto/processing_auto.js'; // Configs export { PretrainedConfig, AutoConfig } from './configs.js'; +// Worker +export * from './worker/worker.js'; + // Additional exports export * from './generation/streamers.js'; export * from './generation/stopping_criteria.js'; diff --git a/packages/transformers/src/worker/constants.js b/packages/transformers/src/worker/constants.js new file mode 100644 index 000000000..1b4c3728f --- /dev/null +++ b/packages/transformers/src/worker/constants.js @@ -0,0 +1,28 @@ +/** + * @file Constants for worker pipeline communication. + * @module worker/constants + */ + +/** + * Message type for transformer pipeline operations in web workers. + * @constant {string} + */ +export const REQUEST_MESSAGE_TYPE = 'transformersjs_worker_pipeline'; + +/** + * Message type for invoking callbacks from worker to main thread. + * @constant {string} + */ +export const RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK = 'transformersjs_worker_invokeCallback'; + +/** + * Message type for pipeline ready notification. + * @constant {string} + */ +export const RESPONSE_MESSAGE_TYPE_READY = 'transformersjs_worker_ready'; + +/** + * Message type for pipeline result. + * @constant {string} + */ +export const RESPONSE_MESSAGE_TYPE_RESULT = 'transformersjs_worker_result'; diff --git a/packages/transformers/src/worker/worker.js b/packages/transformers/src/worker/worker.js new file mode 100644 index 000000000..1666f1c99 --- /dev/null +++ b/packages/transformers/src/worker/worker.js @@ -0,0 +1,7 @@ +/** + * @file Web Worker utilities for running pipelines in worker threads. + * @module worker + */ + +export { worker_pipeline } from './worker_pipeline.js'; +export { worker_pipeline_handler } from './worker_pipeline_handler.js'; diff --git a/packages/transformers/src/worker/worker_pipeline.js b/packages/transformers/src/worker/worker_pipeline.js new file mode 100644 index 000000000..144a57882 --- /dev/null +++ b/packages/transformers/src/worker/worker_pipeline.js @@ -0,0 +1,120 @@ +/** + * @file Web Worker pipeline wrapper for executing pipelines in a worker thread. + * @module worker/worker_pipeline + */ + +import { + REQUEST_MESSAGE_TYPE, + RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK, + RESPONSE_MESSAGE_TYPE_RESULT, +} from './constants.js'; + +/** + * @typedef {import('../pipelines.js').PipelineType} PipelineType + */ + +/** + * Creates a pipeline that runs in a Web Worker. + * @param {Worker} worker - The Web Worker instance to use for pipeline execution + * @param {PipelineType} task - The pipeline task type + * @param {string} model_id - The model identifier to load + * @param {Record} [options={}] - Options for pipeline initialization + * @returns {Promise} A function that executes the pipeline and returns a Promise with the result + */ +export const worker_pipeline = (worker, task, model_id, options = {}) => + new Promise((resolve, reject) => { + /** + * Map storing callback functions by their ID. + * @type {Map} + */ + const callback_map = new Map(); + + /** + * Map storing promise resolvers/rejecters for each message. + * @type {Map} + */ + const messages_resolvers_map = new Map(); + + /** + * Counter for generating unique message IDs. + * @type {number} + */ + let message_id_counter = 0; + + /** + * Serializes options, converting functions to references that can be invoked via postMessage. + * @param {Record} options - The options object to serialize + * @returns {Record} The serialized options object + */ + const serialize_options = (options) => { + const out = {}; + Object.entries(options ?? {}).forEach(([key, value]) => { + if (typeof value === 'function') { + const function_id = `cb_${key}`; + callback_map.set(function_id, value); + out[key] = { __fn: true, functionId: function_id }; + } else { + out[key] = value; + } + }); + return out; + }; + + /** + * Message event handler for processing worker responses. + * @param {MessageEvent} e - The message event from the worker + */ + worker.onmessage = (e) => { + const msg = e.data; + + // Handle callback invocations from the worker + if (msg?.type === RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK) { + const { functionId, args } = msg; + const fn = callback_map.get(functionId); + if (fn) { + fn(...args); + } + } + + // Handle result messages + if (msg?.type === RESPONSE_MESSAGE_TYPE_RESULT) { + if (msg?.id === 'init') { + // Initial setup complete - resolve with the pipeline function + resolve((data, pipe_options) => { + return new Promise((resolve, reject) => { + const id = message_id_counter++; + messages_resolvers_map.set(id, { resolve, reject }); + worker.postMessage({ + id, + type: REQUEST_MESSAGE_TYPE, + data, + task, + model_id, + options: options ? serialize_options(options) : {}, + pipe_options, + }); + }); + }); + } else { + // Regular pipeline execution result + const resolver = messages_resolvers_map.get(msg.id); + if (resolver) { + if (msg.error) resolver.reject(msg.error); + else resolver.resolve(msg.result); + messages_resolvers_map.delete(msg.id); + } + } + } + }; + + // Initialize the pipeline in the worker + messages_resolvers_map.set('init', { resolve, reject }); + worker.postMessage({ + id: 'init', + type: REQUEST_MESSAGE_TYPE, + data: null, + task: task ?? '', + model_id: model_id ?? '', + options: options ? serialize_options(options) : {}, + }); + }); diff --git a/packages/transformers/src/worker/worker_pipeline_handler.js b/packages/transformers/src/worker/worker_pipeline_handler.js new file mode 100644 index 000000000..748d92daf --- /dev/null +++ b/packages/transformers/src/worker/worker_pipeline_handler.js @@ -0,0 +1,74 @@ +/** + * @file Web Worker pipeline handler for managing transformer pipelines in a worker context. + * @module worker/worker_pipeline_handler + */ + +import { pipeline } from '../pipelines.js'; +import { + REQUEST_MESSAGE_TYPE, + RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK, + RESPONSE_MESSAGE_TYPE_READY, + RESPONSE_MESSAGE_TYPE_RESULT, +} from './constants.js'; + +/** + * Cache for storing initialized pipelines by their configuration. + * @type {Map} + */ +const pipelines = new Map(); + +/** + * Creates a web worker pipeline handler that manages pipeline creation and execution. + * @returns {{onmessage: (event: MessageEvent) => Promise}} Handler object with onmessage method + */ +export const worker_pipeline_handler = () => { + /** + * Converts serialized options back to their original form. + * Handles special cases like callback functions that were serialized with __fn marker. + * @param {Record} options - The options object to unserialize + * @returns {Record} The unserialized options object + */ + const unserialize_options = (options) => { + const out = {}; + Object.entries(options ?? {}).forEach(([key, value]) => { + if (typeof value === 'object' && value && '__fn' in value && value.__fn) { + out[key] = (...args) => + self.postMessage({ + type: RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK, + function_id: 'functionId' in value ? value.functionId : null, + args, + }); + } else { + out[key] = value; + } + }); + return out; + }; + + return { + /** + * Message event handler for processing pipeline requests. + * @param {MessageEvent} event - The message event from the main thread + * @returns {Promise} + */ + onmessage: async (event) => { + if (!event?.data || event.data?.type !== REQUEST_MESSAGE_TYPE) return; + + const { id, data, task, model_id, options, pipe_options = {} } = event.data; + + const key = JSON.stringify({ task, model_id, options }); + let pipe = pipelines.get(key); + + if (!pipe) { + pipe = await pipeline(task, model_id, unserialize_options(options)); + pipelines.set(key, pipe); + } + + self.postMessage({ id, type: RESPONSE_MESSAGE_TYPE_READY }); + + const result = data ? await pipe(data, pipe_options) : null; + + self.postMessage({ id, type: RESPONSE_MESSAGE_TYPE_RESULT, result }); + }, + }; +};