1- //! DaemonEngine: in-memory wheel installation engine.
1+ //! DaemonEngine: GET+pipeline wheel installation engine.
22//!
3- //! Core state is all in-memory — completion tracking, demand signaling,
4- //! and stats use Arc+Mutex+Condvar. No file-based IPC, no tokio sync
5- //! primitives (so it works across separate tokio runtimes).
3+ //! Downloads full wheels via single GET requests (maximizing bandwidth),
4+ //! then extracts each wheel immediately as it finishes downloading.
5+ //! This pipelines download and extraction: while wheel N extracts,
6+ //! wheels N+1..N+K are still downloading.
67//!
7- //! Used by both the CLI and PyO3 bindings.
8+ //! Total time ≈ max(download_time, extract_time) instead of sum.
9+ //!
10+ //! All completion state is in-memory — Arc+Mutex+Condvar for cross-runtime
11+ //! wake. Used by both the CLI and PyO3 bindings.
812
913use anyhow:: { Context , Result } ;
14+ use futures:: StreamExt ;
1015use std:: collections:: { HashMap , HashSet } ;
1116use std:: path:: { Path , PathBuf } ;
1217use std:: sync:: atomic:: Ordering ;
@@ -16,12 +21,6 @@ use std::time::{Duration, Instant};
1621use crate :: extract:: { self , cleanup_stale_staging, ExtractStats } ;
1722use crate :: manifest:: WheelSpec ;
1823use crate :: queue:: InstallQueue ;
19- use crate :: streaming;
20-
21- /// Streaming threshold: wheels above this size use Range request streaming.
22- /// Streaming overlaps download+extraction via chunked Range requests, which
23- /// is faster than downloading the entire file before extracting.
24- const STREAM_THRESHOLD : u64 = 5 * 1024 * 1024 ; // 5MB
2524
2625/// Configuration for the daemon engine.
2726pub struct DaemonConfig {
@@ -58,11 +57,16 @@ struct CompletionState {
5857 all_finished : bool ,
5958}
6059
60+ /// A downloaded wheel ready for extraction.
61+ struct DownloadedWheel {
62+ spec : WheelSpec ,
63+ path : PathBuf ,
64+ }
65+
6166/// In-memory engine that downloads and extracts wheels progressively.
6267///
63- /// All state is in-memory. Use `signal_demand()` to prioritize a package,
64- /// `is_done()` / `wait_done()` to check completion, `wait_all()` to block
65- /// until everything finishes.
68+ /// Downloads full wheels via single GET (maximize bandwidth), extracts each
69+ /// immediately as it finishes (pipeline download + extraction).
6670pub struct DaemonEngine {
6771 queue : Arc < Mutex < InstallQueue > > ,
6872 /// Completion tracking — protected by Mutex + Condvar for cross-runtime wake
@@ -99,7 +103,10 @@ impl DaemonEngine {
99103
100104 /// Start downloading and extracting all wheels. Returns when all are done.
101105 ///
102- /// Call this from a tokio runtime (or spawn on a background thread).
106+ /// Architecture: GET+pipeline
107+ /// - Download workers (tokio tasks): single GET per wheel → temp file → channel
108+ /// - Extract worker (spawn_blocking): receives from channel → extract to site-packages
109+ /// - Backpressure: channel capacity limits temp disk usage
103110 pub async fn run ( & self , config : & DaemonConfig ) -> Result < ( ) > {
104111 let start = Instant :: now ( ) ;
105112
@@ -114,9 +121,19 @@ impl DaemonEngine {
114121 . pool_max_idle_per_host ( config. parallel_downloads )
115122 . build ( ) ?;
116123
124+ // Channel: downloaded wheels flow from download workers → extract worker.
125+ // Small capacity (4) provides backpressure — when extraction is slow, finished downloads
126+ // wait in `send` while still holding their semaphore permit, so no new downloads start and temp files stay bounded.
127+ let ( tx, rx) = tokio:: sync:: mpsc:: channel :: < DownloadedWheel > ( 4 ) ;
128+
129+ let tmp_dir = tempfile:: tempdir ( ) . context ( "failed to create temp dir" ) ?;
130+ let tmp_path = tmp_dir. path ( ) . to_path_buf ( ) ;
131+
132+ // === Download stage ===
133+ // Spawn one task per wheel, bounded by semaphore.
134+ // Each task: GET wheel → write to temp file → send through channel.
117135 let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( config. parallel_downloads ) ) ;
118- let mut handles = Vec :: new ( ) ;
119- let total_wheels = self . total_wheels ;
136+ let mut download_handles = Vec :: new ( ) ;
120137
121138 loop {
122139 let wheel = {
@@ -130,50 +147,85 @@ impl DaemonEngine {
130147 } ;
131148
132149 let client = client. clone ( ) ;
133- let site_packages = config. site_packages . clone ( ) ;
134- let stats = self . stats . clone ( ) ;
150+ let tx = tx. clone ( ) ;
135151 let sem = sem. clone ( ) ;
136- let ext_threads = config. extract_threads ;
137- let completion = self . completion . clone ( ) ;
138- let queue = self . queue . clone ( ) ;
152+ let tmp_path = tmp_path. clone ( ) ;
139153
140154 let handle = tokio:: spawn ( async move {
141155 let _permit = sem. acquire ( ) . await . context ( "semaphore closed" ) ?;
142156
143157 let dist = wheel. distribution . clone ( ) ;
144- let wheel_start = Instant :: now ( ) ;
145-
146- tracing:: info!( "[{dist}] starting download ({} bytes)" , wheel. size) ;
147-
148- let result = if wheel. size >= STREAM_THRESHOLD {
149- streaming:: stream_extract_wheel_atomic (
150- & client,
151- & wheel. url ,
152- & site_packages,
153- & dist,
154- 8 ,
155- & stats,
156- )
157- . await
158- . map ( |_| ( ) )
159- } else {
160- download_and_extract_atomic (
161- & client,
162- & wheel. url ,
163- & site_packages,
164- & dist,
165- ext_threads,
166- & stats,
167- )
168- . await
169- } ;
158+ let dl_start = Instant :: now ( ) ;
159+
160+ tracing:: info!( "[{dist}] downloading ({} bytes)" , wheel. size) ;
161+
162+ let result =
163+ download_wheel_to_file ( & client, & wheel. url , & tmp_path, & dist) . await ;
164+
165+ match result {
166+ Ok ( path) => {
167+ let elapsed = dl_start. elapsed ( ) ;
168+ tracing:: info!(
169+ "[{dist}] downloaded in {:.1}s" ,
170+ elapsed. as_secs_f64( )
171+ ) ;
172+ // Send to extract worker — blocks if channel is full (backpressure)
173+ tx. send ( DownloadedWheel { spec : wheel, path } )
174+ . await
175+ . ok ( ) ;
176+ }
177+ Err ( e) => {
178+ tracing:: error!( "[{dist}] download failed: {e:#}" ) ;
179+ // Return error info so we can mark it failed
180+ return Err ( e) ;
181+ }
182+ }
183+
184+ Ok :: < String , anyhow:: Error > ( dist)
185+ } ) ;
186+
187+ download_handles. push ( handle) ;
188+ }
189+
190+ // Drop our copy of tx so channel closes when all download tasks finish
191+ drop ( tx) ;
192+
193+ // === Extract stage ===
194+ // Single blocking loop: receives downloaded wheels, extracts each immediately.
195+ // Extraction uses all extract_threads for parallelism within a single wheel.
196+ let site_packages = config. site_packages . clone ( ) ;
197+ let ext_threads = config. extract_threads ;
198+ let stats = self . stats . clone ( ) ;
199+ let completion = self . completion . clone ( ) ;
200+ let queue = self . queue . clone ( ) ;
201+ let total_wheels = self . total_wheels ;
202+
203+ let extract_handle = tokio:: task:: spawn_blocking ( move || {
204+ let rx = rx;
205+ // blocking_recv in a loop — channel closes when all downloads finish
206+ let mut rx = rx;
207+ while let Some ( downloaded) = rx. blocking_recv ( ) {
208+ let dist = downloaded. spec . distribution . clone ( ) ;
209+ let extract_start = Instant :: now ( ) ;
210+
211+ let result = extract:: extract_wheel_atomic (
212+ & downloaded. path ,
213+ & site_packages,
214+ & dist,
215+ ext_threads,
216+ true ,
217+ & stats,
218+ ) ;
170219
171220 let ( lock, cvar) = & * completion;
172221
173222 match result {
174223 Ok ( ( ) ) => {
175- let elapsed = wheel_start. elapsed ( ) ;
176- tracing:: info!( "[{dist}] done in {:.1}s" , elapsed. as_secs_f64( ) ) ;
224+ let elapsed = extract_start. elapsed ( ) ;
225+ tracing:: info!(
226+ "[{dist}] extracted in {:.1}s" ,
227+ elapsed. as_secs_f64( )
228+ ) ;
177229
178230 {
179231 let mut q = queue. lock ( ) . unwrap ( ) ;
@@ -189,7 +241,7 @@ impl DaemonEngine {
189241 }
190242 Err ( e) => {
191243 let err_msg = format ! ( "{e:#}" ) ;
192- tracing:: error!( "[{dist}] failed: {err_msg}" ) ;
244+ tracing:: error!( "[{dist}] extraction failed: {err_msg}" ) ;
193245
194246 {
195247 let mut q = queue. lock ( ) . unwrap ( ) ;
@@ -205,18 +257,29 @@ impl DaemonEngine {
205257 }
206258 }
207259
208- Ok :: < _ , anyhow:: Error > ( ( ) )
209- } ) ;
210-
211- handles. push ( handle) ;
212- }
213-
214- for handle in handles {
215- if let Err ( e) = handle. await ? {
216- tracing:: error!( "worker error: {e}" ) ;
260+ // Clean up temp file
261+ let _ = std:: fs:: remove_file ( & downloaded. path ) ;
262+ }
263+ } ) ;
264+
265+ // === Wait for download failures ===
266+ // Await every download task and log its error. NOTE(review): nothing in this loop records the wheel as failed — confirm it is marked elsewhere.
267+ for handle in download_handles {
268+ match handle. await {
269+ Ok ( Ok ( _dist) ) => { }
270+ Ok ( Err ( e) ) => {
271+ // Download failed — already logged by the task. NOTE(review): the wheel is not added to the completion state's failed set here, so waiters may never see it fail.
272+ tracing:: error!( "download worker error: {e}" ) ;
273+ }
274+ Err ( e) => {
275+ tracing:: error!( "download task panicked: {e}" ) ;
276+ }
217277 }
218278 }
219279
280+ // Wait for extract worker to finish
281+ extract_handle. await ?;
282+
220283 // Mark all finished
221284 {
222285 let ( lock, cvar) = & * self . completion ;
@@ -291,7 +354,6 @@ impl DaemonEngine {
291354 return Ok ( false ) ;
292355 }
293356 if state. all_finished {
294- // All wheels done but this one isn't in done or failed — not in our set
295357 return Ok ( true ) ;
296358 }
297359
@@ -303,7 +365,6 @@ impl DaemonEngine {
303365 let ( guard, timeout_result) = cvar. wait_timeout ( state, remaining) . unwrap ( ) ;
304366 state = guard;
305367 if timeout_result. timed_out ( ) {
306- // Check one more time
307368 if state. done . contains ( distribution) {
308369 return Ok ( true ) ;
309370 }
@@ -348,7 +409,6 @@ impl DaemonEngine {
348409 let done = state. done . len ( ) ;
349410 let failed = state. failed . len ( ) ;
350411 let pending_plus_in_progress = self . total_wheels . saturating_sub ( done + failed) ;
351- // Approximate split — queue tracks the precise counts
352412 let q = self . queue . lock ( ) . unwrap ( ) ;
353413 let pending = q. pending_count ( ) ;
354414 let in_progress = pending_plus_in_progress. saturating_sub ( pending) ;
@@ -373,34 +433,34 @@ impl DaemonEngine {
373433 }
374434}
375435
376- /// Download a whole wheel file, then extract atomically .
377- async fn download_and_extract_atomic (
436+ /// Download a single wheel via GET to a temp file .
437+ async fn download_wheel_to_file (
378438 client : & reqwest:: Client ,
379439 url : & str ,
380- site_packages : & Path ,
440+ tmp_dir : & Path ,
381441 pkg_name : & str ,
382- threads : usize ,
383- stats : & Arc < ExtractStats > ,
384- ) -> Result < ( ) > {
385- let tmp_dir = tempfile:: tempdir ( ) . context ( "failed to create temp dir" ) ?;
442+ ) -> Result < PathBuf > {
386443 let filename = url. rsplit ( '/' ) . next ( ) . unwrap_or ( "wheel.whl" ) ;
387444 let filename = urlencoding:: decode ( filename)
388445 . map ( |s| s. into_owned ( ) )
389446 . unwrap_or_else ( |_| filename. to_string ( ) ) ;
390- let dest = tmp_dir. path ( ) . join ( & filename) ;
447+ // Use pkg_name prefix to avoid collisions
448+ let dest = tmp_dir. join ( format ! ( "{pkg_name}_{filename}" ) ) ;
391449
392450 let resp = client. get ( url) . send ( ) . await ?. error_for_status ( ) ?;
393- let bytes = resp. bytes ( ) . await ?;
394- tokio:: fs:: write ( & dest, & bytes) . await ?;
395451
396- let site_packages = site_packages. to_path_buf ( ) ;
397- let pkg_name = pkg_name. to_string ( ) ;
398- let stats = stats. clone ( ) ;
452+ // Stream to file — don't buffer entire wheel in memory
453+ let mut stream = resp. bytes_stream ( ) ;
454+ let file = tokio:: fs:: File :: create ( & dest) . await ?;
455+ let mut writer = tokio:: io:: BufWriter :: with_capacity ( 1024 * 1024 , file) ;
456+
457+ while let Some ( chunk) = stream. next ( ) . await {
458+ let chunk = chunk?;
459+ tokio:: io:: AsyncWriteExt :: write_all ( & mut writer, & chunk) . await ?;
460+ }
461+ tokio:: io:: AsyncWriteExt :: flush ( & mut writer) . await ?;
399462
400- tokio:: task:: spawn_blocking ( move || {
401- extract:: extract_wheel_atomic ( & dest, & site_packages, & pkg_name, threads, true , & stats)
402- } )
403- . await ?
463+ Ok ( dest)
404464}
405465
406466#[ cfg( test) ]
@@ -438,7 +498,6 @@ mod tests {
438498 ] ;
439499 let engine = DaemonEngine :: new ( wheels) ;
440500
441- // Signal demand for torch — should move it to front
442501 engine. signal_demand ( "torch" ) ;
443502
444503 let mut q = engine. queue . lock ( ) . unwrap ( ) ;
@@ -467,15 +526,13 @@ mod tests {
467526 site_packages : PathBuf :: from ( "/tmp/zs-test-empty-engine" ) ,
468527 ..Default :: default ( )
469528 } ;
470- // Should complete immediately
471529 engine. run ( & config) . await . unwrap ( ) ;
472530 let _ = std:: fs:: remove_dir_all ( "/tmp/zs-test-empty-engine" ) ;
473531 }
474532
/// Waiting on a distribution the engine was never given must report it as done.
#[test]
fn test_wait_done_not_in_set() {
    let engine = DaemonEngine::new(Vec::new());
    let finished = engine
        .wait_done("nonexistent", Duration::from_secs(1))
        .unwrap();
    assert!(finished);
}
0 commit comments