Add bulk operations engine with concurrent download workers#745

Open
jjjake wants to merge 29 commits into master from bulk-engine

Conversation

jjjake commented Feb 10, 2026

Summary

  • Adds a generic bulk operations engine (internetarchive.bulk) with JSONL job logging, bitmap-based resume (1.2MB for 10M items), retry with backoff, multi-disk routing, and graceful shutdown (Ctrl+C)
  • Implements DownloadWorker as the first operation, wrapping Item.download() with per-thread sessions, DiskPool integration, and cancel event support
  • Wires into CLI: ia --workers N --joblog PATH download --search "query" for concurrent bulk downloads; ia --joblog PATH --status for progress summaries
  • Adds cancel_event parameter to File.download() and Item.download() for cooperative cancellation during chunk streaming
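The `cancel_event` hook can be illustrated with a minimal sketch of cooperative cancellation during chunk streaming. The library raises `DownloadCancelled` from `File.download()`; the `stream_chunks` helper below is a hypothetical stand-in for the real chunk loop, not the actual implementation:

```python
import threading

class DownloadCancelled(Exception):
    """Raised when a cancel_event fires mid-download (sketch of the real exception)."""

def stream_chunks(chunks, cancel_event=None):
    # Cooperative cancellation: check the event between chunks rather
    # than killing the thread, so the caller can clean up partial state.
    written = []
    for chunk in chunks:
        if cancel_event is not None and cancel_event.is_set():
            raise DownloadCancelled("download aborted by cancel_event")
        written.append(chunk)
    return b"".join(written)
```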

Architecture

[input source] --> [joblog] --> [bitmap filter] --> [thread pool] --> [joblog append]
 (search/itemlist)  (job lines)  (skip completed)   (N workers)     (event lines)

Components: JobLog (JSONL append-only log), BulkEngine (orchestrator), BaseWorker (ABC), DiskPool (multi-disk router), PlainUI (stderr handler), DownloadWorker
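As a rough illustration of the append-only JSONL joblog, the sketch below appends one JSON object per line and recovers completed sequence numbers in a single pass. The field names (`seq`, `event`) are assumptions based on this description, not the actual schema:

```python
import io
import json

def append_record(fp, record):
    # Append-only JSONL: one JSON object per line, never rewritten.
    fp.write(json.dumps(record) + "\n")

def load_completed(fp):
    # Single file scan: collect the seq of every "completed" event,
    # which is what the resume bitmap is built from.
    done = set()
    for line in fp:
        rec = json.loads(line)
        if rec.get("event") == "completed":
            done.add(rec["seq"])
    return done
```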

Usage

# Bulk download from search
ia --workers 4 --joblog nasa.jsonl download --search "collection:nasa"

# Resume interrupted session
ia --joblog nasa.jsonl download

# Multi-disk routing
ia --workers 8 --joblog j.jsonl download --search "collection:nasa" \
    --destdir /mnt/disk1 --destdir /mnt/disk2 --disk-margin 2G

# Job status summary
ia --joblog nasa.jsonl --status

Test plan

  • 90 new tests across 8 test files (unit + integration)
  • All 265 existing tests pass (0 regressions)
  • ruff check passes
  • All pre-commit hooks pass (ruff, black, codespell, mypy, setup-cfg-fmt)
  • Manual smoke test with live archive.org search
  • Verify resume works after Ctrl+C interrupt

Future PRs (out of scope)

  • PR 2: Textual TUI (--ui flag, same UIHandler interface)
  • PR 3: Auto-joblog paths, interactive startup wizard
  • PR 4+: Upload worker, metadata worker, CSV/JSONL input sources

🤖 Generated with Claude Code

jjjake and others added 29 commits February 9, 2026 20:26
Introduces a generic bulk operations framework for the `ia` CLI with
JSONL job logging, bitmap-based resume, multi-disk routing, and a
pluggable worker interface. Download is the first operation supported.

New packages: internetarchive.bulk (engine, joblog, worker, disk, ui)
and internetarchive.workers (download). CLI gains --workers, --joblog,
--batch-retries, and --status global options, plus --destdir (repeatable),
--disk-margin, and --no-disk-check on `ia download`.

90 new tests covering all components and end-to-end integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace pending_jobs list mutation with a separate retry_queue so
retried/backoff jobs are never lost when the job iterator is already
exhausted. Previously, appending to pending_jobs while iterating it
via job_iter caused both lost retries (iterator already done) and
duplicate re-processing (stale items still in the list).

Also removes unused parameters from _handle_exception.

Adds regression test with max_workers=4 where all jobs fail on first
attempt and must succeed on retry.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
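The fix described in this commit, draining a separate retry queue instead of appending to the list being iterated, can be sketched as follows (simplified to a single thread; the real engine dispatches through a thread pool):

```python
from collections import deque

def run_with_retries(jobs, attempt, max_retries=2):
    # Jobs come from a one-shot iterator; retries go to a separate
    # deque so they are never lost once the iterator is exhausted,
    # and stale entries are never re-processed.
    job_iter = iter(jobs)
    retry_queue = deque()
    results = {}
    while True:
        if retry_queue:
            job, tries = retry_queue.popleft()
        else:
            try:
                job, tries = next(job_iter), 0
            except StopIteration:
                break
        if attempt(job):
            results[job] = "ok"
        elif tries < max_retries:
            retry_queue.append((job, tries + 1))
        else:
            results[job] = "failed"
    return results
```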
Extract download logic into _do_download() with a single finally
block in execute() for disk pool release. Previously both the
except and finally blocks called release(), causing double-release
on exceptions. Also eliminates scattered release() calls for dark
item, nonexistent item, and get_item failure early returns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Record submission time alongside the future instead of measuring
after future.result() returns (which was always ~0 since the
future is already completed when polled from the done set).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
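The timing fix above amounts to pairing each future with its submission timestamp at submit time. A minimal sketch (function and variable names are illustrative):

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def timed_map(fn, items, max_workers=4):
    # Store the submit time next to each future; timing around
    # future.result() would always read ~0, since the future is
    # already complete when polled from the done set.
    durations = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(fn, it): (it, time.monotonic()) for it in items}
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                item, started = pending.pop(fut)
                fut.result()
                durations[item] = time.monotonic() - started
    return durations
```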
Move get_item() before disk_pool.route() so the reservation uses
the real item size from metadata instead of hardcoded 0. This makes
DiskPool's reservation tracking meaningful for concurrent workers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Only fsync event lines (started/completed/failed) where crash
recovery correctness matters. Job lines during the resolve phase
skip fsync for performance — with 10M items this avoids 10M
disk flushes. Job lines can be regenerated from the source.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
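The selective-durability policy described above can be sketched as a single append helper with a `durable` flag (names are mine, not the JobLog API):

```python
import os

def append_line(fp, line, durable):
    # Event lines (started/completed/failed) are fsynced because crash
    # recovery depends on them; resolve-phase job lines skip fsync,
    # since they can be regenerated from the input source.
    fp.write(line + "\n")
    if durable:
        fp.flush()
        os.fsync(fp.fileno())
```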
New load() method computes max_seq, bitmap, pending jobs, and status
in one file scan. Previously each was a separate _iter_records() call,
meaning a 10M-item joblog was re-read 3-4 times on resume. Engine now
uses load() for both initial and resume paths.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Change BaseWorker.execute(identifier, job, cancel_event) to
execute(job, cancel_event). Workers now extract their own primary
key from the job dict (e.g. job["id"] for download), making the
interface suitable for future upload/metadata/delete workers where
the primary key may not be an identifier.

The engine still reads job["id"] for UI display but no longer
passes it separately to the worker.

Resolve phase now accepts "id" as the canonical key (with
"identifier" fallback for search API compatibility).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
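The reshaped interface might look like the sketch below: the worker pulls its own primary key from the job dict, with the `"identifier"` fallback noted above (`ExampleWorker` is illustrative; only `BaseWorker.execute(job, cancel_event)` is described in this commit):

```python
import threading
from abc import ABC, abstractmethod

class BaseWorker(ABC):
    @abstractmethod
    def execute(self, job: dict, cancel_event: threading.Event):
        """Workers extract their own primary key from the job dict."""

class ExampleWorker(BaseWorker):
    def execute(self, job, cancel_event):
        # "id" is the canonical key; "identifier" is accepted for
        # search API compatibility.
        return job.get("id") or job.get("identifier")
```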
DownloadCancelled from File.download() is an intentional abort via
cancel_event, not a transient error. Previously it hit the generic
except Exception handler and was marked retry=True.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused _skipped counter (set but never read)
- Guard signal.signal() for non-main thread (would raise ValueError)
- Use rotating submit_count for worker_idx instead of len(futures)
  which had no relation to actual thread assignment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tegration tests

- _parse_size() now raises ArgumentTypeError on empty, negative, or
  non-numeric input instead of crashing with IndexError/ValueError
- _run_bulk() errors on --dry-run, --stdout, and positional file args
  which are not supported in bulk mode (previously silently ignored)
- Add TestRunBulkIntegration: end-to-end tests for _run_bulk() → engine
  wiring including kwarg passthrough, joblog correctness, and resume

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add NullUI handler that discards all events and use it when
--quiet is passed, so bulk downloads produce no stderr output.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thread progress_callback and file_callback through File.download()
and Item.download() into DownloadWorker, which emits file-level
UIEvents. Rewrite ProgressBarUI from a single item-count bar to a
multi-bar tqdm display: one "Batch" bar for overall progress plus
per-worker bars showing file name and byte-level throughput. Defer
bar resets so completed small files stay visible at 100%.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Two-bar-per-worker layout: header (identifier + destdir) and progress
(file + bytes). Color hierarchy: green bullet for active, dim green
check for done, red cross for failed; bold identifier, dim path and
filename. Progress bars use dimmed [#...] style with [] delimiters.

New job_routed UIEvent carries destdir from DiskPool routing to UI.
Fix missing worker field on completion/failure events. Replace
submit_count modulo with free_slots set to prevent UI slot collisions.

Shutdown: emit UIEvent instead of raw print (fixes tqdm collision),
os._exit(1) on second ctrl-c for reliable hard exit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Python's negative indexing means Bitmap.set(-1) silently corrupts the
last byte of the backing array instead of raising an error. Add a guard
on set() and __contains__().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
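A minimal sketch of the guard (simplified; the real Bitmap also enforces the 100M-entry cap added later). Note the sizing matches the summary's figure: `(10_000_000 + 7) // 8` is 1,250,000 bytes, about 1.2MB for 10M items:

```python
class Bitmap:
    """Fixed-capacity bitmap over a bytearray (simplified sketch)."""

    def __init__(self, capacity):
        self._capacity = capacity
        self._bits = bytearray((capacity + 7) // 8)

    def _check(self, seq):
        # Without this guard, set(-1) would silently flip a bit in the
        # LAST byte via Python's negative indexing instead of erroring.
        if not 0 <= seq < self._capacity:
            raise IndexError(f"seq {seq} out of range")

    def set(self, seq):
        self._check(seq)
        self._bits[seq // 8] |= 1 << (seq % 8)

    def __contains__(self, seq):
        self._check(seq)
        return bool(self._bits[seq // 8] & (1 << (seq % 8)))
```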
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace list with collections.deque so popleft() is O(1) instead of
list.pop(0) which is O(n).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…kers limit

- Cap Bitmap at 100M entries to prevent unbounded allocation from
  malformed JSONL; build_resume_bitmap/load skip out-of-range seqs
- Skip and warn on empty identifiers during resolve phase
- Cap --workers at 20 with warning (each spawns its own session)
- Sleep 30s after disk backoff clears to avoid busy-polling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add docs/source/batch.rst with full guide for the batch engine
(job logging, resume, multi-disk routing, graceful shutdown,
tuning, filtering, CLI reference, Python API). Nest it under
the CLI page in the sidebar via hidden toctree.

Update cli.rst with batch options in ia --help output, a batch
downloading subsection, and updated performance tips. Add batch
operations autodoc section to modules.rst. Add cross-references
from parallel.rst. Add quick start entry in index.rst.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The type annotation allowed None but default_factory=dict never
produces None. Align the annotation to dict and remove the
unnecessary defensive `or {}` guard in the engine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace except Exception with specific exception types:
- get_item: (RequestException, OSError, ValueError)
- item.download: (RequestException, OSError)

The engine's orchestration-level catch remains broad as a
safety net to prevent worker crashes from killing the engine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The except clauses in execute() only caught RequestException/OSError/ValueError,
but any other exception type (KeyError, TypeError, JSONDecodeError, etc.) would
propagate uncaught and kill the worker thread. Workers in a thread pool must
catch all exceptions and record them in WorkerResult so the engine can handle
retries. Also added pre-push test requirement to CLAUDE.md.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
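The catch-all pattern this commit restores can be sketched as below. The `WorkerResult` field names are assumptions based on the description; the real dataclass may differ:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class WorkerResult:
    # Assumed shape; the actual WorkerResult may carry more fields.
    ok: bool
    error: Optional[str] = None
    retry: bool = False

def run_job(job, do_work):
    # Thread-pool workers must not let exceptions escape: convert
    # everything into a WorkerResult so the engine can handle retries
    # instead of losing the task when the exception propagates.
    try:
        do_work(job)
        return WorkerResult(ok=True)
    except Exception as exc:
        return WorkerResult(ok=False, error=f"{type(exc).__name__}: {exc}", retry=True)
```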
# Conflicts:
#	internetarchive/__version__.py
Remove Connection: close header on bulk worker sessions to enable
keep-alive, mount pooled HTTPAdapters for both archive.org and redirect
hosts (ia*.us.archive.org), and neuter per-file mount_http_adapter()
calls that were destroying the connection pool on every file download.

Add aggregate throughput display (MB/s) to the overall progress bar and
PlainUI. Throttle progress callbacks from ~768/s to ~6/s by buffering
bytes and emitting every 2MB, and throttle overall bar refresh to once
per second.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
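The 2MB callback throttle described above can be sketched as a small buffering wrapper (the `ProgressThrottle` name and interface are mine; the actual implementation lives inside the download path):

```python
class ProgressThrottle:
    """Buffer byte counts and forward them only every `threshold` bytes."""

    def __init__(self, emit, threshold=2 * 1024 * 1024):
        self._emit = emit
        self._threshold = threshold
        self._buffered = 0

    def update(self, nbytes):
        # Called once per chunk (potentially hundreds of times a second);
        # forwards to the UI only when the buffer crosses the threshold.
        self._buffered += nbytes
        if self._buffered >= self._threshold:
            self._emit(self._buffered)
            self._buffered = 0

    def flush(self):
        # Emit any remainder so totals stay exact at end of file.
        if self._buffered:
            self._emit(self._buffered)
            self._buffered = 0
```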
Replace dotted/hash progress bars with clean dashed green bars
inspired by uv's terminal output. Changes apply to the batch
downloader UI, single-item downloads, and item headers/status
messages. Degrades gracefully on non-TTY and NO_COLOR.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Drop "downloading" prefix from single-item progress bars —
just show the filename (truncated to 50 chars if needed, dimmed),
matching the bulk engine's per-worker file display.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>