Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 114 additions & 4 deletions raganything/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"""

import asyncio
import hashlib
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, TYPE_CHECKING
Expand All @@ -31,6 +33,47 @@ async def process_document_complete(self, file_path: str, **kwargs) -> None: ...
# ORIGINAL BATCH PROCESSING METHOD (RESTORED)
# ==========================================

# ------------------------------------------------------------------
# Incremental scan helpers
# ------------------------------------------------------------------

@staticmethod
def _file_md5(file_path: Path, block_size: int = 65536) -> str:
    """Compute the hexadecimal MD5 checksum of a file's contents.

    The file is opened in binary mode and consumed in chunks of at most
    *block_size* bytes, so it is never loaded into memory all at once.

    Args:
        file_path: Location of the file to hash.
        block_size: Number of bytes requested per read call.

    Returns:
        The MD5 digest as a lowercase hex string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            chunk = stream.read(block_size)
            # An empty bytes object signals end of file.
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

def _manifest_path(self, folder_path: Path) -> Path:
    """Locate the incremental-scan manifest file belonging to *folder_path*.

    Manifests live in ``self.config.working_dir`` (created here if it does
    not exist yet) rather than next to the source documents.

    Args:
        folder_path: The source folder being scanned.

    Returns:
        Path of the manifest JSON file inside the working directory.
    """
    manifest_dir = Path(self.config.working_dir)
    manifest_dir.mkdir(parents=True, exist_ok=True)
    # Each source folder gets its own manifest: the file name embeds the
    # first 8 hex characters of the MD5 of the folder's resolved path, so
    # the same folder always maps to the same manifest.
    resolved_folder = str(folder_path.resolve())
    full_digest = hashlib.md5(resolved_folder.encode()).hexdigest()
    folder_hash = full_digest[:8]
    return manifest_dir / f".folder_manifest_{folder_hash}.json"

def _load_manifest(self, manifest_path: Path) -> Dict[str, Any]:
    """Read the incremental-scan manifest, tolerating a missing or bad file.

    Loading is best-effort: any problem with the manifest results in an
    empty mapping, which callers treat as "nothing processed yet".

    Args:
        manifest_path: Location of the manifest JSON file.

    Returns:
        A dict mapping file keys to their per-file record dicts. An empty
        dict is returned when the file is missing, unreadable, not valid
        UTF-8, not valid JSON, or when its top-level value is not a JSON
        object. Entries whose value is not a dict are dropped.
    """
    try:
        # Open directly instead of checking exists() first: the file could
        # vanish between the check and the open.
        with open(manifest_path, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (binary corruption).
        return {}

    if not isinstance(data, dict):
        # Valid JSON but the wrong shape (e.g. a list or null).
        return {}

    # Keep only well-formed records so later ``entry.get(...)`` lookups on
    # the returned mapping cannot fail on a stray scalar value.
    return {key: entry for key, entry in data.items() if isinstance(entry, dict)}

def _save_manifest(self, manifest_path: Path, manifest: Dict[str, Any]) -> None:
    """Persist *manifest* as JSON at *manifest_path* without risking the old file.

    The data is first written to a sibling ``<name>.tmp`` file and then
    moved over the target in one step, so an error or crash while
    serialising never leaves a truncated manifest behind.

    Args:
        manifest_path: Destination of the manifest JSON file.
        manifest: Mapping to serialise (written with ``indent=2``).

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError: If *manifest* contains values that are not JSON
            serialisable (the existing manifest is left untouched).
    """
    target = Path(manifest_path)
    tmp_path = target.with_name(target.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(manifest, fh, indent=2)
        # Path.replace overwrites an existing destination.
        tmp_path.replace(target)
    finally:
        # After a successful replace the temp file is already gone; this
        # only cleans up the partial file left by a failed write.
        try:
            tmp_path.unlink()
        except OSError:
            pass

# ------------------------------------------------------------------

async def process_folder_complete(
self,
folder_path: str,
Expand All @@ -42,9 +85,10 @@ async def process_folder_complete(
file_extensions: Optional[List[str]] = None,
recursive: bool = None,
max_workers: int = None,
incremental: bool = False,
):
"""
Process all supported files in a folder
Process all supported files in a folder.

Args:
folder_path: Path to the folder containing files to process
Expand All @@ -56,6 +100,11 @@ async def process_folder_complete(
file_extensions: List of file extensions to process (optional)
recursive: Whether to process folders recursively (optional)
max_workers: Maximum number of workers for concurrent processing (optional)
incremental: When *True*, skip files whose MD5 digest has not changed
since the last successful run. A manifest file is stored inside
``config.working_dir`` to track previously processed files.
New files and files whose content has changed are always
(re-)processed.
"""
if output_dir is None:
output_dir = self.config.parser_output_dir
Expand Down Expand Up @@ -90,6 +139,51 @@ async def process_folder_complete(
self.logger.warning(f"No supported files found in {folder_path}")
return

# ---- Incremental filtering ----------------------------------------
manifest: Dict[str, Any] = {}
manifest_path: Optional[Path] = None
skipped_files: List[str] = []

if incremental:
manifest_path = self._manifest_path(folder_path_obj)
manifest = self._load_manifest(manifest_path)
filtered: List[Path] = []
for fp in files_to_process:
key = str(fp.resolve())
try:
current_md5 = self._file_md5(fp)
current_mtime = fp.stat().st_mtime
except OSError:
# File disappeared between glob and now — skip it.
continue
entry = manifest.get(key)
if entry and entry.get("md5") == current_md5:
skipped_files.append(str(fp))
else:
# Store prospective MD5 so we can update after success.
manifest[key] = {
"md5": current_md5,
"mtime": current_mtime,
"processed": False,
}
filtered.append(fp)
files_to_process = filtered
if skipped_files:
self.logger.info(
f"Incremental scan: skipping {len(skipped_files)} unchanged "
f"file(s), processing {len(files_to_process)} new/changed file(s)"
)
# ---- End incremental filtering ------------------------------------

if not files_to_process:
if incremental:
self.logger.info(
"Incremental scan: all files are up to date, nothing to process"
)
else:
self.logger.warning(f"No supported files found in {folder_path}")
return

self.logger.info(
f"Found {len(files_to_process)} files to process in {folder_path}"
)
Expand All @@ -105,10 +199,9 @@ async def process_folder_complete(
async def process_single_file(file_path: Path):
async with semaphore:
is_in_subdir = (
lambda file_path, dir_path: len(
file_path.relative_to(dir_path).parents
lambda file_path, dir_path: (
len(file_path.relative_to(dir_path).parents) > 1
)
> 1
)(file_path, folder_path_obj)

try:
Expand Down Expand Up @@ -157,11 +250,28 @@ async def process_single_file(file_path: Path):
else:
failed_files.append((file_path, error))

# ---- Update incremental manifest ----------------------------------
if incremental and manifest_path is not None:
successful_set = set(successful_files)
for key in list(manifest.keys()):
# Only mark as processed if it was actually in this run.
if key in successful_set or Path(key).resolve() in {
Path(f).resolve() for f in successful_set
}:
manifest[key]["processed"] = True
elif not manifest[key].get("processed"):
# Failed file: remove from manifest so it is retried next run.
del manifest[key]
self._save_manifest(manifest_path, manifest)
# ---- End manifest update ------------------------------------------

# Display statistics if requested
if display_stats:
self.logger.info("Processing complete!")
self.logger.info(f" Successful: {len(successful_files)} files")
self.logger.info(f" Failed: {len(failed_files)} files")
if incremental and skipped_files:
self.logger.info(f" Skipped (unchanged): {len(skipped_files)} files")
if failed_files:
self.logger.warning("Failed files:")
for file_path, error in failed_files:
Expand Down
Loading