From 046783c313b4b72922ccc85102cbf7a05417584d Mon Sep 17 00:00:00 2001 From: CaralHsi Date: Tue, 21 Apr 2026 20:18:16 +0800 Subject: [PATCH] Revert "feat: add upload skill logic (#1507)" This reverts commit 87cb6e521522d4f5fbc0122c86c1985945ffb430. --- src/memos/api/product_models.py | 11 - src/memos/mem_reader/multi_modal_struct.py | 18 +- .../read_skill_memory/process_skill_memory.py | 55 ++-- .../read_skill_memory/upload_skill_memory.py | 284 ------------------ .../handlers/mem_read_handler.py | 4 - src/memos/multi_mem_cube/single_cube.py | 6 +- 6 files changed, 41 insertions(+), 337 deletions(-) delete mode 100644 src/memos/mem_reader/read_skill_memory/upload_skill_memory.py diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 5cb1e24c7..78dcfc797 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -608,17 +608,6 @@ class APIADDRequest(BaseRequest): description=("Whether this request represents user feedback. Default: False."), ) - # ==== Upload skill flag ==== - is_upload_skill: bool = Field( - False, - description=( - "Whether this request is an upload skill request. " - "When True, the messages field should contain file items " - "with zip file download URLs for pre-built skill packages. " - "Default: False." - ), - ) - # ==== Backward compatibility fields (will delete later) ==== mem_cube_id: str | None = Field( None, diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index da3d54730..2edede76d 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -1019,7 +1019,6 @@ def _process_multi_modal_data( return fast_memory_items # Stage: llm_extract — fine mode 4-way parallel LLM + per-source serial - is_upload_skill = kwargs.pop("is_upload_skill", False) non_file_url_fast_items = [ item for item in fast_memory_items if not self._is_file_url_only_item(item) ] @@ -1036,9 +1035,7 @@ def _process_multi_modal_data( ) future_skill = executor.submit( process_skill_memory_fine, - fast_memory_items=fast_memory_items - if is_upload_skill - else non_file_url_fast_items, + fast_memory_items=non_file_url_fast_items, info=info, searcher=self.searcher, graph_db=self.graph_db, @@ -1046,7 +1043,6 @@ def _process_multi_modal_data( embedder=self.embedder, oss_config=self.oss_config, skills_dir_config=self.skills_dir_config, - is_upload_skill=is_upload_skill, **kwargs, ) future_pref = executor.submit( @@ -1069,10 +1065,6 @@ def _process_multi_modal_data( fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: per-source serial processing - if is_upload_skill: - # (skip for upload skill to avoid zip being parsed) - return fine_memory_items - with timed_stage("add", "per_source") as ts_ps: for fast_item in fast_memory_items: sources = fast_item.metadata.sources @@ -1106,8 +1098,6 @@ def _process_transfer_multi_modal_data( logger.warning("[MultiModalStruct] No raw nodes found.") return [] - is_upload_skill = kwargs.pop("is_upload_skill", False) - # Extract info from raw_nodes (same as simple_struct.py) info = { "user_id": raw_nodes[0].metadata.user_id, @@ -1129,7 +1119,7 @@ def _process_transfer_multi_modal_data( ) future_skill = executor.submit( process_skill_memory_fine, - raw_nodes if is_upload_skill else non_file_url_nodes, + non_file_url_nodes, info, searcher=self.searcher, llm=self.general_llm, @@ -1137,7 +1127,6 @@ def _process_transfer_multi_modal_data( graph_db=self.graph_db, oss_config=self.oss_config, skills_dir_config=self.skills_dir_config, - is_upload_skill=is_upload_skill, **kwargs, ) # Add preference memory extraction @@ -1162,9 +1151,6 @@ def _process_transfer_multi_modal_data( fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: get fine multimodal items - if is_upload_skill: - # (skip for upload skill to avoid zip being parsed) - return fine_memory_items for raw_node in raw_nodes: sources = raw_node.metadata.sources for source in sources: diff --git a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py index ec27cbae5..0b0c04252 100644 --- a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py +++ b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py @@ -252,6 +252,36 @@ def create_task(skill_mem, gen_type, prompt, requirements, context, **kwargs): return [item[0] for item in raw_skills_data] +def add_id_to_mysql(memory_id: str, mem_cube_id: str): + """Add id to mysql, will deprecate this function in the future""" + # TODO: tmp function, deprecate soon + import requests + + skill_mysql_url = os.getenv("SKILLS_MYSQL_URL", "") + skill_mysql_bearer = os.getenv("SKILLS_MYSQL_BEARER", "") + + if not skill_mysql_url or not skill_mysql_bearer: + logger.warning("[PROCESS_SKILLS] SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set") + return None + headers = {"Authorization": skill_mysql_bearer, "Content-Type": "application/json"} + data = {"memCubeId": mem_cube_id, "skillId": memory_id} + try: + response = requests.post(skill_mysql_url, headers=headers, json=data) + + logger.info(f"[PROCESS_SKILLS] response: \n\n{response.json()}") + logger.info(f"[PROCESS_SKILLS] memory_id: \n\n{memory_id}") + logger.info(f"[PROCESS_SKILLS] mem_cube_id: \n\n{mem_cube_id}") + logger.info(f"[PROCESS_SKILLS] skill_mysql_url: \n\n{skill_mysql_url}") + logger.info(f"[PROCESS_SKILLS] skill_mysql_bearer: \n\n{skill_mysql_bearer}") + logger.info(f"[PROCESS_SKILLS] headers: \n\n{headers}") + logger.info(f"[PROCESS_SKILLS] data: \n\n{data}") + + return response.json() + except Exception as e: + logger.warning(f"[PROCESS_SKILLS] Error adding id to mysql: {e}") + return None + + @require_python_package( import_name="alibabacloud_oss_v2", install_command="pip install alibabacloud-oss-v2", @@ -918,7 +948,6 @@ def create_skill_memory_item( scripts=skill_memory.get("scripts"), others=skill_memory.get("others"), url=skill_memory.get("url", ""), - skill_source=skill_memory.get("skill_source"), manager_user_id=manager_user_id, project_id=project_id, ) @@ -995,21 +1024,6 @@ def process_skill_memory_fine( complete_skill_memory: bool = True, **kwargs, ) -> list[TextualMemoryItem]: - is_upload_skill = kwargs.pop("is_upload_skill", False) - if is_upload_skill: - from memos.mem_reader.read_skill_memory.upload_skill_memory import ( - process_upload_skill_memory, - ) - - return process_upload_skill_memory( - fast_memory_items=fast_memory_items, - info=info, - embedder=embedder, - oss_config=oss_config, - skills_dir_config=skills_dir_config, - **kwargs, - ) - skills_repo_backend = _get_skill_file_storage_location() oss_client, _missing_keys, flag = _skill_init( skills_repo_backend, oss_config, skills_dir_config @@ -1239,7 +1253,6 @@ def _full_extract(): if source: skill_sources.append(source) - skill_memory["skill_source"] = "auto_create" memory_item = create_skill_memory_item( skill_memory, info, embedder, sources=skill_sources, **kwargs ) @@ -1248,4 +1261,12 @@ def _full_extract(): logger.warning(f"[PROCESS_SKILLS] Error creating skill memory item: {e}") continue + # TODO: deprecate this funtion and call + for skill_memory, skill_memory_item in zip(skill_memories, skill_memory_items, strict=False): + if skill_memory.get("update", False) and skill_memory.get("old_memory_id", ""): + continue + add_id_to_mysql( + memory_id=skill_memory_item.id, + mem_cube_id=kwargs.get("user_name", info.get("user_id", "")), + ) return skill_memory_items diff --git a/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py b/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py deleted file mode 100644 index 2f8f6b42e..000000000 --- a/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py +++ /dev/null @@ -1,284 +0,0 @@ -import re -import shutil -import tempfile -import zipfile - -from pathlib import Path -from typing import Any -from uuid import uuid4 - -import requests - -from memos.embedders.base import BaseEmbedder -from memos.log import get_logger -from memos.mem_reader.read_skill_memory.process_skill_memory import ( - create_skill_memory_item, -) -from memos.memories.textual.item import TextualMemoryItem -from memos.utils import timed - - -logger = get_logger(__name__) - -_TEXT_MAX_LEN = 20 - - -def _truncate(text: str) -> str: - """Truncate a string to at most ``_TEXT_MAX_LEN`` characters.""" - return text[:_TEXT_MAX_LEN] - - -def _extract_zip_url_from_items(items: list[TextualMemoryItem]) -> str | None: - """ - Extract the zip download URL from fast-stage memory items. - - FileContentParser.parse_fast stores the URL in source.file_info["file_data"]. - Each upload-skill request contains exactly one zip URL. - """ - for item in items: - for source in getattr(item.metadata, "sources", None) or []: - file_info = getattr(source, "file_info", None) - if not isinstance(file_info, dict): - continue - file_data = file_info.get("file_data", "") - if ( - isinstance(file_data, str) - and file_data.startswith(("http://", "https://")) - and file_data.lower().endswith(".zip") - ): - return file_data - return None - - -def _download_zip(url: str, tmp_dir: Path) -> Path: - """Download a zip file to a local temporary directory.""" - try: - resp = requests.get(url, stream=True, timeout=60) - resp.raise_for_status() - except Exception as e: - raise ValueError(f"Failed to download zip from {url}: {e}") from e - - zip_path = tmp_dir / f"{uuid4()}.zip" - with open(zip_path, "wb") as f: - for chunk in resp.iter_content(chunk_size=8192): - f.write(chunk) - - if not zipfile.is_zipfile(zip_path): - raise ValueError(f"Downloaded file is not a valid zip: {url}") - - return zip_path - - -def _extract_and_parse_skill_zip(zip_path: Path) -> dict[str, Any]: - """ - Extract a skill zip and parse SKILL.md + directory contents into a skill_memory dict. - - The SKILL.md format mirrors the output of ``_write_skills_to_file`` in - ``process_skill_memory.py``. Section headings at any level (``#`` through - ``######``) are matched by title text (case-insensitive). - """ - # Step 1: extract & locate SKILL.md - extract_dir = zip_path.parent / zip_path.stem - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - skill_md_path = None - for candidate in extract_dir.rglob("SKILL.md"): - skill_md_path = candidate - break - - if skill_md_path is None: - raise FileNotFoundError(f"SKILL.md not found in zip: {zip_path.name}") - - skill_root = skill_md_path.parent - raw_text = skill_md_path.read_text(encoding="utf-8") - - # Step 2: parse frontmatter → name, description - name = "" - description = "" - fm_match = re.match(r"^---\s*\n(.*?)\n---", raw_text, re.DOTALL) - if fm_match: - for line in fm_match.group(1).splitlines(): - if line.startswith("name:"): - name = line[len("name:") :].strip() - elif line.startswith("description:"): - description = line[len("description:") :].strip() - - if not name: - name = zip_path.stem - - # Step 3: split body by any-level heading and parse each section - trigger: str = "" - procedure: str = "" - experience: list[str] = [] - preference: list[str] = [] - examples: list[str] = [] - tool: str | None = None - others_inline: dict[str, str] = {} - - known_sections = { - "trigger", - "procedure", - "experience", - "user preferences", - "examples", - "scripts", - "tool usage", - "additional information", - } - - body = raw_text[fm_match.end() :] if fm_match else raw_text - sections = re.split(r"\n(?=#{1,6}\s)", body) - - for section in sections: - section = section.strip() - if not section: - continue - - heading_match = re.match(r"^(#{1,6})\s+(.*)", section) - if not heading_match: - continue - - title = heading_match.group(2).strip() - content = section[heading_match.end() :].strip() - title_lower = title.lower() - - if title_lower not in known_sections: - logger.warning("[UPLOAD_SKILL] Unknown section '%s' in SKILL.md, skipping", title) - continue - - if title_lower == "trigger": - trigger = content - - elif title_lower == "procedure": - procedure = content - - elif title_lower == "experience": - items = re.findall(r"^\d+\.\s+(.+)$", content, re.MULTILINE) - experience = [item.strip() for item in items] if items else [] - - elif title_lower == "user preferences": - items = re.findall(r"^-\s+(.+)$", content, re.MULTILINE) - preference = [item.strip() for item in items] if items else [] - - elif title_lower == "examples": - blocks = re.findall(r"```markdown\n(.*?)\n```", content, re.DOTALL) - examples = [b.strip() for b in blocks] - - elif title_lower == "scripts": - pass - - elif title_lower == "tool usage": - tool = content.strip() if content.strip() else None - - elif title_lower == "additional information": - sub_sections = re.split(r"\n(?=#{1,6}\s)", content) - for sub in sub_sections: - sub = sub.strip() - if not sub or sub.startswith("See also:"): - continue - sub_heading = re.match(r"^(#{1,6})\s+(.*)", sub) - if not sub_heading: - continue - sub_key = sub_heading.group(2).strip() - sub_val = sub[sub_heading.end() :].strip() - if sub_val: - others_inline[sub_key] = sub_val - - # Step 4: read scripts/ directory - scripts: dict[str, str] | None = None - scripts_dir = skill_root / "scripts" - if scripts_dir.is_dir(): - scripts = {} - for py_file in scripts_dir.glob("*.py"): - scripts[py_file.name] = py_file.read_text(encoding="utf-8") - - # Step 5: read reference/ directory → merge into others - others = dict(others_inline) - reference_dir = skill_root / "reference" - if reference_dir.is_dir(): - for md_file in reference_dir.glob("*.md"): - others[md_file.name] = md_file.read_text(encoding="utf-8") - - # Step 6: truncate text fields & assemble return dict - truncated_trigger = _truncate(trigger) - - result: dict[str, Any] = { - "name": name, - "description": description, - "tags": [truncated_trigger] if truncated_trigger else [], - "procedure": _truncate(procedure), - "experience": [_truncate(e) for e in experience], - "preference": [_truncate(p) for p in preference], - "examples": [_truncate(e) for e in examples], - "tool": _truncate(tool) if tool else None, - "scripts": {k: _truncate(v) for k, v in scripts.items()} if scripts else None, - "others": {k: _truncate(v) for k, v in others.items()} if others else None, - } - # Only include trigger when non-empty; create_skill_memory_item uses - # `skill_memory.get("tags") or skill_memory.get("trigger", [])`, - # an empty-string trigger would override the correct [] fallback. - if truncated_trigger: - result["trigger"] = truncated_trigger - return result - - -@timed -def process_upload_skill_memory( - fast_memory_items: list[TextualMemoryItem], - info: dict[str, Any], - embedder: BaseEmbedder | None = None, - oss_config: dict[str, Any] | None = None, - skills_dir_config: dict[str, Any] | None = None, - **kwargs, -) -> list[TextualMemoryItem]: - """ - Process a user-uploaded skill zip, parse it, and build a SkillMemory node. - - The zip URL is taken from the fast-stage ``TextualMemoryItem`` sources - (``source.file_info["file_data"]``), consistent with both sync-fine and - async-transfer paths. - """ - zip_url = _extract_zip_url_from_items(fast_memory_items) - if not zip_url: - logger.warning("[UPLOAD_SKILL] No zip URL found in fast_memory_items") - return [] - - tmp_dir = Path(tempfile.mkdtemp(prefix="upload_skill_")) - try: - zip_path = _download_zip(zip_url, tmp_dir) - except Exception as e: - logger.warning("[UPLOAD_SKILL] Failed to download zip: %s", e) - shutil.rmtree(tmp_dir, ignore_errors=True) - return [] - - try: - skill_memory = _extract_and_parse_skill_zip(zip_path) - except FileNotFoundError as e: - logger.warning("[UPLOAD_SKILL] %s", e) - shutil.rmtree(tmp_dir, ignore_errors=True) - return [] - except Exception as e: - logger.error("[UPLOAD_SKILL] Failed to parse skill zip: %s", e) - shutil.rmtree(tmp_dir, ignore_errors=True) - return [] - - skill_memory["url"] = zip_url - skill_memory["skill_source"] = "user_upload" - - try: - skill_memory_item = create_skill_memory_item(skill_memory, info, embedder, **kwargs) - except Exception as e: - logger.error("[UPLOAD_SKILL] Failed to create skill memory item: %s", e) - shutil.rmtree(tmp_dir, ignore_errors=True) - return [] - - # Cleanup temp files - shutil.rmtree(tmp_dir, ignore_errors=True) - - logger.info( - "[UPLOAD_SKILL] Successfully created SkillMemory from uploaded zip: name=%s, id=%s", - skill_memory.get("name"), - skill_memory_item.id, - ) - return [skill_memory_item] diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index c7b01a53d..20dbb63b2 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -163,9 +163,6 @@ def _process_memories_with_reader( logger.info("Processing %s memories with mem_reader", len(memory_items)) - info = dict(info or {}) - is_upload_skill = info.pop("is_upload_skill", False) - try: processed_memories = mem_reader.fine_transfer_simple_mem( memory_items, @@ -174,7 +171,6 @@ def _process_memories_with_reader( user_name=user_name, chat_history=chat_history, user_context=user_context, - is_upload_skill=is_upload_skill, ) except Exception as e: logger.warning("%s: Fail to transfer mem: %s", e, memory_items) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 79e3837e4..1a8b7092a 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -529,10 +529,7 @@ def _schedule_memory_tasks( content=json.dumps(mem_ids), timestamp=datetime.utcnow(), user_name=self.cube_id, - info={ - **(add_req.info or {}), - "is_upload_skill": getattr(add_req, "is_upload_skill", False), - }, + info=add_req.info, chat_history=add_req.chat_history, user_context=user_context, ) @@ -712,7 +709,6 @@ def _process_text_mem( user_name=user_context.mem_cube_id, chat_history=add_req.chat_history, user_context=user_context, - is_upload_skill=getattr(add_req, "is_upload_skill", False), ) get_memory_ms = ts_gm.duration_ms flattened_local = [mm for m in memories_local for mm in m]