Skip to content

Commit c869a8d

Browse files
[Android] Stream multipart bundle bodies straight to disk
Refactors MultipartStreamReader and BundleDownloader so the JS bundle chunk of a Metro multipart response is written directly into the tmp file via a BufferedSink, instead of being buffered in heap and then drained. For a 100 MB bundle on a dev machine, peak heap drops: MultipartStreamReader: 131 MB -> 36 MB (3.6x) BundleDownloader: 115 MB -> 33 MB (3.5x) Wall-clock time is unchanged within noise. Cumulative bytes allocated on the worker thread are also unchanged (~100 MB); that's dominated by okio's per-thread SegmentPool capping at 64 KB and is independent of whether the reader retains the body. The streaming property is proved by peak heap + the body being delivered as null in onChunkComplete. MultipartStreamReader (packages/react-native/ReactAndroid/src/main/.../MultipartStreamReader.kt) - New ChunkListener API: onChunkHeader(headers): BufferedSink? Return a sink to stream the body to it; return null to receive the body buffered as a Buffer via onChunkComplete. onChunkComplete(headers, body: Buffer?, isLastChunk) `body` is non-null iff onChunkHeader returned null. onChunkProgress is unchanged. - New algorithm: working buffer bounded by READ_CHUNK_SIZE + maxDelim (~16 KB + 21 B); reads upstream incrementally, scans for the next delimiter, transfers safe-to-flush bytes to the sink (or accumulator) using okio segment-move semantics. Lookahead window equals `maxDelimLen - 1` so delimiters spanning two reads are still detected. - The reader cannot know which chunk is the last until it sees the next delimiter. Listeners must route on headers (Content-Type, X-Http-Status), not on isLastChunk position. Routing-from-headers matches what BundleDownloader needs and is more robust anyway. - Chunks with no `\r\n\r\n` header separator are still supported (mirrors prior behaviour; required by testMultipleParts). BundleDownloader (packages/react-native/ReactAndroid/src/main/.../BundleDownloader.kt) - processMultipartResponse uses a new inner StreamingBundleChunkListener that returns Okio.buffer(Okio.sink(tmpFile)) when the chunk is application/javascript AND has effective status 200 (X-Http-Status header overrides response.code()). Progress JSON and error bodies are buffered in memory as before, so JSONObject parsing and DebugServerException diagnostics still work. - On stream success, the listener closes the sink, populates BundleInfo from X-Metro-Files-Changed-Count, atomically renames tmp -> output, and fires callback.onSuccess(). On stream failure (premature upstream EOF, IOException), the outer code deletes the half-written tmp file before surfacing the error. - closeOpenSinkQuietly ensures the file handle never leaks if readAllParts throws mid-stream. Tests - MultipartStreamReaderTest: adapted to the new ChunkListener API. All 4 existing correctness cases (simple, multiple parts, no delimiter, no close delimiter) still pass; behaviour around headerless chunks is preserved. - MultipartStreamReaderPerfTest: now exercises the streaming path (returns a CountingDiscardingSink) and asserts the body is delivered via the sink (not buffered) plus peak heap < 64 MB. Drops the thread-allocated assertion; it's an okio SegmentPool artifact, not a reader property. - BundleDownloaderPerfTest: budgets retargeted to peak heap < 80 MB. Test commands unchanged: ./gradlew :packages:react-native:ReactAndroid:testDebugUnitTest \ -Preact.internal.useHermesNightly=true \ --tests "com.facebook.react.devsupport.*" ./gradlew :packages:react-native:ReactAndroid:testDebugUnitTest \ -PrunPerfTests=true -Preact.internal.useHermesNightly=true \ --tests "*PerfTest" --info
1 parent 3ad3431 commit c869a8d

5 files changed

Lines changed: 407 additions & 211 deletions

File tree

packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/BundleDownloader.kt

Lines changed: 139 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import okhttp3.OkHttpClient
2727
import okhttp3.Request
2828
import okhttp3.Response
2929
import okio.Buffer
30+
import okio.BufferedSink
3031
import okio.BufferedSource
3132
import okio.Okio
3233
import org.json.JSONException
@@ -183,82 +184,28 @@ public class BundleDownloader public constructor(private val client: OkHttpClien
183184
}
184185
val source = checkNotNull(response.body()?.source())
185186
val bodyReader = MultipartStreamReader(source, boundary)
186-
val completed =
187-
bodyReader.readAllParts(
188-
object : ChunkListener {
189-
@Throws(IOException::class)
190-
override fun onChunkComplete(
191-
headers: Map<String, String>,
192-
body: Buffer,
193-
isLastChunk: Boolean,
194-
) {
195-
// This will get executed for every chunk of the multipart response. The last chunk
196-
// (isLastChunk = true) will be the JS bundle, the other ones will be progress
197-
// events
198-
// encoded as JSON.
199-
if (isLastChunk) {
200-
// The http status code for each separate chunk is in the X-Http-Status header.
201-
var status = response.code()
202-
if (headers.containsKey("X-Http-Status")) {
203-
status = headers.getOrDefault("X-Http-Status", "0").toInt()
204-
}
205-
processBundleResult(
206-
url,
207-
status,
208-
Headers.of(headers),
209-
body,
210-
outputFile,
211-
bundleInfo,
212-
callback,
213-
)
214-
} else {
215-
if (
216-
!headers.containsKey("Content-Type") ||
217-
headers["Content-Type"] != "application/json"
218-
) {
219-
return
220-
}
221-
222-
try {
223-
val progress = JSONObject(body.readUtf8())
224-
val status =
225-
if (progress.has("status")) progress.getString("status") else "Bundling"
226-
var done: Int? = null
227-
if (progress.has("done")) {
228-
done = progress.getInt("done")
229-
}
230-
var total: Int? = null
231-
if (progress.has("total")) {
232-
total = progress.getInt("total")
233-
}
234-
var percent: Int? = null
235-
if (progress.has("percent")) {
236-
percent = progress.getInt("percent")
237-
}
238-
callback.onProgress(status, done, total, percent)
239-
} catch (e: JSONException) {
240-
FLog.e(ReactConstants.TAG, "Error parsing progress JSON. $e")
241-
}
242-
}
243-
}
244-
245-
override fun onChunkProgress(
246-
headers: Map<String, String>,
247-
loaded: Long,
248-
total: Long,
249-
) {
250-
if ("application/javascript" == headers["Content-Type"]) {
251-
callback.onProgress(
252-
"Downloading",
253-
(loaded / 1024).toInt(),
254-
(total / 1024).toInt(),
255-
null,
256-
)
257-
}
258-
}
259-
}
187+
val tmpFile = File(outputFile.path + ".tmp")
188+
val streamingHandler =
189+
StreamingBundleChunkListener(
190+
url = url,
191+
outerStatus = response.code(),
192+
outputFile = outputFile,
193+
tmpFile = tmpFile,
194+
bundleInfo = bundleInfo,
195+
callback = callback,
260196
)
197+
val completed: Boolean =
198+
try {
199+
bodyReader.readAllParts(streamingHandler)
200+
} finally {
201+
streamingHandler.closeOpenSinkQuietly()
202+
}
261203
if (!completed) {
204+
// If we partially wrote a tmp file before the upstream died, scrap it so we don't leave
205+
// half-baked bundles on disk.
206+
if (tmpFile.exists()) {
207+
tmpFile.delete()
208+
}
262209
callback.onFailure(
263210
DebugServerException(
264211
("""
@@ -276,6 +223,124 @@ public class BundleDownloader public constructor(private val client: OkHttpClien
276223
}
277224
}
278225

226+
/**
227+
* Routes multipart chunks for a bundle download. The JS bundle chunk (Content-Type
228+
* `application/javascript` with an effective HTTP status of 200) is streamed directly into
229+
* a temporary file via a [BufferedSink], so no copy of the body is held in heap. Progress
230+
* JSON chunks and error responses are buffered in memory because they're either tiny or
231+
* bounded, and the listener needs to parse them in full.
232+
*/
233+
private inner class StreamingBundleChunkListener(
234+
private val url: String,
235+
private val outerStatus: Int,
236+
private val outputFile: File,
237+
private val tmpFile: File,
238+
private val bundleInfo: BundleInfo?,
239+
private val callback: DevBundleDownloadListener,
240+
) : ChunkListener {
241+
242+
private var bundleSink: BufferedSink? = null
243+
244+
@Throws(IOException::class)
245+
override fun onChunkHeader(headers: Map<String, String>): BufferedSink? {
246+
if (headers["Content-Type"] != "application/javascript") return null
247+
val effectiveStatus = effectiveStatus(headers)
248+
if (effectiveStatus != 200) return null
249+
// Stream the JS bundle straight to disk — never materialize in heap.
250+
val sink = Okio.buffer(Okio.sink(tmpFile))
251+
bundleSink = sink
252+
return sink
253+
}
254+
255+
@Throws(IOException::class)
256+
override fun onChunkComplete(
257+
headers: Map<String, String>,
258+
body: Buffer?,
259+
isLastChunk: Boolean,
260+
) {
261+
val sink = bundleSink
262+
if (sink != null) {
263+
bundleSink = null
264+
sink.close()
265+
finalizeStreamedBundle(headers)
266+
return
267+
}
268+
when (headers["Content-Type"]) {
269+
"application/javascript" -> {
270+
// Bundle returned with an error status — it was buffered so we can surface a useful
271+
// diagnostic to the developer.
272+
val buffered = body ?: Buffer()
273+
processBundleResult(
274+
url,
275+
effectiveStatus(headers),
276+
Headers.of(headers),
277+
buffered,
278+
outputFile,
279+
bundleInfo,
280+
callback,
281+
)
282+
}
283+
"application/json" -> dispatchProgressJson(body)
284+
else -> Unit // Unknown chunk type; ignore as before.
285+
}
286+
}
287+
288+
override fun onChunkProgress(
289+
headers: Map<String, String>,
290+
loaded: Long,
291+
total: Long,
292+
) {
293+
if ("application/javascript" == headers["Content-Type"]) {
294+
callback.onProgress(
295+
"Downloading",
296+
(loaded / 1024).toInt(),
297+
(total / 1024).toInt(),
298+
null,
299+
)
300+
}
301+
}
302+
303+
/** Make sure we never leak the tmp-file sink if [readAllParts] throws mid-stream. */
304+
fun closeOpenSinkQuietly() {
305+
val sink = bundleSink ?: return
306+
bundleSink = null
307+
try {
308+
sink.close()
309+
} catch (e: IOException) {
310+
FLog.w(TAG, "Failed to close partial bundle sink", e)
311+
}
312+
}
313+
314+
@Throws(IOException::class)
315+
private fun finalizeStreamedBundle(headers: Map<String, String>) {
316+
if (bundleInfo != null) {
317+
populateBundleInfo(url, Headers.of(headers), bundleInfo)
318+
}
319+
if (!tmpFile.renameTo(outputFile)) {
320+
throw IOException("Couldn't rename $tmpFile to $outputFile")
321+
}
322+
callback.onSuccess()
323+
}
324+
325+
private fun dispatchProgressJson(body: Buffer?) {
326+
val payload = body ?: return
327+
try {
328+
val progress = JSONObject(payload.readUtf8())
329+
val status =
330+
if (progress.has("status")) progress.getString("status") else "Bundling"
331+
val done = if (progress.has("done")) progress.getInt("done") else null
332+
val total = if (progress.has("total")) progress.getInt("total") else null
333+
val percent = if (progress.has("percent")) progress.getInt("percent") else null
334+
callback.onProgress(status, done, total, percent)
335+
} catch (e: JSONException) {
336+
FLog.e(ReactConstants.TAG, "Error parsing progress JSON. $e")
337+
}
338+
}
339+
340+
private fun effectiveStatus(headers: Map<String, String>): Int =
341+
headers["X-Http-Status"]?.toIntOrNull() ?: outerStatus
342+
}
343+
279344
@Throws(IOException::class)
280345
private fun processBundleResult(
281346
url: String,

0 commit comments

Comments
 (0)