Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 88 additions & 16 deletions backend/api_endpoints/documents/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
from flask import Request, jsonify
from flask.typing import ResponseReturnValue
from services.audio_service import transcribe_audio
from services.tabular_service import ingest_plaintext, ingest_tabular
from services.video_service import describe_video
from services.vision_service import describe_image

# ---------------------------------------------------------------------------
# MIME-type helpers
# MIME-type classification
# ---------------------------------------------------------------------------

_IMAGE_MIMES = {
Expand All @@ -31,16 +32,50 @@
"audio/mpeg", "audio/mp4", "audio/ogg", "audio/wav", "audio/webm",
"audio/x-m4a", "audio/aac", "audio/flac",
}
# Spreadsheet formats that lose structure when run through Tika.
# Files whose MIME type is in this set are classified as "tabular" by
# _text_subcategory() and parsed natively instead of going through Tika.
_TABULAR_MIMES = {
"text/csv",
"text/tab-separated-values",
"application/vnd.ms-excel", # .xls
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # .xlsx
"application/vnd.oasis.opendocument.spreadsheet", # .ods
}
# Plain-text formats — read directly, no Tika overhead.
# Files whose MIME type is in this set are classified as "plaintext" by
# _text_subcategory(). Note that markup types (HTML/XML) and JSON are listed
# here too, so they are classified the same way as ordinary text files.
_PLAINTEXT_MIMES = {
"text/plain",
"text/markdown",
"text/x-markdown",
"text/x-rst",
"text/x-python",
"text/javascript",
"text/html",
"text/xml",
"application/json",
"application/xml",
}

# Extension → MIME fallback, used when the browser omits the Content-Type or
# sends a generic application/octet-stream. Keys are lower-case extensions
# without the leading dot.
_EXT_TO_MIME: dict[str, str] = {
# images
"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
"gif": "image/gif", "webp": "image/webp", "bmp": "image/bmp",
"tiff": "image/tiff", "tif": "image/tiff",
# video
"mp4": "video/mp4", "mov": "video/quicktime", "avi": "video/x-msvideo",
"mkv": "video/x-matroska", "webm": "video/webm",
# audio
"mp3": "audio/mpeg", "m4a": "audio/x-m4a", "ogg": "audio/ogg",
"wav": "audio/wav", "aac": "audio/aac", "flac": "audio/flac",
# tabular (same formats as _TABULAR_MIMES)
"csv": "text/csv", "tsv": "text/tab-separated-values",
"xls": "application/vnd.ms-excel",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"ods": "application/vnd.oasis.opendocument.spreadsheet",
# plaintext (every value here is a member of _PLAINTEXT_MIMES)
"txt": "text/plain", "md": "text/markdown", "markdown": "text/markdown",
"rst": "text/x-rst", "py": "text/x-python", "js": "text/javascript",
"json": "application/json", "xml": "application/xml",
"html": "text/html", "htm": "text/html",
}


Expand All @@ -53,8 +88,23 @@ def _resolve_mime(file) -> str:
return _EXT_TO_MIME.get(ext, "application/octet-stream")


def _text_subcategory(mime: str, filename: str) -> str:
    """Return the finer subcategory for a file in the broad 'text' category.

    Args:
        mime: Resolved MIME type of the upload.
        filename: Original filename; may be empty or ``None``.

    Returns:
        ``"tabular"`` for spreadsheet-like data (CSV/TSV/XLS/XLSX/ODS),
        ``"plaintext"`` for raw text that can be decoded directly, or
        ``"document"`` for everything else (PDF, DOCX, DOC, RTF → Tika).
    """
    if mime in _TABULAR_MIMES:
        return "tabular"

    # Only the text after the last dot counts as an extension; a dotless name
    # such as "csv" must not be mistaken for a ".csv" file.
    name = filename or ""
    ext = name.rsplit(".", 1)[-1].lower() if "." in name else ""

    # A tabular extension has to be checked BEFORE the plain-text MIME set:
    # a .csv upload may be reported with a generic text/plain MIME, and
    # text/plain is itself a member of _PLAINTEXT_MIMES.
    if ext in ("csv", "tsv", "xls", "xlsx", "ods"):
        return "tabular"
    if mime in _PLAINTEXT_MIMES:
        return "plaintext"
    if ext in ("txt", "md", "markdown", "rst", "py", "js", "json", "xml", "html", "htm"):
        return "plaintext"
    return "document"  # PDF, DOCX, DOC, RTF → Tika


def _media_category(mime: str) -> str:
"""Map a MIME type to one of: 'text', 'image', 'video', 'audio'."""
"""Map a MIME type to a top-level category."""
if mime in _IMAGE_MIMES:
return "image"
if mime in _VIDEO_MIMES:
Expand Down Expand Up @@ -88,18 +138,42 @@ def IngestDocumentsHandler(
category = _media_category(mime)

if category == "text":
# Existing text-extraction path via Apache Tika
result = parser_module.from_buffer(file)
text = (result.get("content") or "").strip()
doc_id, does_exist = add_document(
text, filename, chat_id=chat_id, media_type="text", mime_type=mime
)
if not does_exist:
chunk_document_fn.remote(text, max_chunk_size, doc_id)
subcategory = _text_subcategory(mime, filename)

if subcategory == "tabular":
# Native CSV / Excel parsing — preserves column structure
raw = file.read()
print(f"Ingesting tabular file: {filename} ({len(raw)} bytes)")
text = ingest_tabular(raw, filename=filename, mime_type=mime)
print(f"Tabular text ({len(text)} chars): {text[:120]}…")
doc_id, does_exist = add_document(
text, filename, chat_id=chat_id, media_type="text", mime_type=mime
)
if not does_exist:
chunk_document_fn.remote(text, max_chunk_size, doc_id)

elif subcategory == "plaintext":
# Direct UTF-8 decode — faster and cleaner than Tika for raw text
raw = file.read()
print(f"Ingesting plain-text file: {filename} ({len(raw)} bytes)")
text = ingest_plaintext(raw, filename=filename)
doc_id, does_exist = add_document(
text, filename, chat_id=chat_id, media_type="text", mime_type=mime
)
if not does_exist:
chunk_document_fn.remote(text, max_chunk_size, doc_id)

else:
# PDF, DOCX, DOC, RTF, PPT … → Apache Tika (original path)
result = parser_module.from_buffer(file)
text = (result.get("content") or "").strip()
doc_id, does_exist = add_document(
text, filename, chat_id=chat_id, media_type="text", mime_type=mime
)
if not does_exist:
chunk_document_fn.remote(text, max_chunk_size, doc_id)

elif category == "image":
# Vision-LLM path: generate a rich text description and index it
# so the image is fully searchable via RAG.
image_bytes = file.read()
print(f"Generating vision description for image: {filename} ({len(image_bytes)} bytes)")
description = describe_image(image_bytes, mime_type=mime)
Expand All @@ -111,7 +185,6 @@ def IngestDocumentsHandler(
chunk_document_fn.remote(description, max_chunk_size, doc_id)

elif category == "audio":
# Whisper transcription path: convert audio to text and index it.
audio_bytes = file.read()
print(f"Transcribing audio: {filename} ({len(audio_bytes)} bytes)")
transcript = transcribe_audio(audio_bytes, filename=filename)
Expand All @@ -122,8 +195,7 @@ def IngestDocumentsHandler(
if not does_exist and transcript:
chunk_document_fn.remote(transcript, max_chunk_size, doc_id)

else:
# Video: extract frames + audio track, generate a structured document
else: # video
video_bytes = file.read()
print(f"Analysing video: {filename} ({len(video_bytes)} bytes)")
analysis = describe_video(video_bytes, filename=filename, mime_type=mime)
Expand Down
206 changes: 206 additions & 0 deletions backend/services/tabular_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""Tabular data ingestion service: CSV, TSV, Excel (XLSX/XLS/ODS).

Tika extracts spreadsheet data as a flat whitespace-separated dump that loses
all column/row structure — making the data nearly unsearchable. This service
reads spreadsheets natively and converts each sheet into a Markdown table so
that the column headers, values, and relationships are preserved in the index.

Supported formats
-----------------
- CSV (text/csv)
- TSV (text/tab-separated-values)
- XLSX (application/vnd.openxmlformats-officedocument.spreadsheetml.sheet)
- XLS (application/vnd.ms-excel)
- ODS (application/vnd.oasis.opendocument.spreadsheet)

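Dependencies: spreadsheet files are read through ``pandas``, which needs a
reader engine installed: ``openpyxl`` (already in most ML stacks) for XLSX
and ``xlrd`` for legacy XLS. Note that pandas reads ODS through ``odfpy``
(engine ``"odf"``); openpyxl cannot open ODS files. CSV/TSV uses the stdlib
only.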
"""
import csv
import io
from typing import Optional

# Maximum data rows rendered into the Markdown table per sheet. Rows beyond
# this limit are not dropped: _format_sheet() appends them as raw
# comma-joined lines inside a fenced block so they are still indexed.
_MAX_ROWS_FULL = 500
# Maximum columns rendered per sheet (very wide sheets become unreadable).
# Wider sheets are cut to this many cells per row plus a "…" marker cell.
_MAX_COLS = 50


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def ingest_tabular(
    file_bytes: bytes,
    filename: str,
    mime_type: str = "text/csv",
) -> str:
    """Parse tabular data and return a structured Markdown document.

    The document is suitable for direct storage as ``document_text`` and
    feeding into the chunking + embedding pipeline.

    The format is resolved from the file extension first and from the MIME
    type only when the extension is missing or unknown. This matters because
    ``mime_type`` defaults to ``"text/csv"``: without extension priority a
    ``.tsv`` or ``.xlsx`` file passed with the default MIME would be parsed
    as comma-separated text.

    Args:
        file_bytes: Raw file contents.
        filename: Original filename, used for format detection and labels.
        mime_type: MIME type reported for the upload.

    Returns:
        A plain string; never raises, so the document record is always
        created even on parse failure (the string then carries the error).
    """
    name = filename or ""
    mime = mime_type or ""
    ext = name.rsplit(".", 1)[-1].lower() if "." in name else ""

    if ext in ("csv", "tsv", "xlsx", "xls", "ods"):
        kind = ext
    elif mime == "text/csv":
        kind = "csv"
    elif mime == "text/tab-separated-values":
        kind = "tsv"
    elif "spreadsheetml" in mime:
        kind = "xlsx"
    elif "opendocument.spreadsheet" in mime:
        kind = "ods"
    elif mime == "application/vnd.ms-excel":
        kind = "xls"
    else:
        kind = "csv"  # Fallback: try CSV

    try:
        if kind == "tsv":
            return _ingest_delimited(file_bytes, name, delimiter="\t")
        if kind == "xlsx":
            return _ingest_excel(file_bytes, name, engine="openpyxl")
        if kind == "ods":
            # openpyxl cannot read OpenDocument files; pandas reads them
            # through its "odf" engine (backed by the odfpy package).
            return _ingest_excel(file_bytes, name, engine="odf")
        if kind == "xls":
            return _ingest_excel(file_bytes, name, engine="xlrd")
        return _ingest_delimited(file_bytes, name, delimiter=",")
    except Exception as exc:
        # Deliberate catch-all: ingestion must degrade to a placeholder text
        # rather than abort the upload.
        return f"[Tabular parsing failed for '{name}': {exc}]"


# ---------------------------------------------------------------------------
# Delimited (CSV / TSV)
# ---------------------------------------------------------------------------

def _ingest_delimited(file_bytes: bytes, filename: str, delimiter: str) -> str:
    """Parse CSV/TSV bytes and render them as a single Markdown table.

    Args:
        file_bytes: Raw file contents.
        filename: Original filename, used only for labels and messages.
        delimiter: Field separator passed to ``csv.reader``.

    Returns:
        The formatted sheet, or a bracketed notice when the file holds no
        non-blank rows.
    """
    text = _decode(file_bytes)
    # newline="" leaves line-ending handling to the csv module, as its
    # documentation requires: CR-only files parse correctly and newlines
    # embedded in quoted fields are preserved.
    reader = csv.reader(io.StringIO(text, newline=""), delimiter=delimiter)
    # csv.reader yields [] for blank lines; drop them so a leading blank line
    # cannot become an empty header and blank lines do not render as rows.
    rows = [row for row in reader if row]
    if not rows:
        return f"['{filename}' appears to be empty.]"

    return _format_sheet(filename, rows, sheet_name=None)


# ---------------------------------------------------------------------------
# Excel (XLSX / XLS / ODS)
# ---------------------------------------------------------------------------

def _ingest_excel(file_bytes: bytes, filename: str, engine: str) -> str:
    """Read every sheet of a workbook and render each as a Markdown table.

    Args:
        file_bytes: Raw workbook contents.
        filename: Original filename, used for the heading and messages.
        engine: pandas reader engine name passed to ``pd.ExcelFile``.

    Returns:
        A document with one section per sheet, or a bracketed notice when a
        required package is missing or the workbook cannot be opened.
        Errors raised while parsing an individual sheet propagate to the
        caller.
    """
    # Only demand openpyxl when it is the engine actually requested; other
    # engines (e.g. xlrd) must not fail with an openpyxl install hint.
    if engine == "openpyxl":
        try:
            import openpyxl  # noqa: F401 — presence check for xlsx
        except ImportError:
            return (
                "[openpyxl is not installed. "
                "Run `pip install openpyxl` to enable Excel ingestion.]"
            )

    try:
        import pandas as pd
    except ImportError:
        return (
            "[pandas is not installed. "
            "Run `pip install pandas openpyxl` to enable Excel ingestion.]"
        )

    try:
        workbook = pd.ExcelFile(io.BytesIO(file_bytes), engine=engine)
    except Exception as exc:
        # Covers corrupt files as well as a missing engine package.
        return f"[Failed to open '{filename}': {exc}]"

    sections: list[str] = [f"# Spreadsheet: {filename}\n"]
    # ExcelFile is a context manager; closing it releases the reader handle.
    with workbook:
        for sheet_name in workbook.sheet_names:
            # header=None keeps the first row as data so _format_sheet can
            # treat it as the header; dtype=str avoids numeric reformatting.
            frame = workbook.parse(sheet_name, header=None, dtype=str).fillna("")
            rows = [list(r) for r in frame.itertuples(index=False, name=None)]
            sections.append(
                _format_sheet(filename, rows, sheet_name=str(sheet_name))
            )

    return "\n\n".join(sections)


# ---------------------------------------------------------------------------
# Shared formatting helpers
# ---------------------------------------------------------------------------

def _format_sheet(
    filename: str,
    rows: list[list],
    sheet_name: Optional[str],
) -> str:
    """Render one sheet (header row + data rows) as a Markdown section.

    Args:
        filename: Source filename; used as the label when there is no sheet
            name.
        rows: All rows of the sheet; the first row is treated as the header.
        sheet_name: Sheet title, or ``None`` for single-table files.

    Returns:
        A summary line followed by a Markdown table of at most
        ``_MAX_ROWS_FULL`` data rows. Rows past that limit are appended as
        comma-joined lines in a fenced block so they are still indexed.
        The reported column count is the width of the widest row.
    """
    if not rows:
        label = f"Sheet '{sheet_name}'" if sheet_name else filename
        return f"[{label} is empty.]"

    # Measure the widest row, not just the header: delimited files can be
    # ragged, with data rows wider than the first row.
    total_cols = max(len(r) for r in rows)
    cols_truncated = total_cols > _MAX_COLS
    if cols_truncated:
        # Mark only the rows that actually lost cells; shorter rows are left
        # alone and get padded later by the table renderer.
        rows = [
            list(r[:_MAX_COLS]) + ["…"] if len(r) > _MAX_COLS else list(r)
            for r in rows
        ]

    header_row = rows[0]
    data_rows = rows[1:]

    total_rows = len(data_rows)
    truncated = total_rows > _MAX_ROWS_FULL
    display_rows = data_rows[:_MAX_ROWS_FULL]

    lines: list[str] = []

    if sheet_name:
        lines.append(f"## Sheet: {sheet_name}")

    summary = f"*{total_rows} data row(s), {total_cols} column(s)"
    if cols_truncated:
        summary += f" — showing first {_MAX_COLS} columns"
    if truncated:
        summary += f" — showing first {_MAX_ROWS_FULL} rows"
    lines.append(summary + "*\n")

    # Markdown table
    lines.append(_md_table(header_row, display_rows))

    if truncated:
        lines.append(
            f"\n*…{total_rows - _MAX_ROWS_FULL} additional rows not shown. "
            "The full data is embedded in the index.*"
        )
        # Also append all remaining rows as plain CSV so they are indexed
        # (they won't display nicely but will be found by semantic search).
        lines.append("\n### Full data (indexed)\n```")
        lines.extend(
            ",".join(str(c) for c in r) for r in data_rows[_MAX_ROWS_FULL:]
        )
        lines.append("```")

    return "\n".join(lines)


def _md_table(headers: list, rows: list[list]) -> str:
"""Render a list of rows as a Markdown table."""
col_count = max(len(headers), max((len(r) for r in rows), default=0))

def pad(row: list, n: int) -> list:
return list(row) + [""] * (n - len(row))

h = pad(headers, col_count)
header_line = "| " + " | ".join(str(c) for c in h) + " |"
sep_line = "| " + " | ".join("---" for _ in h) + " |"
data_lines = [
"| " + " | ".join(str(c) for c in pad(r, col_count)) + " |"
for r in rows
]
return "\n".join([header_line, sep_line] + data_lines)


# ---------------------------------------------------------------------------
# Plain-text ingestion (TXT, MD, RST, code files, etc.)
# ---------------------------------------------------------------------------

def ingest_plaintext(file_bytes: bytes, filename: str) -> str:
    """Decode a plain-text file and return it wrapped under a filename header.

    The content is placed in a fenced code block tagged with the file
    extension (``txt`` when the name has none). The fence is lengthened until
    it does not occur in the content, so files that themselves contain
    ``` fences (typical for Markdown) cannot close the wrapper early.

    Args:
        file_bytes: Raw file contents; decoding is delegated to ``_decode``.
        filename: Original filename; may be empty or ``None``.

    Returns:
        ``# <filename>``, a blank line, then the fenced content.
    """
    content = _decode(file_bytes)
    name = filename or ""
    ext = name.rsplit(".", 1)[-1].lower() if "." in name else "txt"

    fence = "```"
    while fence in content:
        fence += "`"

    return f"# {name}\n\n{fence}{ext}\n{content}\n{fence}"


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _decode(b: bytes) -> str:
try:
return b.decode("utf-8")
except UnicodeDecodeError:
return b.decode("latin-1")
Loading