Add bulk operations engine with concurrent download workers #745
Open
jjjake wants to merge 29 commits into
Conversation
Introduces a generic bulk operations framework for the `ia` CLI with JSONL job logging, bitmap-based resume, multi-disk routing, and a pluggable worker interface. Download is the first operation supported. New packages: internetarchive.bulk (engine, joblog, worker, disk, ui) and internetarchive.workers (download). CLI gains --workers, --joblog, --batch-retries, and --status global options, plus --destdir (repeatable), --disk-margin, and --no-disk-check on `ia download`. 90 new tests covering all components and end-to-end integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace pending_jobs list mutation with a separate retry_queue so retried/backoff jobs are never lost when the job iterator is already exhausted. Previously, appending to pending_jobs while iterating it via job_iter caused both lost retries (iterator already done) and duplicate re-processing (stale items still in the list). Also removes unused parameters from _handle_exception. Adds regression test with max_workers=4 where all jobs fail on first attempt and must succeed on retry. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
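The fix described above can be sketched as follows: retried jobs go into a dedicated `collections.deque` that is drained before pulling from the (possibly exhausted) job iterator, so a retry is never appended to a list the loop has already finished iterating. This is an illustrative sketch; the function and variable names here are hypothetical, not the PR's actual code.

```python
from collections import deque

def run_jobs(jobs, attempt, max_retries=2):
    """Drain a job iterator plus a separate retry queue.

    Appending retries back onto the same list being iterated loses jobs
    once the iterator is exhausted (and can re-process stale entries);
    a dedicated deque avoids both problems.
    """
    job_iter = iter(jobs)
    retry_queue = deque()   # retried jobs go here, never back into `jobs`
    retries = {}            # job -> retry attempts used so far
    completed = []

    while True:
        if retry_queue:
            job = retry_queue.popleft()
        else:
            job = next(job_iter, None)
            if job is None:
                break       # iterator done and nothing left to retry
        if attempt(job):
            completed.append(job)
        elif retries.get(job, 0) < max_retries:
            retries[job] = retries.get(job, 0) + 1
            retry_queue.append(job)
    return completed
```

Because the retry queue is checked first, a job that fails after the iterator is exhausted is still picked up on the next loop pass, which is exactly the case the regression test (all jobs fail once, then succeed) exercises.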
Extract download logic into _do_download() with a single finally block in execute() for disk pool release. Previously both the except and finally blocks called release(), causing double-release on exceptions. Also eliminates scattered release() calls for dark item, nonexistent item, and get_item failure early returns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Record submission time alongside the future instead of measuring after future.result() returns (which was always ~0 since the future is already completed when polled from the done set). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
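A minimal sketch of the timing fix: store a timestamp next to each future at submission, because by the time a future comes out of the done set, `result()` returns immediately and timing around it always reads near zero.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Measure job duration from *submission*, not from when the completed
# future is polled. Illustrative sketch; the engine's bookkeeping differs.
durations = []
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {}
    for _ in range(2):
        submitted_at = time.monotonic()        # record before handing off
        futures[pool.submit(time.sleep, 0.05)] = submitted_at
    for fut, submitted_at in futures.items():
        fut.result()                           # may already be complete
        durations.append(time.monotonic() - submitted_at)
```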
Move get_item() before disk_pool.route() so the reservation uses the real item size from metadata instead of hardcoded 0. This makes DiskPool's reservation tracking meaningful for concurrent workers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Only fsync event lines (started/completed/failed) where crash recovery correctness matters. Job lines during the resolve phase skip fsync for performance — with 10M items this avoids 10M disk flushes. Job lines can be regenerated from the source. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
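The selective-fsync idea can be sketched as a `durable` flag on the append path. This is a simplified stand-in for the PR's `JobLog` (the record shapes and method names here are assumptions):

```python
import json
import os

class JobLog:
    """Append-only JSONL log that fsyncs only records where crash
    recovery matters (started/completed/failed events)."""
    def __init__(self, path):
        self._fh = open(path, "a", encoding="utf-8")

    def append(self, record, durable=False):
        self._fh.write(json.dumps(record) + "\n")
        if durable:
            # Event lines must survive a crash: flush Python's buffer,
            # then force the OS to write the page to disk.
            self._fh.flush()
            os.fsync(self._fh.fileno())
        # Job lines from the resolve phase skip fsync entirely: they can
        # be regenerated from the source, and with 10M items this avoids
        # 10M disk flushes.

    def close(self):
        self._fh.flush()
        self._fh.close()
```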
New load() method computes max_seq, bitmap, pending jobs, and status in one file scan. Previously each was a separate _iter_records() call, meaning a 10M-item joblog was re-read 3-4 times on resume. Engine now uses load() for both initial and resume paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
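The single-scan idea looks roughly like this: one pass over the JSONL file collects the max sequence number, the completed set, and the pending jobs together. Sketch only; the record shapes are assumptions, and the real implementation uses a packed bitmap rather than a Python set.

```python
import json

def load(path):
    """One file scan instead of three separate _iter_records() passes."""
    max_seq = -1
    done = set()      # stand-in for the resume bitmap
    jobs = {}
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            rec = json.loads(line)
            if "event" in rec:
                if rec["event"] == "completed":
                    done.add(rec["seq"])
            else:
                jobs[rec["seq"]] = rec
                max_seq = max(max_seq, rec["seq"])
    pending = [jobs[s] for s in sorted(jobs) if s not in done]
    return max_seq, done, pending
```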
Change BaseWorker.execute(identifier, job, cancel_event) to execute(job, cancel_event). Workers now extract their own primary key from the job dict (e.g. job["id"] for download), making the interface suitable for future upload/metadata/delete workers where the primary key may not be an identifier. The engine still reads job["id"] for UI display but no longer passes it separately to the worker. Resolve phase now accepts "id" as the canonical key (with "identifier" fallback for search API compatibility). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
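The reworked interface can be sketched as an ABC where each worker extracts its own primary key from the job dict. Exact signatures in the PR may differ; this just illustrates the shape described above.

```python
import threading
from abc import ABC, abstractmethod

class BaseWorker(ABC):
    """Workers receive the whole job dict plus a cancel event; they
    extract their own primary key, so future upload/metadata/delete
    workers are not tied to an `identifier` argument."""
    @abstractmethod
    def execute(self, job: dict, cancel_event: threading.Event) -> dict:
        ...

class DownloadWorker(BaseWorker):
    def execute(self, job, cancel_event):
        identifier = job["id"]          # worker-owned key extraction
        if cancel_event.is_set():
            return {"id": identifier, "cancelled": True}
        # ... real work would download the item here ...
        return {"id": identifier, "ok": True}
```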
DownloadCancelled from File.download() is an intentional abort via cancel_event, not a transient error. Previously it hit the generic except Exception handler and was marked retry=True. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused _skipped counter (set but never read)
- Guard signal.signal() for non-main thread (would raise ValueError)
- Use rotating submit_count for worker_idx instead of len(futures), which had no relation to actual thread assignment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tegration tests

- _parse_size() now raises ArgumentTypeError on empty, negative, or non-numeric input instead of crashing with IndexError/ValueError
- _run_bulk() errors on --dry-run, --stdout, and positional file args, which are not supported in bulk mode (previously silently ignored)
- Add TestRunBulkIntegration: end-to-end tests for _run_bulk() → engine wiring, including kwarg passthrough, joblog correctness, and resume

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add NullUI handler that discards all events and use it when --quiet is passed, so bulk downloads produce no stderr output. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thread progress_callback and file_callback through File.download() and Item.download() into DownloadWorker, which emits file-level UIEvents. Rewrite ProgressBarUI from a single item-count bar to a multi-bar tqdm display: one "Batch" bar for overall progress plus per-worker bars showing file name and byte-level throughput. Defer bar resets so completed small files stay visible at 100%. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Two-bar-per-worker layout: header (identifier + destdir) and progress (file + bytes). Color hierarchy: green bullet for active, dim green check for done, red cross for failed; bold identifier, dim path and filename. Progress bars use dimmed [#...] style with [] delimiters. New job_routed UIEvent carries destdir from DiskPool routing to UI. Fix missing worker field on completion/failure events. Replace submit_count modulo with free_slots set to prevent UI slot collisions. Shutdown: emit UIEvent instead of raw print (fixes tqdm collision), os._exit(1) on second ctrl-c for reliable hard exit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Python's negative indexing means Bitmap.set(-1) silently corrupts the last byte of the backing array instead of raising an error. Add a guard on set() and __contains__(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
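A guarded bitmap along these lines illustrates the fix (sketch; the real Bitmap class in internetarchive.bulk may differ in capacity handling and method set):

```python
class Bitmap:
    """Fixed-capacity bitmap over a bytearray.

    Without the range guard, set(-1) would silently flip a bit in the
    *last* byte via Python's negative indexing instead of raising.
    """
    def __init__(self, size):
        self._size = size
        self._bits = bytearray((size + 7) // 8)

    def set(self, seq):
        if not 0 <= seq < self._size:
            raise IndexError(f"seq out of range: {seq}")
        self._bits[seq // 8] |= 1 << (seq % 8)

    def __contains__(self, seq):
        if not 0 <= seq < self._size:
            raise IndexError(f"seq out of range: {seq}")
        return bool(self._bits[seq // 8] & (1 << (seq % 8)))
```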
Replace list with collections.deque so popleft() is O(1) instead of list.pop(0) which is O(n). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…kers limit

- Cap Bitmap at 100M entries to prevent unbounded allocation from malformed JSONL; build_resume_bitmap/load skip out-of-range seqs
- Skip and warn on empty identifiers during resolve phase
- Cap --workers at 20 with warning (each spawns its own session)
- Sleep 30s after disk backoff clears to avoid busy-polling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add docs/source/batch.rst with full guide for the batch engine (job logging, resume, multi-disk routing, graceful shutdown, tuning, filtering, CLI reference, Python API). Nest it under the CLI page in the sidebar via hidden toctree. Update cli.rst with batch options in ia --help output, a batch downloading subsection, and updated performance tips. Add batch operations autodoc section to modules.rst. Add cross-references from parallel.rst. Add quick start entry in index.rst. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The type annotation allowed None but default_factory=dict never
produces None. Align the annotation to dict and remove the
unnecessary defensive `or {}` guard in the engine.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace except Exception with specific exception types:

- get_item: (RequestException, OSError, ValueError)
- item.download: (RequestException, OSError)

The engine's orchestration-level catch remains broad as a safety net to prevent worker crashes from killing the engine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The except clauses in execute() only caught RequestException/OSError/ValueError, but any other exception type (KeyError, TypeError, JSONDecodeError, etc.) would propagate uncaught and kill the worker thread. Workers in a thread pool must catch all exceptions and record them in WorkerResult so the engine can handle retries. Also added pre-push test requirement to CLAUDE.md. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
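The catch-all pattern described above can be sketched as a wrapper that converts every exception into a result object. `WorkerResult`'s fields and the retry policy shown here are assumptions, not the PR's exact code:

```python
class WorkerResult:
    """Minimal result record (field names are assumptions)."""
    def __init__(self, ok, error=None, retry=False):
        self.ok, self.error, self.retry = ok, error, retry

def safe_execute(fn, job):
    """Run one job, converting every exception into a WorkerResult so
    the worker thread never dies with an unhandled error."""
    try:
        fn(job)
        return WorkerResult(ok=True)
    except (OSError, ValueError) as exc:
        # Expected transient failures: flag for retry with backoff.
        return WorkerResult(ok=False, error=repr(exc), retry=True)
    except Exception as exc:
        # Unexpected bugs (KeyError, TypeError, JSONDecodeError, ...)
        # are still recorded rather than propagating and silently
        # killing the thread; the engine decides what to do next.
        return WorkerResult(ok=False, error=repr(exc), retry=False)
```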
# Conflicts:
#	internetarchive/__version__.py
Remove Connection: close header on bulk worker sessions to enable keep-alive, mount pooled HTTPAdapters for both archive.org and redirect hosts (ia*.us.archive.org), and neuter per-file mount_http_adapter() calls that were destroying the connection pool on every file download. Add aggregate throughput display (MB/s) to the overall progress bar and PlainUI. Throttle progress callbacks from ~768/s to ~6/s by buffering bytes and emitting every 2MB, and throttle overall bar refresh to once per second. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
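The callback-throttling half of this change can be sketched as a small buffering wrapper: byte counts accumulate locally and the underlying callback fires only once per threshold (the 2MB figure matches the commit message above; the class name here is hypothetical).

```python
class ThrottledProgress:
    """Buffer byte counts and emit only once per `threshold` bytes,
    cutting callback frequency from hundreds per second to a handful."""
    def __init__(self, emit, threshold=2 * 1024 * 1024):
        self._emit = emit            # downstream callback, e.g. a UI update
        self._threshold = threshold
        self._buffered = 0

    def __call__(self, nbytes):
        self._buffered += nbytes
        if self._buffered >= self._threshold:
            self._emit(self._buffered)
            self._buffered = 0

    def flush(self):
        """Emit any remainder, e.g. when a file finishes."""
        if self._buffered:
            self._emit(self._buffered)
            self._buffered = 0
```

No bytes are lost to the throttle: everything not yet emitted sits in `_buffered` until the next threshold crossing or a final `flush()`.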
Replace dotted/hash progress bars with clean dashed green bars inspired by uv's terminal output. Changes apply to the batch downloader UI, single-item downloads, and item headers/status messages. Degrades gracefully on non-TTY and NO_COLOR. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Drop "downloading" prefix from single-item progress bars — just show the filename (truncated to 50 chars if needed, dimmed), matching the bulk engine's per-worker file display. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
- Generic bulk operations framework (`internetarchive.bulk`) with JSONL job logging, bitmap-based resume (1.2MB for 10M items), retry with backoff, multi-disk routing, and graceful shutdown (Ctrl+C)
- `DownloadWorker` as the first operation, wrapping `Item.download()` with per-thread sessions, `DiskPool` integration, and cancel event support
- `ia --workers N --joblog PATH download --search "query"` for concurrent bulk downloads; `ia --joblog PATH --status` for progress summaries
- New `cancel_event` parameter to `File.download()` and `Item.download()` for cooperative cancellation during chunk streaming

Architecture
Components: JobLog (JSONL append-only log), BulkEngine (orchestrator), BaseWorker (ABC), DiskPool (multi-disk router), PlainUI (stderr handler), DownloadWorker
Usage
Test plan
- `ruff check` passes

Future PRs (out of scope)
- … (`--ui` flag, same UIHandler interface)

🤖 Generated with Claude Code