diff --git a/.gitignore b/.gitignore index ac31eb41a..97af509ea 100644 --- a/.gitignore +++ b/.gitignore @@ -216,3 +216,5 @@ cython_debug/ outputs evaluation/data/temporal_locomo +test_add_pipeline.py +test_file_pipeline.py diff --git a/src/memos/chunkers/base.py b/src/memos/chunkers/base.py index c2a783baa..e858132e1 100644 --- a/src/memos/chunkers/base.py +++ b/src/memos/chunkers/base.py @@ -1,3 +1,5 @@ +import re + from abc import ABC, abstractmethod from memos.configs.chunker import BaseChunkerConfig @@ -22,3 +24,42 @@ def __init__(self, config: BaseChunkerConfig): @abstractmethod def chunk(self, text: str) -> list[Chunk]: """Chunk the given text into smaller chunks.""" + + def protect_urls(self, text: str) -> tuple[str, dict[str, str]]: + """ + Protect URLs in text from being split during chunking. + + Args: + text: Text to process + + Returns: + tuple: (Text with URLs replaced by placeholders, URL mapping dictionary) + """ + url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' + url_map = {} + + def replace_url(match): + url = match.group(0) + placeholder = f"__URL_{len(url_map)}__" + url_map[placeholder] = url + return placeholder + + protected_text = re.sub(url_pattern, replace_url, text) + return protected_text, url_map + + def restore_urls(self, text: str, url_map: dict[str, str]) -> str: + """ + Restore protected URLs in text back to their original form. + + Args: + text: Text with URL placeholders + url_map: URL mapping dictionary from protect_urls + + Returns: + str: Text with URLs restored + """ + restored_text = text + for placeholder, url in url_map.items(): + restored_text = restored_text.replace(placeholder, url) + + return restored_text diff --git a/src/memos/chunkers/charactertext_chunker.py b/src/memos/chunkers/charactertext_chunker.py index 15c0958ba..25739d96f 100644 --- a/src/memos/chunkers/charactertext_chunker.py +++ b/src/memos/chunkers/charactertext_chunker.py @@ -36,6 +36,8 @@ def __init__( def chunk(self, text: str, **kwargs) -> list[str] | list[Chunk]: """Chunk the given text into smaller chunks based on sentences.""" - chunks = self.chunker.split_text(text) + protected_text, url_map = self.protect_urls(text) + chunks = self.chunker.split_text(protected_text) + chunks = [self.restore_urls(chunk, url_map) for chunk in chunks] logger.debug(f"Generated {len(chunks)} chunks from input text") return chunks diff --git a/src/memos/chunkers/markdown_chunker.py b/src/memos/chunkers/markdown_chunker.py index b7771ac35..a37370200 100644 --- a/src/memos/chunkers/markdown_chunker.py +++ b/src/memos/chunkers/markdown_chunker.py @@ -1,3 +1,5 @@ +import re + from memos.configs.chunker import MarkdownChunkerConfig from memos.dependency import require_python_package from memos.log import get_logger @@ -22,6 +24,7 @@ def __init__( chunk_size: int = 1000, chunk_overlap: int = 200, recursive: bool = False, + auto_fix_headers: bool = True, ): from langchain_text_splitters import ( MarkdownHeaderTextSplitter, @@ -29,6 +32,7 @@ def __init__( ) self.config = config + self.auto_fix_headers = auto_fix_headers self.chunker = MarkdownHeaderTextSplitter( headers_to_split_on=config.headers_to_split_on if config @@ -46,17 +50,110 @@ def __init__( def chunk(self, text: str, **kwargs) -> list[str] | list[Chunk]: """Chunk the given text into smaller chunks based on sentences.""" - md_header_splits = self.chunker.split_text(text) + # Protect URLs first + protected_text, url_map = self.protect_urls(text) + # Auto-detect and fix malformed header hierarchy if enabled + if self.auto_fix_headers and 
self._detect_malformed_headers(protected_text): + logger.info("[Chunker:] detected malformed header hierarchy, attempting to fix...") + protected_text = self._fix_header_hierarchy(protected_text) + logger.info("[Chunker:] Header hierarchy fix completed") + + md_header_splits = self.chunker.split_text(protected_text) chunks = [] if self.chunker_recursive: md_header_splits = self.chunker_recursive.split_documents(md_header_splits) for doc in md_header_splits: try: chunk = " ".join(list(doc.metadata.values())) + "\n" + doc.page_content + chunk = self.restore_urls(chunk, url_map) chunks.append(chunk) except Exception as e: logger.warning(f"warning chunking document: {e}") - chunks.append(doc.page_content) + restored_chunk = self.restore_urls(doc.page_content, url_map) + chunks.append(restored_chunk) logger.info(f"Generated chunks: {chunks[:5]}") logger.debug(f"Generated {len(chunks)} chunks from input text") return chunks + + def _detect_malformed_headers(self, text: str) -> bool: + """Detect if markdown has improper header hierarchy usage.""" + # Extract all valid markdown header lines + header_levels = [] + pattern = re.compile(r"^#{1,6}\s+.+") + for line in text.split("\n"): + stripped_line = line.strip() + if pattern.match(stripped_line): + hash_match = re.match(r"^(#+)", stripped_line) + if hash_match: + level = len(hash_match.group(1)) + header_levels.append(level) + + total_headers = len(header_levels) + if total_headers == 0: + logger.debug("No valid headers detected, skipping check") + return False + + # Calculate level-1 header ratio + level1_count = sum(1 for level in header_levels if level == 1) + + # Determine if malformed: >90% are level-1 when total > 5 + # OR all headers are level-1 when total ≤ 5 + if total_headers > 5: + level1_ratio = level1_count / total_headers + if level1_ratio > 0.9: + logger.warning( + f"Detected header hierarchy issue: {level1_count}/{total_headers} " + f"({level1_ratio:.1%}) of headers are level 1" + ) + return True + elif total_headers <= 5 and level1_count == total_headers: + logger.warning( + f"Detected header hierarchy issue: all {total_headers} headers are level 1" + ) + return True + return False + + def _fix_header_hierarchy(self, text: str) -> str: + """ + Fix markdown header hierarchy by adjusting levels. + + Strategy: + 1. Keep the first header unchanged as level-1 parent + 2. Increment all subsequent headers by 1 level (max level 6) + """ + header_pattern = re.compile(r"^(#{1,6})\s+(.+)$") + lines = text.split("\n") + fixed_lines = [] + first_valid_header = False + + for line in lines: + stripped_line = line.strip() + # Match valid header lines (invalid # lines kept as-is) + header_match = header_pattern.match(stripped_line) + if header_match: + current_hashes, title_content = header_match.groups() + current_level = len(current_hashes) + + if not first_valid_header: + # First valid header: keep original level unchanged + fixed_line = f"{current_hashes} {title_content}" + first_valid_header = True + logger.debug( + f"Keep first header at level {current_level}: {title_content[:50]}..." + ) + else: + # Subsequent headers: increment by 1, cap at level 6 + new_level = min(current_level + 1, 6) + new_hashes = "#" * new_level + fixed_line = f"{new_hashes} {title_content}" + logger.debug( + f"Adjust header level: {current_level} -> {new_level}: {title_content[:50]}..." 
+ ) + fixed_lines.append(fixed_line) + else: + fixed_lines.append(line) + + # Join with newlines to preserve original formatting + fixed_text = "\n".join(fixed_lines) + logger.info(f"[Chunker:] Header hierarchy fix completed: {fixed_text[:50]}...") + return fixed_text diff --git a/src/memos/chunkers/sentence_chunker.py b/src/memos/chunkers/sentence_chunker.py index f39dfb8e2..e695d0d9a 100644 --- a/src/memos/chunkers/sentence_chunker.py +++ b/src/memos/chunkers/sentence_chunker.py @@ -43,11 +43,13 @@ def __init__(self, config: SentenceChunkerConfig): def chunk(self, text: str) -> list[str] | list[Chunk]: """Chunk the given text into smaller chunks based on sentences.""" - chonkie_chunks = self.chunker.chunk(text) + protected_text, url_map = self.protect_urls(text) + chonkie_chunks = self.chunker.chunk(protected_text) chunks = [] for c in chonkie_chunks: chunk = Chunk(text=c.text, token_count=c.token_count, sentences=c.sentences) + chunk = self.restore_urls(chunk.text, url_map) chunks.append(chunk) logger.debug(f"Generated {len(chunks)} chunks from input text") diff --git a/src/memos/chunkers/simple_chunker.py b/src/memos/chunkers/simple_chunker.py index cc0dc40d0..58e12e2f1 100644 --- a/src/memos/chunkers/simple_chunker.py +++ b/src/memos/chunkers/simple_chunker.py @@ -20,12 +20,15 @@ def _simple_split_text(self, text: str, chunk_size: int, chunk_overlap: int) -> Returns: List of text chunks """ - if not text or len(text) <= chunk_size: - return [text] if text.strip() else [] + protected_text, url_map = self.protect_urls(text) + + if not protected_text or len(protected_text) <= chunk_size: + chunks = [protected_text] if protected_text.strip() else [] + return [self.restore_urls(chunk, url_map) for chunk in chunks] chunks = [] start = 0 - text_len = len(text) + text_len = len(protected_text) while start < text_len: # Calculate end position @@ -35,16 +38,16 @@ def _simple_split_text(self, text: str, chunk_size: int, chunk_overlap: int) -> if end < text_len: # Try to break at newline, sentence end, or space for separator in ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " "]: - last_sep = text.rfind(separator, start, end) + last_sep = protected_text.rfind(separator, start, end) if last_sep != -1: end = last_sep + len(separator) break - chunk = text[start:end].strip() + chunk = protected_text[start:end].strip() if chunk: chunks.append(chunk) # Move start position with overlap start = max(start + 1, end - chunk_overlap) - return chunks + return [self.restore_urls(chunk, url_map) for chunk in chunks] diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py index 1b4add398..00e02abda 100644 --- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py @@ -51,8 +51,11 @@ class FileContentParser(BaseMessageParser): """Parser for file content parts.""" def _get_doc_llm_response( - self, chunk_text: str, custom_tags: list[str] | None = None - ) -> dict | list: + self, + chunk_text: str, + custom_tags: list[str] | None = None, + message_text_context: str | None = None, + ) -> dict: """ Call LLM to extract memory from document chunk. Uses doc prompts from DOC_PROMPT_DICT. 
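
All four chunkers above now route text through the new `protect_urls`/`restore_urls` pair on `BaseChunker`, so a URL can never be cut in half at a chunk boundary. A minimal standalone sketch of that round trip, using the same regex as `base.py` (the `sample` string is invented for illustration):

```python
import re

URL_PATTERN = r'https?://[^\s<>"{}|\\^`\[\]]+'


def protect_urls(text: str) -> tuple[str, dict[str, str]]:
    """Swap every URL for a placeholder so chunk boundaries can never split it."""
    url_map: dict[str, str] = {}

    def _replace(match: re.Match) -> str:
        placeholder = f"__URL_{len(url_map)}__"
        url_map[placeholder] = match.group(0)
        return placeholder

    return re.sub(URL_PATTERN, _replace, text), url_map


def restore_urls(text: str, url_map: dict[str, str]) -> str:
    """Put the original URLs back once chunking is done."""
    for placeholder, url in url_map.items():
        text = text.replace(placeholder, url)
    return text


sample = "See https://example.com/a/very/long/path?page=2#section for details."
protected, mapping = protect_urls(sample)
assert "https://" not in protected        # the splitter only ever sees __URL_0__
assert restore_urls(protected, mapping) == sample
```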
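
The `MarkdownChunker` changes rely on a simple heuristic: treat the document as malformed when more than 90% of its headers are level 1 (or all of them, when there are 5 or fewer), then keep the first header as the parent and demote every later header by one level, capped at `######`. A standalone sketch of the fix step under those assumptions (`flat_doc` is an invented example):

```python
import re

HEADER_RE = re.compile(r"^(#{1,6})\s+(.+)$")


def fix_header_hierarchy(text: str) -> str:
    """Keep the first header untouched; demote every later header by one level (max 6)."""
    fixed_lines = []
    seen_first = False
    for line in text.split("\n"):
        match = HEADER_RE.match(line.strip())
        if not match:
            fixed_lines.append(line)
            continue
        hashes, title = match.groups()
        if not seen_first:
            seen_first = True
            fixed_lines.append(f"{hashes} {title}")
        else:
            fixed_lines.append(f"{'#' * min(len(hashes) + 1, 6)} {title}")
    return "\n".join(fixed_lines)


flat_doc = "# Report\nIntro text.\n# Methods\n# Results"
print(fix_header_hierarchy(flat_doc))
# # Report
# Intro text.
# ## Methods
# ## Results
```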
@@ -60,6 +63,8 @@ def _get_doc_llm_response( Args: chunk_text: Text chunk to extract memory from custom_tags: Optional list of custom tags for LLM extraction + message_text_context: Optional text from the same message that + provides user intent / context for understanding this document Returns: Parsed JSON response from LLM (dict or list) or empty dict if failed @@ -79,6 +84,10 @@ def _get_doc_llm_response( ) prompt = prompt.replace("{custom_tags_prompt}", custom_tags_prompt) + # Inject sibling text context into prompt placeholder + context_text = message_text_context.strip() if message_text_context else "" + prompt = prompt.replace("{context}", context_text) + messages = [{"role": "user", "content": prompt}] try: response_text = self.llm.generate(messages) @@ -109,14 +118,25 @@ def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None, boo return response.text, None, True file_ext = os.path.splitext(filename)[1].lower() - if file_ext in [".md", ".markdown", ".txt"]: + if file_ext in [".md", ".markdown", ".txt"] or self._is_oss_md(url_str): return response.text, None, True with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=file_ext) as temp_file: temp_file.write(response.content) return "", temp_file.name, False except Exception as e: logger.error(f"[FileContentParser] URL processing error: {e}") - return f"[File URL download failed: {url_str}]", None + return f"[File URL download failed: {url_str}]", None, False + + def _is_oss_md(self, url: str) -> bool: + """Check if URL is an OSS markdown file based on pattern.""" + loose_pattern = re.compile(r"^https?://[^/]*\.aliyuncs\.com/.*/([^/?#]+)") + match = loose_pattern.search(url) + if not match: + return False + + file_name = match.group(1) + lower_name = file_name.lower() + return lower_name.endswith((".md", ".markdown", ".txt")) def _is_base64(self, data: str) -> bool: """Quick heuristic to check base64-like string.""" @@ -139,7 +159,12 @@ def _handle_local(self, data: str) -> str: return "" def _process_single_image( - self, image_url: str, original_ref: str, info: dict[str, Any], **kwargs + self, + image_url: str, + original_ref: str, + info: dict[str, Any], + header_context: list[str] | None = None, + **kwargs, ) -> tuple[str, str]: """ Process a single image and return (original_ref, replacement_text). 
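
`_is_oss_md` lets signed OSS links that point at markdown or plain-text objects be read as text even though their query string hides the extension from `os.path.splitext`. A quick usage sketch with the same regex; the URLs below are made up for illustration:

```python
import re


def is_oss_md(url: str) -> bool:
    """True when an aliyuncs.com URL points at a .md/.markdown/.txt object."""
    loose_pattern = re.compile(r"^https?://[^/]*\.aliyuncs\.com/.*/([^/?#]+)")
    match = loose_pattern.search(url)
    if not match:
        return False
    return match.group(1).lower().endswith((".md", ".markdown", ".txt"))


# Hypothetical URLs, for illustration only
print(is_oss_md("https://bucket.oss-cn-hangzhou.aliyuncs.com/docs/readme.md?Expires=1"))  # True
print(is_oss_md("https://bucket.oss-cn-hangzhou.aliyuncs.com/docs/report.pdf"))           # False
print(is_oss_md("https://example.com/docs/readme.md"))                                    # False (not OSS)
```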
@@ -148,6 +173,7 @@ def _process_single_image( image_url: URL of the image to process original_ref: Original markdown image reference to replace info: Dictionary containing user_id and session_id + header_context: Optional list of header titles providing context for the image **kwargs: Additional parameters for ImageParser Returns: @@ -173,20 +199,31 @@ def _process_single_image( if hasattr(item, "memory") and item.memory: extracted_texts.append(str(item.memory)) + # Prepare header context string if available + header_context_str = "" + if header_context: + # Join headers with " > " to show hierarchy + header_hierarchy = " > ".join(header_context) + header_context_str = f"[Section: {header_hierarchy}]\n\n" + if extracted_texts: # Combine all extracted texts extracted_content = "\n".join(extracted_texts) + # build final replacement text + replacement_text = ( + f"{header_context_str}[Image Content from {image_url}]:\n{extracted_content}\n" + ) # Replace image with extracted content return ( original_ref, - f"\n[Image Content from {image_url}]:\n{extracted_content}\n", + replacement_text, ) else: # If no content extracted, keep original with a note logger.warning(f"[FileContentParser] No content extracted from image: {image_url}") return ( original_ref, - f"\n[Image: {image_url} - No content extracted]\n", + f"{header_context_str}[Image: {image_url} - No content extracted]\n", ) except Exception as e: @@ -194,7 +231,9 @@ def _process_single_image( # On error, keep original image reference return (original_ref, original_ref) - def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) -> str: + def _extract_and_process_images( + self, text: str, info: dict[str, Any], headers: dict[int, dict] | None = None, **kwargs + ) -> str: """ Extract all images from markdown text and process them using ImageParser in parallel. Replaces image references with extracted text content. @@ -202,6 +241,7 @@ def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) Args: text: Markdown text containing image references info: Dictionary containing user_id and session_id + headers: Optional dictionary mapping line numbers to header info **kwargs: Additional parameters for ImageParser Returns: @@ -225,7 +265,13 @@ def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) for match in image_matches: image_url = match.group(2) original_ref = match.group(0) - tasks.append((image_url, original_ref)) + image_position = match.start() + + header_context = None + if headers: + header_context = self._get_header_context(text, image_position, headers) + + tasks.append((image_url, original_ref, header_context)) # Process images in parallel replacements = {} @@ -234,9 +280,14 @@ def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) with ContextThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit( - self._process_single_image, image_url, original_ref, info, **kwargs + self._process_single_image, + image_url, + original_ref, + info, + header_context, + **kwargs, ): (image_url, original_ref) - for image_url, original_ref in tasks + for image_url, original_ref, header_context in tasks } # Collect results with progress tracking @@ -603,6 +654,18 @@ def parse_fine( # Extract custom_tags from kwargs (for LLM extraction) custom_tags = kwargs.get("custom_tags") + # Extract sibling text context . 
+ message_text_context = None + context_items = kwargs.get("context_items") + if context_items: + sibling_texts = [] + for ctx_item in context_items: + for src in getattr(ctx_item.metadata, "sources", None) or []: + if src.type == "chat" and src.content: + sibling_texts.append(src.content.strip()) + if sibling_texts: + message_text_context = "\n".join(sibling_texts) + # Use parser from utils parser = self.parser or get_parser() if not parser: @@ -663,9 +726,20 @@ def parse_fine( ) if not parsed_text: return [] + + # Extract markdown headers if applicable + headers = {} + if is_markdown: + headers = self._extract_markdown_headers(parsed_text) + logger.info( + f"[Chunker: FileContentParser] Extracted {len(headers)} headers from markdown" + ) + # Extract and process images from parsed_text if is_markdown and parsed_text and self.image_parser: - parsed_text = self._extract_and_process_images(parsed_text, info, **kwargs) + parsed_text = self._extract_and_process_images( + parsed_text, info, headers=headers if headers else None, **kwargs + ) # Extract info fields if not info: @@ -782,7 +856,9 @@ def _make_fallback( def _process_chunk(chunk_idx: int, chunk_text: str) -> list[TextualMemoryItem]: """Process chunk with LLM, fallback to raw on failure. Returns list of memory items.""" try: - response_json = self._get_doc_llm_response(chunk_text, custom_tags) + response_json = self._get_doc_llm_response( + chunk_text, custom_tags, message_text_context=message_text_context + ) if response_json: # Handle list format response response_list = response_json.get("memory list", []) @@ -932,3 +1008,94 @@ def get_chunk_idx(item: TextualMemoryItem) -> int: chunk_idx=None, ) ] + + def _extract_markdown_headers(self, text: str) -> dict[int, dict]: + """ + Extract markdown headers and their positions. + + Args: + text: Markdown text to parse + """ + if not text: + return {} + + headers = {} + # Pattern to match markdown headers: # Title, ## Title, etc. + header_pattern = r"^(#{1,6})\s+(.+)$" + + lines = text.split("\n") + char_position = 0 + + for line_num, line in enumerate(lines): + # Match header pattern (must be at start of line) + match = re.match(header_pattern, line.strip()) + if match: + level = len(match.group(1)) # Number of # symbols (1-6) + title = match.group(2).strip() # Extract title text + + # Store header info with its position + headers[line_num] = {"level": level, "title": title, "position": char_position} + + logger.debug(f"[FileContentParser] Found H{level} at line {line_num}: {title}") + + # Update character position for next line (+1 for newline character) + char_position += len(line) + 1 + + logger.info(f"[Chunker: FileContentParser] Extracted {len(headers)} headers from markdown") + return headers + + def _get_header_context( + self, text: str, image_position: int, headers: dict[int, dict] + ) -> list[str]: + """ + Get all header levels above an image position in hierarchical order. + + Finds the image's line number, then identifies all preceding headers + and constructs the hierarchical path to the image location. 
+ + Args: + text: Full markdown text + image_position: Character position of the image in text + headers: Dict of headers from _extract_markdown_headers + """ + if not headers: + return [] + + # Find the line number corresponding to the image position + lines = text.split("\n") + char_count = 0 + image_line = 0 + + for i, line in enumerate(lines): + if char_count >= image_position: + image_line = i + break + char_count += len(line) + 1 # +1 for newline + + # Filter headers that appear before the image + preceding_headers = { + line_num: info for line_num, info in headers.items() if line_num < image_line + } + + if not preceding_headers: + return [] + + # Build hierarchical header stack + header_stack = [] + + for line_num in sorted(preceding_headers.keys()): + header = preceding_headers[line_num] + level = header["level"] + title = header["title"] + + # Pop headers of same or lower level + while header_stack and header_stack[-1]["level"] >= level: + removed = header_stack.pop() + logger.debug(f"[FileContentParser] Popped H{removed['level']}: {removed['title']}") + + # Push current header onto stack + header_stack.append({"level": level, "title": title}) + + # Return titles in order + result = [h["title"] for h in header_stack] + return result diff --git a/src/memos/mem_reader/read_multi_modal/utils.py b/src/memos/mem_reader/read_multi_modal/utils.py index a6d910e54..be82587bf 100644 --- a/src/memos/mem_reader/read_multi_modal/utils.py +++ b/src/memos/mem_reader/read_multi_modal/utils.py @@ -346,7 +346,8 @@ def detect_lang(text): r"\b(user|assistant|query|answer)\s*:", "", cleaned_text, flags=re.IGNORECASE ) cleaned_text = re.sub(r"\[[\d\-:\s]+\]", "", cleaned_text) - + # remove URLs to prevent the dilution of Chinese characters + cleaned_text = re.sub(r'https?://[^\s<>"{}|\\^`\[\]]+', "", cleaned_text) # extract chinese characters chinese_pattern = r"[\u4e00-\u9fff\u3400-\u4dbf\U00020000-\U0002a6df\U0002a700-\U0002b73f\U0002b740-\U0002b81f\U0002b820-\U0002ceaf\uf900-\ufaff]" chinese_chars = re.findall(chinese_pattern, cleaned_text) diff --git a/src/memos/memories/textual/tree_text_memory/organize/history_manager.py b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py index 132582a0d..98094877c 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/history_manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py @@ -128,7 +128,7 @@ def resolve_history_via_nli( ) new_item.metadata.history.append(archived) logger.info( - f"[MemoryHistoryManager] Archived related memory {r_item.id} as {update_type} for new item {new_item.id}" + f"[Chunker: MemoryHistoryManager] Archived related memory {r_item.id} as {update_type} for new item {new_item.id}" ) # 3. Concat duplicate/conflict memories to new_item.memory diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index f431bd041..63e4c1538 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -263,6 +263,10 @@ {custom_tags_prompt} +If given context, use it as a supplement to the document information extraction; if no context is given, directly process the document information. 
+Reference context: +{context} + Document chunk: {chunk_text} @@ -307,6 +311,10 @@ {custom_tags_prompt} +如果给定了上下文,就结合上下文信息作为文档信息提取的补充,如果没有给定上下文,请直接处理文档信息。 +参考的上下文: +{context} + 示例: 输入的文本片段: 在Kalamang语中,亲属名词在所有格构式中的行为并不一致。名词 esa“父亲”和 ema“母亲”只能在技术称谓(teknonym)中与第三人称所有格后缀共现,而在非技术称谓用法中,带有所有格后缀是不合语法的。相比之下,大多数其他亲属名词并不允许所有格构式,只有极少数例外。 diff --git a/tests/chunkers/test_sentence_chunker.py b/tests/chunkers/test_sentence_chunker.py index 28aaeabb9..7ff6b2ccd 100644 --- a/tests/chunkers/test_sentence_chunker.py +++ b/tests/chunkers/test_sentence_chunker.py @@ -47,6 +47,17 @@ def test_sentence_chunker(self): self.assertEqual(len(chunks), 2) # Validate the properties of the first chunk mock_chunker.chunk.assert_called_once_with(text) - self.assertEqual(chunks[0].text, "This is the first sentence.") - self.assertEqual(chunks[0].token_count, 6) - self.assertEqual(chunks[0].sentences, ["This is the first sentence."]) + + # Handle both return types: list[str] | list[Chunk] + if isinstance(chunks[0], str): + # If returns list[str], check the string value + self.assertEqual(chunks[0], "This is the first sentence.") + self.assertEqual(chunks[1], "This is the second sentence.") + else: + # If returns list[Chunk], check the Chunk properties + from memos.chunkers.base import Chunk + + self.assertIsInstance(chunks[0], Chunk) + self.assertEqual(chunks[0].text, "This is the first sentence.") + self.assertEqual(chunks[0].token_count, 6) + self.assertEqual(chunks[0].sentences, ["This is the first sentence."]) diff --git a/tests/utils.py b/tests/utils.py index 132cd7138..ec8a32799 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -14,7 +14,8 @@ def check_module_base_class(cls: Any) -> None: General function to test the correctness of an abstract base class. - It should inherit from ABC. - It should define at least one method. - - All methods should be marked as @abstractmethod. + - It should have at least one abstract method. + - Abstract methods (those in __abstractmethods__) should be marked as @abstractmethod. - It should not be instantiable. - All methods should have docstrings. @@ -31,14 +32,25 @@ def check_module_base_class(cls: Any) -> None: assert all_class_methods, f"{cls.__name__} should define at least one method" # Check 3: Verify abstract methods + # Get the set of abstract methods from the class + abstract_methods = getattr(cls, "__abstractmethods__", set()) + + # Ensure there is at least one abstract method + assert len(abstract_methods) > 0, f"{cls.__name__} should have at least one abstract method" + + # Verify that all methods in __abstractmethods__ are actually marked as abstract for method_name in all_class_methods: method = getattr(cls, method_name) # Skip private methods (starting with _) as they are typically helper methods if method_name.startswith("_") and method_name != "__init__": continue - assert getattr(method, "__isabstractmethod__", False), ( - f"The method '{method_name}' in {cls.__name__} should be marked as @abstractmethod" - ) + + # If the method is in __abstractmethods__, it must be marked as abstract + if method_name in abstract_methods: + assert getattr(method, "__isabstractmethod__", False), ( + f"The method '{method_name}' in {cls.__name__} is in __abstractmethods__ " + f"but should be marked as @abstractmethod" + ) # Check 4: Test that the class cannot be instantiated directly with pytest.raises(TypeError) as excinfo:
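
The header-context plumbing added above (`_extract_markdown_headers`, `_get_header_context`, and the `header_context` argument to `_process_single_image`) boils down to a stack walk over the headers that precede an image: a new header pops everything at the same or deeper level, so only the active hierarchy survives and becomes the `[Section: ...]` prefix. A standalone sketch of that walk over pre-extracted `(level, title)` pairs (the sample headers are invented):

```python
def header_breadcrumb(preceding_headers: list[tuple[int, str]]) -> list[str]:
    """Collapse the headers above an image into the active H1 > H2 > ... path."""
    stack: list[tuple[int, str]] = []
    for level, title in preceding_headers:
        # A new header closes every open section at the same or deeper level
        while stack and stack[-1][0] >= level:
            stack.pop()
        stack.append((level, title))
    return [title for _, title in stack]


headers = [(1, "User Guide"), (2, "Install"), (3, "Linux"), (2, "Configuration")]
print(" > ".join(header_breadcrumb(headers)))  # User Guide > Configuration
```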
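
The prompt templates gain a `{context}` placeholder that `_get_doc_llm_response` fills with sibling chat text gathered from `context_items` in `parse_fine` (or with an empty string when there is none). A rough sketch of that wiring; the trimmed-down `template` below is a stand-in for the real entries in `mem_reader_prompts.py`:

```python
def build_doc_prompt(template: str, chunk_text: str, message_text_context: str | None) -> str:
    """Inject the document chunk and any sibling chat text into the extraction prompt."""
    context_text = message_text_context.strip() if message_text_context else ""
    prompt = template.replace("{context}", context_text)
    return prompt.replace("{chunk_text}", chunk_text)


# Hypothetical, trimmed-down template for illustration
template = (
    "Reference context:\n{context}\n\n"
    "Document chunk:\n{chunk_text}\n"
)
print(build_doc_prompt(template, "Q3 revenue grew 12%.", "User asked about the finance report."))
```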
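
The `detect_lang` change in `utils.py` strips URLs before counting Chinese characters, since a long ASCII URL can swamp a short Chinese message and drag the Chinese-character ratio down. A simplified illustration of the effect (the ratio function below is a toy stand-in, not the real `detect_lang`):

```python
import re

CJK = r"[\u4e00-\u9fff]"


def chinese_ratio(text: str, strip_urls: bool = True) -> float:
    """Share of Chinese characters among non-space characters."""
    if strip_urls:
        text = re.sub(r'https?://[^\s<>"{}|\\^`\[\]]+', "", text)
    compact = re.sub(r"\s+", "", text)
    if not compact:
        return 0.0
    return len(re.findall(CJK, compact)) / len(compact)


msg = "请看这个链接 https://example.com/some/really/long/path?query=value"
print(round(chinese_ratio(msg, strip_urls=False), 2))  # URL dilutes the ratio
print(round(chinese_ratio(msg, strip_urls=True), 2))   # 1.0 once the URL is removed
```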