diff --git a/reflex/.templates/web/utils/helpers/upload.js b/reflex/.templates/web/utils/helpers/upload.js new file mode 100644 index 00000000000..6bbfc746ed6 --- /dev/null +++ b/reflex/.templates/web/utils/helpers/upload.js @@ -0,0 +1,170 @@ +import JSON5 from "json5"; +import env from "$/env.json"; + +/** + * Upload files to the server. + * + * @param state The state to apply the delta to. + * @param handler The handler to use. + * @param upload_id The upload id to use. + * @param on_upload_progress The function to call on upload progress. + * @param socket the websocket connection + * @param extra_headers Extra headers to send with the request. + * @param refs The refs object to store the abort controller in. + * @param getBackendURL Function to get the backend URL. + * @param getToken Function to get the Reflex token. + * + * @returns The response from posting to the UPLOADURL endpoint. + */ +export const uploadFiles = async ( + handler, + files, + upload_id, + on_upload_progress, + extra_headers, + socket, + refs, + getBackendURL, + getToken, +) => { + // return if there's no file to upload + if (files === undefined || files.length === 0) { + return false; + } + + const upload_ref_name = `__upload_controllers_${upload_id}`; + + if (refs[upload_ref_name]) { + console.log("Upload already in progress for ", upload_id); + return false; + } + + // Track how many partial updates have been processed for this upload. + let resp_idx = 0; + const eventHandler = (progressEvent) => { + const event_callbacks = socket._callbacks.$event; + // Whenever called, responseText will contain the entire response so far. + const chunks = progressEvent.event.target.responseText.trim().split("\n"); + // So only process _new_ chunks beyond resp_idx. + chunks.slice(resp_idx).map((chunk_json) => { + try { + const chunk = JSON5.parse(chunk_json); + event_callbacks.map((f, ix) => { + f(chunk) + .then(() => { + if (ix === event_callbacks.length - 1) { + // Mark this chunk as processed. + resp_idx += 1; + } + }) + .catch((e) => { + if (progressEvent.progress === 1) { + // Chunk may be incomplete, so only report errors when full response is available. + console.log("Error processing chunk", chunk, e); + } + return; + }); + }); + } catch (e) { + if (progressEvent.progress === 1) { + console.log("Error parsing chunk", chunk_json, e); + } + return; + } + }); + }; + + const controller = new AbortController(); + const formdata = new FormData(); + + // Add the token and handler to the file name. + files.forEach((file) => { + formdata.append("files", file, file.path || file.name); + }); + + // Send the file to the server. + refs[upload_ref_name] = controller; + + return new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + + // Set up event handlers + xhr.onload = function () { + if (xhr.status >= 200 && xhr.status < 300) { + resolve({ + data: xhr.responseText, + status: xhr.status, + statusText: xhr.statusText, + headers: { + get: (name) => xhr.getResponseHeader(name), + }, + }); + } else { + reject(new Error(`HTTP error! status: ${xhr.status}`)); + } + }; + + xhr.onerror = function () { + reject(new Error("Network error")); + }; + + xhr.onabort = function () { + reject(new Error("Upload aborted")); + }; + + // Handle upload progress + if (on_upload_progress) { + xhr.upload.onprogress = function (event) { + if (event.lengthComputable) { + const progressEvent = { + loaded: event.loaded, + total: event.total, + progress: event.loaded / event.total, + }; + on_upload_progress(progressEvent); + } + }; + } + + // Handle download progress with streaming response parsing + xhr.onprogress = function (event) { + if (eventHandler) { + const progressEvent = { + event: { + target: { + responseText: xhr.responseText, + }, + }, + progress: event.lengthComputable ? event.loaded / event.total : 0, + }; + eventHandler(progressEvent); + } + }; + + // Handle abort controller + controller.signal.addEventListener("abort", () => { + xhr.abort(); + }); + + // Configure and send request + xhr.open("POST", getBackendURL(env.UPLOAD)); + xhr.setRequestHeader("Reflex-Client-Token", getToken()); + xhr.setRequestHeader("Reflex-Event-Handler", handler); + for (const [key, value] of Object.entries(extra_headers || {})) { + xhr.setRequestHeader(key, value); + } + + try { + xhr.send(formdata); + } catch (error) { + reject(error); + } + }) + .catch((error) => { + console.log("Upload error:", error.message); + return false; + }) + .finally(() => { + delete refs[upload_ref_name]; + }); +}; diff --git a/reflex/.templates/web/utils/state.js b/reflex/.templates/web/utils/state.js index 01eaa4dd915..a63fa4f09c8 100644 --- a/reflex/.templates/web/utils/state.js +++ b/reflex/.templates/web/utils/state.js @@ -20,10 +20,10 @@ import { } from "$/utils/context"; import debounce from "$/utils/helpers/debounce"; import throttle from "$/utils/helpers/throttle"; +import { uploadFiles } from "$/utils/helpers/upload"; // Endpoint URLs. const EVENTURL = env.EVENT; -const UPLOADURL = env.UPLOAD; // These hostnames indicate that the backend and frontend are reachable via the same domain. const SAME_DOMAIN_HOSTNAMES = ["localhost", "0.0.0.0", "::", "0:0:0:0:0:0:0:0"]; @@ -432,7 +432,11 @@ export const applyRestEvent = async (event, socket, navigate, params) => { event.payload.files, event.payload.upload_id, event.payload.on_upload_progress, + event.payload.extra_headers, socket, + refs, + getBackendURL, + getToken, ); return false; } @@ -614,163 +618,6 @@ export const connect = async ( document.addEventListener("visibilitychange", checkVisibility); }; -/** - * Upload files to the server. - * - * @param state The state to apply the delta to. - * @param handler The handler to use. - * @param upload_id The upload id to use. - * @param on_upload_progress The function to call on upload progress. - * @param socket the websocket connection - * - * @returns The response from posting to the UPLOADURL endpoint. - */ -export const uploadFiles = async ( - handler, - files, - upload_id, - on_upload_progress, - socket, -) => { - // return if there's no file to upload - if (files === undefined || files.length === 0) { - return false; - } - - const upload_ref_name = `__upload_controllers_${upload_id}`; - - if (refs[upload_ref_name]) { - console.log("Upload already in progress for ", upload_id); - return false; - } - - // Track how many partial updates have been processed for this upload. - let resp_idx = 0; - const eventHandler = (progressEvent) => { - const event_callbacks = socket._callbacks.$event; - // Whenever called, responseText will contain the entire response so far. - const chunks = progressEvent.event.target.responseText.trim().split("\n"); - // So only process _new_ chunks beyond resp_idx. - chunks.slice(resp_idx).map((chunk_json) => { - try { - const chunk = JSON5.parse(chunk_json); - event_callbacks.map((f, ix) => { - f(chunk) - .then(() => { - if (ix === event_callbacks.length - 1) { - // Mark this chunk as processed. - resp_idx += 1; - } - }) - .catch((e) => { - if (progressEvent.progress === 1) { - // Chunk may be incomplete, so only report errors when full response is available. - console.log("Error processing chunk", chunk, e); - } - return; - }); - }); - } catch (e) { - if (progressEvent.progress === 1) { - console.log("Error parsing chunk", chunk_json, e); - } - return; - } - }); - }; - - const controller = new AbortController(); - const formdata = new FormData(); - - // Add the token and handler to the file name. - files.forEach((file) => { - formdata.append("files", file, file.path || file.name); - }); - - // Send the file to the server. - refs[upload_ref_name] = controller; - - return new Promise((resolve, reject) => { - const xhr = new XMLHttpRequest(); - - // Set up event handlers - xhr.onload = function () { - if (xhr.status >= 200 && xhr.status < 300) { - resolve({ - data: xhr.responseText, - status: xhr.status, - statusText: xhr.statusText, - headers: { - get: (name) => xhr.getResponseHeader(name), - }, - }); - } else { - reject(new Error(`HTTP error! status: ${xhr.status}`)); - } - }; - - xhr.onerror = function () { - reject(new Error("Network error")); - }; - - xhr.onabort = function () { - reject(new Error("Upload aborted")); - }; - - // Handle upload progress - if (on_upload_progress) { - xhr.upload.onprogress = function (event) { - if (event.lengthComputable) { - const progressEvent = { - loaded: event.loaded, - total: event.total, - progress: event.loaded / event.total, - }; - on_upload_progress(progressEvent); - } - }; - } - - // Handle download progress with streaming response parsing - xhr.onprogress = function (event) { - if (eventHandler) { - const progressEvent = { - event: { - target: { - responseText: xhr.responseText, - }, - }, - progress: event.lengthComputable ? event.loaded / event.total : 0, - }; - eventHandler(progressEvent); - } - }; - - // Handle abort controller - controller.signal.addEventListener("abort", () => { - xhr.abort(); - }); - - // Configure and send request - xhr.open("POST", getBackendURL(UPLOADURL)); - xhr.setRequestHeader("Reflex-Client-Token", getToken()); - xhr.setRequestHeader("Reflex-Event-Handler", handler); - - try { - xhr.send(formdata); - } catch (error) { - reject(error); - } - }) - .catch((error) => { - console.log("Upload error:", error.message); - return false; - }) - .finally(() => { - delete refs[upload_ref_name]; - }); -}; - /** * Create an event object. * @param {string} name The name of the event. diff --git a/reflex/environment.py b/reflex/environment.py index 9cef0565cae..a9d2384a489 100644 --- a/reflex/environment.py +++ b/reflex/environment.py @@ -204,7 +204,7 @@ def interpret_env_var_value( The interpreted value. Raises: - ValueError: If the value is invalid. + ValueError: If the environment variable type is invalid. """ field_type = value_inside_optional(field_type) diff --git a/reflex/event.py b/reflex/event.py index ff4b84e9616..8cce59e4e78 100644 --- a/reflex/event.py +++ b/reflex/event.py @@ -846,6 +846,7 @@ class FileUpload: upload_id: str | None = None on_upload_progress: EventHandler | Callable | None = None + extra_headers: dict[str, str] | None = None @staticmethod def on_upload_progress_args_spec(_prog: Var[dict[str, int | float | bool]]): @@ -887,6 +888,12 @@ def as_event_spec(self, handler: EventHandler) -> EventSpec: Var(_js_expr="upload_id"), LiteralVar.create(upload_id), ), + ( + Var(_js_expr="extra_headers"), + LiteralVar.create( + self.extra_headers if self.extra_headers is not None else {} + ), + ), ] if self.on_upload_progress is not None: on_upload_progress = self.on_upload_progress