def configure_logging(verbosity: int) -> None:
    """Configure the root logger from a numeric verbosity level.

    Args:
        verbosity: ``<= 0`` selects WARNING, ``1`` selects INFO and
            ``>= 2`` selects DEBUG.
    """
    if verbosity >= 2:
        level = logging.DEBUG
    elif verbosity <= 0:
        level = logging.WARNING
    else:
        level = logging.INFO

    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Optional argument list; ``None`` (the default) makes argparse
            read ``sys.argv[1:]`` as before.

    Returns:
        The parsed ``argparse.Namespace`` with a ``verbose`` counter.
    """
    parser = argparse.ArgumentParser(
        description="Enhance text file to markdown using Gemini."
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        # The counter starts at 1 (INFO), so a single -v already reaches DEBUG.
        default=1,
        help="Increase log verbosity (default is INFO, -v for DEBUG).",
    )
    return parser.parse_args(argv)


def read_file_content(file_path: str) -> str:
    """Return the full UTF-8 text of ``file_path``.

    Any read failure is logged and terminates the process with exit status 1,
    which is what the callers in this script rely on.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        logging.error("File not found: %s", file_path)
        sys.exit(1)
    except (OSError, UnicodeError) as e:
        logging.error("Error reading file %s: %s", file_path, e)
        sys.exit(1)


def split_content_into_sections(content: str) -> List[Tuple[int, str]]:
    """Split raw exporter output into ``(slide_number, slide_text)`` pairs.

    Slides are delimited by ``=== SLIDE <n> ===`` and ``=== END SLIDE ===``
    marker lines. Slides whose body is empty after stripping are dropped.

    The pattern anchors on a single line break next to each marker (blank
    lines are removed by ``strip``). Requiring two line breaks on both sides,
    as before, made an empty slide block run on until the *next* slide's end
    marker and swallow that slide.
    """
    pattern = r"=== SLIDE (\d+) ===\r?\n(.*?)\r?\n=== END SLIDE ==="

    sections: List[Tuple[int, str]] = []
    for number, body in re.findall(pattern, content, flags=re.DOTALL):
        body = body.strip()
        if body:
            sections.append((int(number), body))
    return sections
def enhance_text_with_gemini(
    rules: str,
    section_batch: List[Tuple[int, str]],
    model,
    retry_count: int = 0,
    *,
    max_retries: int = 3,
    request_delay: float = 1.0,
    retry_delay: float = 2.0,
    min_split_size: int = 10,
) -> List[str]:
    """Convert a batch of raw slides to markdown with a Gemini model.

    Args:
        rules: Text of the JSON style rules, embedded verbatim in the prompt.
        section_batch: ``(slide_number, raw_text)`` pairs to convert.
        model: Object exposing ``generate_content(prompt)`` whose result has
            a ``text`` attribute.
        retry_count: Recursion depth of batch splitting; used for logging.
        max_retries: Attempts per batch before splitting or giving up.
        request_delay: Seconds to sleep after every API call (rate limiting).
        retry_delay: Seconds to sleep after a failed API call.
        min_split_size: Smallest batch that is still split in half when all
            attempts fail.

    Returns:
        One markdown string per input slide, in input order. When the model
        is off by one slide the result is padded or trimmed. When everything
        fails, the raw content is returned under a "PROCESSING ERROR" header
        so the slide count is preserved.
    """
    if not section_batch:
        # Nothing to convert; do not spend an API call on an empty prompt.
        return []

    expected = len(section_batch)
    batch_content = "\n\n".join(
        f"Slide {num}:\n{content}" for num, content in section_batch
    )

    separator = "[SLIDE_BREAK]"

    # NOTE: the nested-bullet example uses a two-space indent, matching the
    # "indent 2 spaces per sub-level" rule in rules.txt.
    prompt = f"""You are an expert in converting raw slide content into well-structured LaTeX/Pandoc markdown. Your primary task is to replace the raw content markers (e.g., [TITLE], [BULLET:level=0]) with the correct markdown based on the instructions below. Use the provided JSON rules as a stylistic guide for the final output.

**CRITICAL INSTRUCTIONS:**
1. You MUST process ALL {len(section_batch)} slides provided.
2. You MUST place "{separator}" between each slide's converted content.
3. Convert all raw markers to markdown. DO NOT leave any raw markers like `[BULLET:...]` in the final output.
4. Follow the JSON rules for stylistic formatting.
5. Ensure all output is LaTeX compatible.
6. DO NOT use dollar signs ($) unless for math expressions specified in the rules.
7. Each slide should have a unique title/heading.
8. Add "* {{heading_name}}" after each major heading.

**Raw Content Marker Conversion Guide (Examples):**
- `[TITLE]\nSome Title` → ` # ##############################################################################\n# Some Title\n# ##############################################################################\n\n* Some Title`
- `[SUBTITLE]\nSome Subtitle` → `## Some Subtitle`
- `[TEXT]\nSome text.` → `Some text.`
- `[BULLET:level=0] Item 1` → `- Item 1`
- `[BULLET:level=1] Sub-item A` → `  - Sub-item A`
- `[BOLD]text[/BOLD]` → `**text**`
- `[ITALIC]text[/ITALIC]` → `*text*`
- `[IMAGE] path/to/image.png` → `![](path/to/image.png)`
- `[TABLE 4x2]` with `[CELL:0,0|HEADER]...` lines → A full markdown table.
- DO NOT NEED SPEAKER NOTES

**Stylistic Rules (from JSON):**
```json
{rules}
```

**Raw Slide Content ({len(section_batch)} slides total):**
```text
{batch_content}
```

IMPORTANT: Your response must contain exactly {len(section_batch)} converted slides separated by "{separator}". The output must be clean markdown with no raw markers remaining. Do not include any other text, explanations, or comments."""

    for attempt in range(1, max_retries + 1):
        try:
            response = model.generate_content(prompt)
            time.sleep(request_delay)  # To avoid hitting rate limits
            text = (response.text or "").strip()
        except Exception as e:  # API boundary: any failure means "retry"
            logging.error(f"Error calling Gemini API on attempt {attempt}: {e}")
            if attempt == max_retries:
                logging.error("Max retries reached due to API errors")
                break
            time.sleep(retry_delay)  # Wait longer before retry
            continue

        if not text:
            logging.warning(f"Empty response from Gemini API on attempt {attempt}")
            continue

        # Split on the unique separator and drop empty fragments.
        enhanced_slides = [
            piece.strip() for piece in text.split(separator) if piece.strip()
        ]
        received = len(enhanced_slides)
        logging.info(
            f"Batch processing attempt {attempt}: Expected {expected} slides, got {received}"
        )

        if received == expected:
            logging.info(f"Successfully processed batch of {expected} slides")
            return enhanced_slides

        if abs(received - expected) <= 1:
            # Best effort: off by one is accepted rather than retried.
            logging.warning(
                f"Slide count mismatch but close enough. Expected {expected}, got {received}"
            )
            if received < expected:
                enhanced_slides.append("# Content processing error - slide missing")
            return enhanced_slides[:expected]

        logging.warning(
            f"Significant slide count mismatch on attempt {attempt}. "
            f"Expected {expected}, got {received}"
        )
        if attempt == max_retries:
            logging.error("Max retries reached for batch processing")

    # All attempts failed: retry on two smaller batches if the batch is big
    # enough for halving to keep a reasonable batch size.
    if expected >= min_split_size:
        logging.warning(
            f"Splitting batch of {expected} slides into smaller batches "
            f"(split depth {retry_count + 1})"
        )
        mid_point = expected // 2
        results: List[str] = []
        for half in (section_batch[:mid_point], section_batch[mid_point:]):
            results.extend(
                enhance_text_with_gemini(
                    rules,
                    half,
                    model,
                    retry_count + 1,
                    max_retries=max_retries,
                    request_delay=request_delay,
                    retry_delay=retry_delay,
                    min_split_size=min_split_size,
                )
            )
        return results

    # Final fallback: keep the raw content under a warning header so the
    # output still has exactly one entry per slide.
    logging.error(f"Failed to process batch of {expected} slides after all retries")
    return [
        f"# PROCESSING ERROR - Slide {num}\n{content}"
        for num, content in section_batch
    ]
def main():
    """Convert the newest ``output/*_raw.txt`` export to enhanced markdown.

    Reads ``rules.txt`` and the most recently modified raw export, sends the
    slides to Gemini in batches, and writes the joined result to
    ``output/final_enhanced_markdown.txt``. Exits with status 1 on any
    unrecoverable problem (missing input, missing API key, write failure).
    """
    args = parse_args()
    configure_logging(args.verbose)

    logging.info("Starting enhancement process...")

    rules_file_path = "rules.txt"
    output_dir = "output"

    if not os.path.exists(output_dir):
        logging.error("Output directory not found.")
        logging.error("Please run slides_to_text.py first to generate the raw content.")
        sys.exit(1)

    raw_files = [f for f in os.listdir(output_dir) if f.endswith("_raw.txt")]
    if not raw_files:
        logging.error("No raw text files found in output directory.")
        logging.error("Please run slides_to_text.py first to generate the raw content.")
        sys.exit(1)

    # Use the most recently modified raw export.
    newest = max(raw_files, key=lambda name: os.path.getmtime(os.path.join(output_dir, name)))
    input_file_path = os.path.join(output_dir, newest)
    output_file_path = os.path.join(output_dir, "final_enhanced_markdown.txt")

    logging.info(f"Processing raw file: {input_file_path}")

    rules = read_file_content(rules_file_path)
    content = read_file_content(input_file_path)

    sections = split_content_into_sections(content)
    logging.info(f"Found {len(sections)} slides to process.")
    if not sections:
        # An empty parse means the slide markers did not match; writing an
        # empty "final" file would hide that.
        logging.error(f"No slide blocks found in {input_file_path}; nothing to convert.")
        sys.exit(1)

    # SECURITY: the API key must come from the environment. A key was
    # previously committed here as a fallback; it has to be treated as leaked
    # and revoked in Google Cloud Console.
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        logging.error("GOOGLE_API_KEY environment variable is not set.")
        logging.error("Please make sure the GOOGLE_API_KEY environment variable is set correctly.")
        sys.exit(1)

    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-2.0-flash')
        logging.info("Gemini client configured successfully.")
    except Exception as e:
        logging.error(f"Failed to configure Gemini client: {e}")
        logging.error("Please make sure the GOOGLE_API_KEY environment variable is set correctly.")
        sys.exit(1)

    # Force minimum batch size of 5, preferred batch size of 8
    min_batch_size = 5
    preferred_batch_size = 8

    section_batches = []
    for start in range(0, len(sections), preferred_batch_size):
        batch = sections[start:start + preferred_batch_size]
        # A short trailing batch is merged into the previous one; the very
        # first batch is always kept, however small.
        if len(batch) < min_batch_size and section_batches:
            section_batches[-1].extend(batch)
        else:
            section_batches.append(batch)

    logging.info(
        f"Created {len(section_batches)} batches with sizes: {[len(batch) for batch in section_batches]}"
    )

    enhanced_content = []
    last_index = len(section_batches) - 1
    for i, batch in enumerate(tqdm(section_batches, desc="Converting slides to markdown")):
        batch_numbers = [num for num, _ in batch]
        logging.info(
            f"Processing batch {i+1}/{len(section_batches)} with {len(batch)} slides: {batch_numbers}"
        )

        enhanced_content.extend(enhance_text_with_gemini(rules, batch, model))

        # Add a small delay between batches to be respectful to the API
        if i < last_index:
            time.sleep(2)

    try:
        with open(output_file_path, "w", encoding="utf-8") as f:
            # Join content with appropriate spacing
            f.write("\n\n".join(enhanced_content))
    except OSError as e:
        logging.error(f"Error writing to output file {output_file_path}: {e}")
        sys.exit(1)

    logging.info(f"Successfully wrote enhanced content to {output_file_path}")
    logging.info(f"Converted {len(enhanced_content)} slides total")


if __name__ == "__main__":
    main()
sections + + logging.info(f"Created {len(section_batches)} batches with sizes: {[len(batch) for batch in section_batches]}") + + for i, batch in enumerate(tqdm(section_batches, desc="Converting slides to markdown")): + batch_numbers = [num for num, _ in batch] + logging.info(f"Processing batch {i+1}/{len(section_batches)} with {len(batch)} slides: {batch_numbers}") + + enhanced_batch = enhance_text_with_gemini(rules, batch, model) + enhanced_content.extend(enhanced_batch) + + # Add a small delay between batches to be respectful to the API + if i < len(section_batches) - 1: + time.sleep(2) + + try: + with open(output_file_path, "w", encoding="utf-8") as f: + # Join content with appropriate spacing + f.write("\n\n".join(enhanced_content)) + logging.info(f"Successfully wrote enhanced content to {output_file_path}") + logging.info(f"Converted {len(enhanced_content)} slides total") + except Exception as e: + logging.error(f"Error writing to output file {output_file_path}: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ppt_to_markdown_convertor/links.txt b/ppt_to_markdown_convertor/links.txt new file mode 100644 index 0000000..e69de29 diff --git a/ppt_to_markdown_convertor/rules.txt b/ppt_to_markdown_convertor/rules.txt new file mode 100644 index 0000000..ee5a7f9 --- /dev/null +++ b/ppt_to_markdown_convertor/rules.txt @@ -0,0 +1,143 @@ +{ + "slide": { + "title": { + "rule": "Largest text box on the slide is the title", + "output": "# {text}\n\n* {text}", + "fallback": "# Untitled Slide\n\n* Untitled Slide" + }, + "headings": { + "rule": "Font size determines heading level - but never use level 1 if title exists", + "mapping": { + "largest": "## {text}", + "second_largest": "### {text}", + "third_largest": "#### {text}", + "fourth_largest": "##### {text}" + } + }, + "section_breaks": { + "rule": "Major section dividers with visual separation", + "output": "# 
##############################################################################\n# {section_name}\n# ##############################################################################\n\n* {section_name}" + }, + "subsection_breaks": { + "rule": "Minor section dividers within major sections", + "output": "## #############################################################################\n## {subsection_name}\n## #############################################################################\n\n* {subsection_name}" + }, + "lists": { + "unordered": { + "rule": "Bulleted lists become markdown list items", + "output": "- {item}", + "nesting": "indent 2 spaces per sub-level" + }, + "ordered": { + "rule": "Numbered lists become markdown numbered lists", + "output": "{n}. {item}", + "nesting": "indent 2 spaces per sub-level" + }, + "bullet_slides": { + "rule": "Slides with only bullet points use * instead of -", + "output": "* {item}", + "note": "Used for content-heavy slides" + } + }, + "text_formatting": { + "bold": "**{text}**", + "italic": "*{text}*", + "underline": "{text}", + "highlight": "=={text}==", + "math_inline": "${text}$", + "math_block": "$$\n{text}\n$$", + "colored_text": "\\{color}{{text}}", + "size_commands": { + "large": "\\begingroup \\large\n{text}\n\\endgroup", + "Large": "\\begingroup \\Large\n{text}\n\\endgroup" + } + }, + "layout": { + "columns": { + "rule": "Multi-column layouts for side-by-side content", + "output": "::: columns\n:::: {{.column width={width}%}}\n{content}\n::::\n:::", + "note": "Use when content needs to be arranged horizontally" + }, + "spacing": { + "vertical": "\\vspace{{{size}cm}}", + "note": "Use for precise vertical spacing control" + } + }, + "images": { + "rule": "Every image is extracted as a file and referenced", + "output": "![{alt_text}]({image_path})", + "with_sizing": "![{alt_text}]({image_path}){{width={percentage}%}}", + "caption": "{caption_text}" + }, + "code_blocks": { + "mermaid": "```mermaid\n{diagram_code}\n```", + 
"tikz": "```tikz\n{tikz_code}\n```", + "raw_latex": "```raw_latex\n{latex_code}\n```", + "dockerfile": "```dockerfile\n{code}\n```", + "yaml": "```yaml\n{code}\n```", + "json": "```json\n{code}\n```", + "bash": "```bash\n{code}\n```", + "generic": "```\n{code}\n```" + }, + "advanced_elements": { + "latex_environments": { + "rule": "Complex LaTeX structures for advanced formatting", + "align": "\\begin{{align*}}\n{equations}\n\\end{{align*}}", + "itemize_custom": "Custom spacing and formatting within lists" + }, + "instructor_info": { + "rule": "Instructor details with specific formatting", + "output": "**Instructor**: {name} - `{email}`\n\n**References**:\n\n{references}" + }, + "course_header": { + "rule": "Course identification at slide top", + "output": "{course_code}: {course_name}" + } + }, + "tables": { + "rule": "Convert PPTX tables into markdown tables", + "output": "| Header1 | Header2 |\n|---------|---------|\n| Value1 | Value2 |" + }, + "shapes_smartart": { + "default": "Convert shapes into bullet lists", + "flowchart": { + "rule": "If flow/direction is clear, represent as Mermaid diagram", + "output": "```mermaid\n{diagram}\n```" + }, + "mindmap": { + "rule": "Mind maps become Mermaid mindmap diagrams", + "output": "```mermaid\nmindmap\n{mindmap_structure}\n```" + } + }, + "structure": { + "slide_separation": { + "rule": "Each slide should be separated by slide breaks - but not always needed if using section breaks", + "output": "\n\n[SLIDE_BREAK]\n\n" + }, + "header_hierarchy": { + "rule": "Only ONE level-1 header (#) per slide - this should be the slide title", + "constraint": "All other headings must be level-2 (##) or lower", + "exception": "Section breaks can use level-1 headers for major divisions" + }, + "content_organization": { + "rule": "Group related content logically", + "patterns": { + "bullet_slide": "* {topic}\n{bullet_content}", + "mixed_content": "Combine text, images, and formatting as needed", + "technical_content": "Use appropriate 
code blocks and LaTeX for complex material" + } + } + }, + "consistency": { + "preserve_order": "Slides must remain in the same order", + "no_merge": "Do not merge or collapse slides", + "formatting_consistency": "Maintain consistent formatting patterns throughout the document", + "spacing_rules": "Use consistent spacing between elements", + "mathematical_symbols": { + "rule": "Always use proper LaTeX math mode for mathematical expressions (do not use approx or sim; if they appear, replace them with an empty string)", + "exponents": "Use $2^{10}$ format with braces around exponents", + "currency": "Write as plain text: '50 USD' not '$50'" +} + } +} +} \ No newline at end of file diff --git a/ppt_to_markdown_convertor/slides_to_text.py b/ppt_to_markdown_convertor/slides_to_text.py new file mode 100644 index 0000000..b60454f --- /dev/null +++ b/ppt_to_markdown_convertor/slides_to_text.py @@ -0,0 +1,826 @@ +import argparse +import logging +import os +import re +import sys +import urllib.request +import urllib.parse +from typing import List, Optional, Tuple +import io +import zipfile +import json +import time + +# Google API imports +try: + from google.oauth2.credentials import Credentials + from google_auth_oauthlib.flow import InstalledAppFlow + from google.auth.transport.requests import Request + from googleapiclient.discovery import build + from google_auth_httplib2 import AuthorizedHttp +except Exception as exc: # ImportError or other issues + # Defer raising until main entrypoint to allow --help to work without deps + GOOGLE_IMPORT_ERROR: Optional[Exception] = exc +else: + GOOGLE_IMPORT_ERROR = None + + +SCOPES = [ + "https://www.googleapis.com/auth/presentations.readonly", + "https://www.googleapis.com/auth/drive.readonly", +] +CREDENTIALS_FILE = "credentials.json" # Expected to be in the working directory +TOKEN_FILE = "token.json" + + +def configure_logging(verbosity: int) -> None: + level = logging.INFO + if verbosity >= 2: + level = logging.DEBUG + elif verbosity <=
def configure_logging(verbosity: int) -> None:
    """Configure the root logger from a numeric verbosity level.

    Args:
        verbosity: ``<= 0`` selects WARNING, ``1`` selects INFO and
            ``>= 2`` selects DEBUG.
    """
    if verbosity >= 2:
        level = logging.DEBUG
    elif verbosity <= 0:
        level = logging.WARNING
    else:
        level = logging.INFO

    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


def sanitize_filename(name: str, max_length: int = 120) -> str:
    """Return ``name`` made safe for use as a file name.

    Characters that are illegal on common filesystems become ``_``, runs of
    whitespace collapse to one space, and names longer than ``max_length``
    are truncated with a trailing ``...``. An empty result falls back to
    ``"presentation"``.
    """
    sanitized = re.sub(r"[\\/:*?\"<>|]", "_", name)
    sanitized = re.sub(r"\s+", " ", sanitized).strip()
    if len(sanitized) > max_length:
        sanitized = sanitized[: max_length - 3].rstrip() + "..."
    return sanitized or "presentation"


def extract_presentation_id(url_or_id: str) -> Optional[str]:
    """Extract a presentation ID from a raw ID or a Google Slides/Drive URL.

    Returns:
        The ID, or ``None`` when the input is blank or matches no known form.
    """
    url = url_or_id.strip()
    if not url:
        return None

    # If it's already an ID-like token
    if re.fullmatch(r"[a-zA-Z0-9_-]+", url):
        return url

    # Common patterns for Google Slides URLs
    patterns = (
        r"https?://docs\.google\.com/presentation/d/([a-zA-Z0-9_-]+)",
        r"https?://drive\.google\.com/file/d/([a-zA-Z0-9_-]+)",
        r"https?://drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)",
    )
    for pattern in patterns:
        m = re.search(pattern, url)
        if m:
            return m.group(1)

    return None


def authenticate_and_get_creds() -> "Credentials":
    """Return valid Google OAuth credentials, prompting the user if needed.

    A cached token in ``TOKEN_FILE`` is reused when valid and refreshed when
    expired. In every other case (no token, unreadable token, failed refresh,
    or a token that is invalid but cannot be refreshed) the interactive
    installed-app flow runs. Newly obtained or refreshed credentials are
    written back to ``TOKEN_FILE`` on a best-effort basis.

    Raises:
        RuntimeError: The Google client libraries could not be imported.
        FileNotFoundError: ``CREDENTIALS_FILE`` is missing when a new flow
            is required.
    """
    if GOOGLE_IMPORT_ERROR is not None:
        raise RuntimeError(
            "Google API libraries are not available: " f"{GOOGLE_IMPORT_ERROR}"
        )

    creds: Optional[Credentials] = None

    if os.path.exists(TOKEN_FILE):
        try:
            creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
        except Exception as exc:
            logging.warning("Failed to load existing token: %s", exc)
            creds = None

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception as exc:
            logging.warning("Token refresh failed, falling back to new flow: %s", exc)
            # If refresh fails, delete the token and re-authenticate
            try:
                os.remove(TOKEN_FILE)
            except FileNotFoundError:
                pass
            creds = None

    # Re-check validity: a token that is invalid but could not be refreshed
    # (not expired, or no refresh token) was previously saved and returned
    # unchanged, so every later API call failed.
    if not creds or not creds.valid:
        if not os.path.exists(CREDENTIALS_FILE):
            raise FileNotFoundError(
                f"Missing {CREDENTIALS_FILE}. Download OAuth client credentials (Desktop app) "
                "from Google Cloud Console and place the JSON file alongside this script."
            )
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
        creds = flow.run_local_server(port=0)

    # Save the credentials for the next run
    try:
        with open(TOKEN_FILE, "w", encoding="utf-8") as token:
            token.write(creds.to_json())
    except Exception as exc:
        logging.warning("Failed to write token file: %s", exc)

    return creds
def sanitize_lecture_number_for_filename(lecture_number: float) -> str:
    """Convert lecture number to a filename-safe string (``1.5`` -> ``"1_5"``)."""
    return str(lecture_number).replace('.', '_')


def _extension_for_content_type(content_type: Optional[str]) -> str:
    """Map an HTTP Content-Type header value to an image file extension.

    Parameters (``; charset=...``) and structured-syntax suffixes (``+xml``)
    are ignored. Anything that is not a usable ``image/*`` type falls back
    to ``.png``.
    """
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    if not mime.startswith("image/"):
        return ".png"
    subtype = mime.split("/", 1)[1].split("+", 1)[0]
    if not subtype or not all(ch.isalnum() or ch in "-." for ch in subtype):
        return ".png"
    return f".{subtype}"


def download_images_in_order(creds: "Credentials", slide_data: List[dict], images_dir: str, lecture_number: float) -> dict:
    """Download every slide image in slide order.

    Args:
        creds: Google credentials used to authorize the HTTP requests.
        slide_data: Slide dictionaries; elements with ``type == "image"``
            and an ``image_properties["contentUrl"]`` are downloaded.
        images_dir: Directory the image files are written to (created if
            missing).
        lecture_number: Lecture number used in file names.

    Returns:
        Mapping of image object ID to its relative path
        ``images/lecture_<n>/<file>``, or to ``"image_download_failed"``
        when the download did not succeed.
    """
    image_map = {}
    http = AuthorizedHttp(creds)
    lecture_safe = sanitize_lecture_number_for_filename(lecture_number)
    os.makedirs(images_dir, exist_ok=True)

    for slide_number, slide in enumerate(slide_data, start=1):
        slide_image_counter = 1
        for element in slide.get("elements", []):
            if element.get("type") != "image":
                continue
            object_id = element.get("object_id")
            content_url = element.get("image_properties", {}).get("contentUrl")
            if not content_url:
                continue

            try:
                response, content = http.request(content_url)
                if response.status != 200:
                    logging.warning(
                        f"Failed to download image from {content_url}, status: {response.status}"
                    )
                    image_map[object_id] = "image_download_failed"
                    continue

                extension = _extension_for_content_type(
                    response.get("content-type", "image/png")
                )
                image_filename = (
                    f"lec_{lecture_safe}_slide_{slide_number}"
                    f"_image_{slide_image_counter}{extension}"
                )
                with open(os.path.join(images_dir, image_filename), "wb") as img_file:
                    img_file.write(content)

                relative_path = os.path.join(
                    "images", f"lecture_{lecture_safe}", image_filename
                )
                image_map[object_id] = relative_path
                logging.info(f"Downloaded image: {relative_path}")

                slide_image_counter += 1
                time.sleep(0.5)  # Avoid rate limiting
            except Exception as e:
                # Best effort: one failed image must not abort the export.
                logging.error(f"Error downloading image from {content_url}: {e}")
                image_map[object_id] = "image_download_failed"
    return image_map
def extract_text_with_formatting(text_elements: List[dict]) -> List[dict]:
    """Group Slides API text elements into paragraphs with formatting.

    A ``paragraphMarker`` element starts a new paragraph; ``textRun`` and
    ``autoText`` elements append content parts to the current one.
    Paragraphs whose parts contain only whitespace are dropped.

    Returns:
        A list of ``{"type": "paragraph", "bullet": ..., "content_parts": [...]}``
        dictionaries; ``bullet`` is ``None`` or
        ``{"nestingLevel": int, "glyph": str}``.
    """
    if not text_elements:
        return []

    def has_text(paragraph: dict) -> bool:
        return any(
            part.get("content", "").strip() for part in paragraph["content_parts"]
        )

    paragraphs = []
    # Start with a default paragraph, in case the text doesn't begin with a
    # paragraphMarker.
    current = {"type": "paragraph", "bullet": None, "content_parts": []}

    for element in text_elements:
        if "paragraphMarker" in element:
            if has_text(current):
                paragraphs.append(current)

            current = {"type": "paragraph", "bullet": None, "content_parts": []}
            bullet_info = element.get("paragraphMarker", {}).get("bullet")
            if bullet_info:
                current["bullet"] = {
                    "nestingLevel": bullet_info.get("nestingLevel", 0),
                    "glyph": bullet_info.get("glyph", "*"),
                }

        text_run = element.get("textRun")
        auto_text = element.get("autoText")

        if text_run:
            style = text_run.get("style", {})
            current["content_parts"].append({
                "type": "text",
                "content": text_run.get("content", ""),
                "bold": style.get("bold", False),
                "italic": style.get("italic", False),
                "underline": style.get("underline", False),
                "font_size": style.get("fontSize", {}).get("magnitude", None),
                "font_family": style.get("fontFamily", None),
                "foreground_color": style.get("foregroundColor", {}).get("opaqueColor", {}).get("rgbColor", {}),
                "link": style.get("link", {}).get("url", None),
            })
        elif auto_text:
            current["content_parts"].append({
                "type": "auto_text",
                "content": auto_text.get("content", ""),
                "auto_text_type": auto_text.get("type", "UNSPECIFIED"),
            })

    # Add the last paragraph if it has content
    if has_text(current):
        paragraphs.append(current)

    return paragraphs


def extract_plain_text_from_paragraphs(paragraphs: List[dict]) -> str:
    """Return the paragraphs' text with blank lines and edge spacing removed.

    All content parts are concatenated, split into lines, stripped of
    spaces, tabs and non-breaking spaces, and blank lines are discarded.
    """
    full = "".join(
        part.get("content", "")
        for para in paragraphs
        for part in para.get("content_parts", [])
    )
    lines = (ln.strip("\u00A0 \t") for ln in full.splitlines())
    return "\n".join(ln for ln in lines if ln)


def extract_table_structure(table: dict) -> dict:
    """Extract a Slides API table into a flat list of cell dictionaries.

    Every cell entry has the same keys: ``row``, ``col``, ``content``,
    ``formatted_content``, ``is_header`` (always a ``bool``) and
    ``cell_properties``. Cells missing from a row are emitted as empty
    placeholders; rows missing from ``tableRows`` are skipped entirely.

    A cell counts as a header when it is in the first row or has a
    background fill set.
    """
    n_rows = table.get("rows", 0)
    n_cols = table.get("columns", 0)
    table_rows = table.get("tableRows", [])

    table_data = {
        "type": "table",
        "rows": n_rows,
        "columns": n_cols,
        "cells": [],
    }

    for r in range(n_rows):
        if r >= len(table_rows):
            continue
        row_cells = table_rows[r].get("tableCells", [])

        for c in range(n_cols):
            if c >= len(row_cells):
                table_data["cells"].append({
                    "row": r,
                    "col": c,
                    "content": "",
                    "formatted_content": [],
                    "is_header": r == 0,
                    "cell_properties": {},
                })
                continue

            cell = row_cells[c]
            properties = cell.get("tableCellProperties", {})
            formatted_content = extract_text_with_formatting(
                cell.get("text", {}).get("textElements") or []
            )

            table_data["cells"].append({
                "row": r,
                "col": c,
                "content": extract_plain_text_from_paragraphs(formatted_content),
                "formatted_content": formatted_content,
                # bool(): the fill lookup returns a dict, which previously
                # leaked into is_header for non-first rows.
                "is_header": r == 0 or bool(properties.get("tableCellBackgroundFill")),
                "cell_properties": properties,
            })

    return table_data


def extract_text_from_table(table: dict) -> str:
    """Legacy plain-text table extraction.

    Cells in a row are joined with tabs and rows with newlines; rows that
    are empty after stripping are omitted.
    """
    n_rows = table.get("rows", 0)
    n_cols = table.get("columns", 0)
    table_rows = table.get("tableRows", [])

    rows_text: List[str] = []
    for r in range(n_rows):
        if r >= len(table_rows):
            continue
        row_cells = table_rows[r].get("tableCells", [])

        cells_text: List[str] = []
        for c in range(n_cols):
            if c >= len(row_cells):
                cells_text.append("")
                continue
            paras = row_cells[c].get("text", {}).get("textElements")
            cells_text.append(
                extract_plain_text_from_paragraphs(extract_text_with_formatting(paras or []))
            )
        rows_text.append("\t".join(cells_text).strip())
    return "\n".join(rt for rt in rows_text if rt)
def extract_slide_elements(slide: dict) -> List[dict]:
    """Extract all page elements of a slide with structure and formatting.

    Every returned dictionary has ``object_id``, ``transform``, ``size`` and
    ``type``. ``type`` is one of ``shape``, ``placeholder``, ``text_shape``,
    ``image``, ``table``, ``video``, ``chart`` or ``unknown``. For shapes the
    most specific applicable type wins (image over text_shape over
    placeholder over plain shape).
    """
    elements = []

    for element in slide.get("pageElements", []):
        element_data = {
            "object_id": element.get("objectId"),
            "transform": element.get("transform", {}),
            "size": element.get("size", {}),
        }

        # Handle shapes (text boxes, titles, etc.)
        if "shape" in element:
            shape = element["shape"]
            # Default so that shapes without placeholder, text or picture
            # fill still carry a "type" key like every other element.
            element_data["type"] = "shape"

            placeholder = shape.get("placeholder")
            if placeholder:
                element_data.update({
                    "type": "placeholder",
                    "placeholder_type": placeholder.get("type", "UNSPECIFIED"),
                    "index": placeholder.get("index", 0),
                })

            text_elements = shape.get("text", {}).get("textElements", [])
            if text_elements:
                # Parse once and reuse for both plain and formatted content.
                formatted = extract_text_with_formatting(text_elements)
                element_data.update({
                    "type": "text_shape",
                    "content": extract_plain_text_from_paragraphs(formatted),
                    "formatted_content": formatted,
                })

            picture_fill = (
                shape.get("shapeProperties", {}).get("shapeFill", {}).get("pictureFill")
            )
            if picture_fill:
                element_data.update({
                    "type": "image",
                    "image_properties": picture_fill,
                })

        elif "table" in element:
            element_data.update(extract_table_structure(element["table"]))

        elif "image" in element:
            element_data.update({
                "type": "image",
                "image_properties": element.get("image", {}),
            })

        elif "video" in element:
            element_data.update({
                "type": "video",
                "video_properties": element.get("video", {}),
            })

        elif "sheetsChart" in element:
            element_data.update({
                "type": "chart",
                "chart_properties": element.get("sheetsChart", {}),
            })

        else:
            element_data["type"] = "unknown"

        elements.append(element_data)

    return elements


def extract_slide_text(slide: dict) -> str:
    """Legacy plain-text extraction for one slide.

    Title placeholders come first, followed by all shapes, tables and image
    markers (``[image]``) in page order. Exact duplicate text blocks are
    emitted only once, which also removes the title's second occurrence.
    """

    def shape_text(shape: dict) -> str:
        text_elements = shape.get("text", {}).get("textElements", [])
        return extract_plain_text_from_paragraphs(
            extract_text_with_formatting(text_elements)
        )

    page_elements = slide.get("pageElements", [])
    texts: List[str] = []

    # Prefer title placeholder first if present
    title_candidates: List[str] = []
    for element in page_elements:
        shape = element.get("shape")
        if not shape:
            continue
        placeholder = shape.get("placeholder")
        if placeholder and placeholder.get("type") == "TITLE":
            title_text = shape_text(shape)
            if title_text:
                title_candidates.append(title_text)
    if title_candidates:
        texts.append("\n".join(title_candidates))

    # Then other shapes, tables, and images in order of appearance
    for element in page_elements:
        if "shape" in element:
            body_text = shape_text(element["shape"])
            if body_text:
                texts.append(body_text)
        elif "table" in element:
            table_text = extract_text_from_table(element["table"])
            if table_text:
                texts.append(table_text)
        elif "image" in element:
            texts.append("[image]")

    # Remove duplicates while preserving order (dicts keep insertion order).
    deduped = list(dict.fromkeys(t for t in texts if t))
    return "\n".join(deduped).strip()


def extract_notes_text(slide: dict) -> str:
    """Return the speaker-notes text of a slide, or ``""`` if there is none.

    The notes page is looked up directly on the slide and, failing that,
    under ``slideProperties``.
    """
    notes_page = slide.get("notesPage") or slide.get("slideProperties", {}).get("notesPage", {})
    note_texts: List[str] = []
    for element in notes_page.get("pageElements", []):
        shape = element.get("shape")
        if not shape:
            continue
        text_elements = shape.get("text", {}).get("textElements", [])
        content = extract_plain_text_from_paragraphs(
            extract_text_with_formatting(text_elements)
        )
        if content:
            note_texts.append(content)
    return "\n".join(note_texts).strip()
def fetch_presentation(service, presentation_id: str) -> dict:
    """Fetch a presentation resource through the Slides API client ``service``."""
    return (
        service.presentations().get(presentationId=presentation_id).execute()
    )


def extract_presentation_structure(presentation: dict) -> List[dict]:
    """Extract full presentation structure with formatting and positioning.

    Returns:
        One dictionary per slide with ``slide_id``, ``slide_index``,
        ``layout``, ``elements`` and, when present, ``speaker_notes``.
    """
    slide_data = []

    for position, slide in enumerate(presentation.get("slides", [])):
        properties = slide.get("slideProperties", {})
        slide_info = {
            "slide_id": slide.get("objectId"),
            # The Slides API reference lists no "index" or "layout" field on
            # slideProperties (it documents layoutObjectId), so without these
            # fallbacks every slide reported index 0 / layout UNSPECIFIED.
            "slide_index": properties.get("index", position),
            "layout": properties.get(
                "layout", properties.get("layoutObjectId", "UNSPECIFIED")
            ),
            "elements": extract_slide_elements(slide),
        }

        notes = extract_notes_text(slide)
        if notes:
            slide_info["speaker_notes"] = notes

        slide_data.append(slide_info)

    return slide_data


def write_slides_to_txt(
    out_dir: str,
    base_name: str,
    slide_texts: List[str],
) -> str:
    """Write slide texts to ``<out_dir>/<sanitized base_name>.txt``.

    Each slide is written as a ``# Slide <n>`` header, its text (if any)
    and a blank line. ``out_dir`` is created when missing.

    Returns:
        The path of the written file.
    """
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, sanitize_filename(base_name) + ".txt")

    with open(out_path, "w", encoding="utf-8") as f:
        for idx, text in enumerate(slide_texts, start=1):
            f.write(f"# Slide {idx}\n")
            if text:
                f.write(text)
            f.write("\n\n")

    return out_path


def load_conversion_rules():
    """Load conversion rules from ``rules.txt`` in the working directory.

    Returns:
        The parsed JSON object, or ``{}`` (default formatting) when the file
        is missing or is not valid JSON.
    """
    try:
        with open("rules.txt", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        logging.warning("rules.txt not found, using default formatting")
        return {}
    except json.JSONDecodeError as exc:
        # A hand-edited rules file with a syntax error should degrade to the
        # defaults, not abort the export with a traceback.
        logging.warning("rules.txt is not valid JSON (%s), using default formatting", exc)
        return {}


def detect_content_type(text_content):
    """Classify slide text so the matching formatting can be applied.

    Returns one of ``"empty"``, ``"section_header"``, ``"instructor_info"``,
    ``"course_header"``, ``"bullet_slide"`` or ``"regular_content"``. Checks
    run in that order and the first match wins.
    """
    if not text_content or not text_content.strip():
        return "empty"

    word_count = len(text_content.split())

    # Check for section markers
    if "#" in text_content and word_count < 10:
        return "section_header"

    # Check for instructor info patterns
    lowered = text_content.lower()
    if any(keyword in lowered for keyword in ("instructor", "email", "references")):
        return "instructor_info"

    # Check for course header patterns
    if any(keyword in text_content for keyword in ("MSML", "Course", ":")) and word_count < 8:
        return "course_header"

    # Check if it's mostly bullet points
    lines = text_content.strip().split('\n')
    bullet_lines = sum(1 for line in lines if line.strip().startswith(('*', '-', '•')))
    if bullet_lines > len(lines) * 0.6:
        return "bullet_slide"

    return "regular_content"
len(text_content.split()) < 10: + return "section_header" + + # Check for instructor info patterns + if any(keyword in text_content.lower() for keyword in ["instructor", "email", "references"]): + return "instructor_info" + + # Check for course header patterns + if any(keyword in text_content for keyword in ["MSML", "Course", ":"]) and len(text_content.split()) < 8: + return "course_header" + + # Check if it's mostly bullet points + lines = text_content.strip().split('\n') + bullet_lines = sum(1 for line in lines if line.strip().startswith(('*', '-', '•'))) + if bullet_lines > len(lines) * 0.6: + return "bullet_slide" + + return "regular_content" + +def format_text_with_rules(text, formatting_info=None): + """Apply text formatting according to rules""" + if not text: + return "" + + # Handle LaTeX commands and special formatting + text = text.replace('≈', '$\\approx$') + text = text.replace('©', '\\copyright') + text = text.replace('→', '$\\rightarrow$') + text = text.replace('–', '--') + + # Apply bold/italic formatting if specified + if formatting_info: + if formatting_info.get('bold'): + text = f"**{text}**" + elif formatting_info.get('italic'): + text = f"*{text}*" + + return text + +def format_section_header(content, level="major"): + """Format section headers according to rules""" + content = content.strip().replace('#', '').strip() + + if level == "major": + return f"# ##############################################################################\n# {content}\n# ##############################################################################\n\n* {content}" + else: + return f"## #############################################################################\n## {content}\n## #############################################################################\n\n* {content}" + +def format_bullet_content(paragraphs, is_main_bullet_slide=False): + """Format bullet point content according to rules""" + output = [] + + for para in paragraphs: + para_content = "" + for part 
in para.get("content_parts", []): + content = part.get("content", "") + if part.get("bold"): + para_content += f"**{content}**" + elif part.get("italic"): + para_content += f"*{content}*" + else: + para_content += content + + para_content = para_content.rstrip('\n') + if not para_content.strip(): + continue + + if para.get("bullet"): + indent = " " * para["bullet"].get("nestingLevel", 0) + # Use - for normal bullets, and indent for nested + bullet_char = "-" + output.append(f"{indent}{bullet_char} {para_content}") + else: + output.append(para_content) + + return output + +def write_slides_to_enhanced_txt( + out_dir: str, + base_name: str, + slide_data: List[dict], + image_map: dict, + lecture_number: float = 1, +) -> str: + """Write raw slide content for later processing by enhance_with_llm.py.""" + os.makedirs(out_dir, exist_ok=True) + filename = sanitize_filename(base_name) + "_raw.txt" + out_path = os.path.join(out_dir, filename) + + with open(out_path, "w", encoding="utf-8") as f: + for idx, slide in enumerate(slide_data, start=1): + f.write(f"=== SLIDE {idx} ===\n\n") + + for element in slide.get("elements", []): + element_type = element.get("type", "unknown") + + if element_type == "text_shape": + is_title = element.get("placeholder_type") == "TITLE" + is_subtitle = element.get("placeholder_type") == "SUBTITLE" + + # Mark the type of text element + if is_title: + f.write("[TITLE]\n") + elif is_subtitle: + f.write("[SUBTITLE]\n") + else: + f.write("[TEXT]\n") + + # Extract raw text with minimal formatting info + paragraphs = element.get("formatted_content", []) + for para in paragraphs: + para_content = "" + for part in para.get("content_parts", []): + content = part.get("content", "") + # Keep basic formatting markers + if part.get("bold"): + para_content += f"[BOLD]{content}[/BOLD]" + elif part.get("italic"): + para_content += f"[ITALIC]{content}[/ITALIC]" + else: + para_content += content + + para_content = para_content.rstrip('\n') + if 
para_content.strip(): + # Mark if it's a bullet point + if para.get("bullet"): + indent_level = para["bullet"].get("nestingLevel", 0) + f.write(f"[BULLET:level={indent_level}] {para_content}\n") + else: + f.write(f"{para_content}\n") + f.write("\n") + + elif element_type == "image": + object_id = element.get("object_id") + image_path = image_map.get(object_id, "image_not_found") + f.write(f"[IMAGE] {image_path}\n\n") + + elif element_type == "table": + f.write(f"[TABLE {element.get('rows', 0)}x{element.get('columns', 0)}]\n") + for cell in element.get("cells", []): + row = cell['row'] + col = cell['col'] + content = cell.get('content', '') + is_header = cell.get("is_header", False) + f.write(f"[CELL:{row},{col}{'|HEADER' if is_header else ''}] {content}\n") + f.write("\n") + + elif element_type == "video": + video_props = element.get("video_properties", {}) + url = video_props.get('url', 'embedded video') + f.write(f"[VIDEO] {url}\n\n") + + elif element_type == "chart": + f.write("[CHART]\n\n") + + # Add speaker notes if present + if slide.get("speaker_notes"): + f.write(f"[SPEAKER_NOTES]\n{slide['speaker_notes']}\n\n") + + f.write("=== END SLIDE ===\n\n") + + return out_path + + +def process_one_link( + slides_service, + creds: "Credentials", + url_or_id: str, + out_dir: str, + lecture_number: float = 1, +) -> Optional[str]: + presentation_id = extract_presentation_id(url_or_id) + if not presentation_id: + logging.error("Could not parse presentation ID from: %s", url_or_id) + return None + + logging.info("Fetching presentation: %s", presentation_id) + try: + presentation = slides_service.presentations().get(presentationId=presentation_id).execute() + except Exception as exc: + logging.error("Failed to fetch presentation %s: %s", presentation_id, exc) + return None + + title = presentation.get("title", presentation_id) + + slide_data = extract_presentation_structure(presentation) + + # Create images directory and download images in order + lecture_safe = 
sanitize_lecture_number_for_filename(lecture_number) + images_dir = os.path.join(out_dir, "images", f"lecture_{lecture_safe}") + os.makedirs(images_dir, exist_ok=True) + image_map = download_images_in_order( + creds, slide_data, images_dir, lecture_number + ) + + try: + out_path = write_slides_to_enhanced_txt( + out_dir, title, slide_data, image_map, lecture_number + ) + except Exception as exc: + logging.error("Failed to write raw output for %s: %s", title, exc) + return None + + logging.info("Wrote: %s", out_path) + return out_path + + +def read_urls_from_file(path: str) -> List[str]: + with open(path, "r", encoding="utf-8") as f: + lines = [ln.strip() for ln in f.readlines()] + return [ln for ln in lines if ln] + + +def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Fetch text content from Google Slides presentations and save to `output/`." + " By default, it reads URLs from `links.txt`." + ) + ) + parser.add_argument( + "--verbose", + "-v", + action="count", + default=1, + help="Increase log verbosity (-v for INFO, -vv for DEBUG).", + ) + parser.add_argument( + "--lecture-start", + type=float, # Changed from int to float + default=2, + help="Starting lecture number (default: 2).", + ) + parser.add_argument( + "--urls", + nargs="*", + help="Google Slides links or IDs provided inline (overrides `links.txt`).", + ) + return parser.parse_args(argv) + + +def main(argv: Optional[List[str]] = None) -> int: + args = parse_args(argv) + configure_logging(args.verbose or 1) + + if GOOGLE_IMPORT_ERROR is not None: + logging.error( + "Missing Google API libraries. 
Install with: " + "pip install google-auth google-auth-oauthlib google-api-python-client" + ) + return 2 + + try: + creds = authenticate_and_get_creds() + slides_service = build("slides", "v1", credentials=creds, cache_discovery=False) + # drive_service = build("drive", "v3", credentials=creds, cache_discovery=False) # No longer needed for image download + except Exception as exc: + logging.error("Authentication failed: %s", exc) + return 2 + + urls: List[str] = [] + # If URLs are passed as arguments, use them. Otherwise, look for links.txt. + if args.urls: + urls.extend(args.urls) + elif os.path.exists("links.txt"): + logging.info("No URLs provided via command line, reading from links.txt...") + try: + urls.extend(read_urls_from_file("links.txt")) + except Exception as exc: + logging.error("Failed to read links.txt: %s", exc) + return 2 + + if not urls: + logging.warning( + "No URLs to process. Provide them as arguments or in a `links.txt` file." + ) + return 0 + + out_dir = "output" + os.makedirs(out_dir, exist_ok=True) + + successes = 0 + # FIX: Handle float lecture_start properly + for i, url in enumerate(urls): + lecture_num = args.lecture_start + i # Calculate the actual lecture number + out_path = process_one_link( + slides_service=slides_service, + creds=creds, + url_or_id=url, + out_dir=out_dir, + lecture_number=lecture_num, + ) + if out_path: + successes += 1 + + logging.info("Completed. %d/%d presentations processed successfully.", successes, len(urls)) + return 0 if successes == len(urls) else 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file