From d4ec16720bb7ce4661388c9ff21afc2bc44dadf5 Mon Sep 17 00:00:00 2001 From: Hao Date: Mon, 23 Mar 2026 11:29:47 +0800 Subject: [PATCH] fix: feishu block type detection, table rendering & image download - Replace unreliable block_type number matching with key-based detection (detect_block_kind), fixing bullet lists rendered as empty code blocks and images being completely lost - Add table rendering support (render_table + render_cell_content) for Table and TableCell blocks, which were previously unhandled - Add image download via Feishu drive API (drive:drive:readonly permission required), with graceful fallback to feishu-image:// protocol - All changes are backward compatible with default parameter values --- scripts/fetch_feishu.py | 276 ++++++++++++++++++++++++++++++++-------- 1 file changed, 224 insertions(+), 52 deletions(-) diff --git a/scripts/fetch_feishu.py b/scripts/fetch_feishu.py index db767cd..5d2e822 100755 --- a/scripts/fetch_feishu.py +++ b/scripts/fetch_feishu.py @@ -10,6 +10,47 @@ FEISHU_API_BASE = "https://open.feishu.cn/open-apis" +def download_image(file_token, save_dir, access_token): + """下载飞书图片到本地,返回文件名或 None""" + url = f"{FEISHU_API_BASE}/drive/v1/medias/{file_token}/download" + headers = {"Authorization": f"Bearer {access_token}"} + try: + resp = requests.get(url, headers=headers, timeout=30) + if resp.status_code == 200 and resp.content: + content_type = resp.headers.get("Content-Type", "") + ext = "png" + if "jpeg" in content_type or "jpg" in content_type: + ext = "jpg" + elif "gif" in content_type: + ext = "gif" + elif "webp" in content_type: + ext = "webp" + + os.makedirs(save_dir, exist_ok=True) + filename = f"{file_token}.{ext}" + filepath = os.path.join(save_dir, filename) + with open(filepath, "wb") as f: + f.write(resp.content) + return filename + except Exception as e: + print(f"图片下载失败 {file_token}: {e}", file=sys.stderr) + return None + + +def download_all_images(blocks, save_dir, access_token): + """下载所有图片,返回 {token: filename} 映射""" + image_map = {} + for block in blocks: + if "image" in block: + image_data = block["image"] + file_token = image_data.get("token", "") + if file_token and file_token not in image_map: + filename = download_image(file_token, save_dir, access_token) + if filename: + image_map[file_token] = filename + return image_map + + def get_tenant_access_token(): """获取 tenant_access_token""" app_id = os.environ.get("FEISHU_APP_ID") @@ -121,22 +162,156 @@ def extract_text_from_elements(elements): return "".join(parts) -def blocks_to_markdown(blocks): +def render_cell_content(cell_block, block_index, image_map=None, image_dir_name=None): + """渲染表格单元格内容,将子块拼接为单行文本""" + children_ids = cell_block.get("children", []) + parts = [] + for child_id in children_ids: + child = block_index.get(child_id) + if not child: + continue + # 检查是否有图片数据 + if "image" in child: + image_data = child["image"] + token_val = image_data.get("token", "") + if image_map and token_val in image_map: + parts.append(f"![image]({image_dir_name}/{image_map[token_val]})") + elif token_val: + parts.append(f"![image](feishu-image://{token_val})") + else: + # 尝试提取文本 + for key in child: + if isinstance(child[key], dict) and "elements" in child[key]: + text = extract_text_from_elements(child[key]["elements"]) + if text.strip(): + parts.append(text) + break + return " ".join(parts).replace("|", "\\|") + + +def render_table(table_block, block_index, image_map=None, image_dir_name=None): + """将飞书 Table block 渲染为 Markdown 表格""" + table_data = table_block.get("table", {}) + table_prop = table_data.get("property", {}) + row_count = table_prop.get("row_size", 0) + col_count = table_prop.get("column_size", 0) + # 使用 cells 列表(按行优先排列) + cell_ids = table_data.get("cells", []) or table_block.get("children", []) + + if not cell_ids or row_count == 0 or col_count == 0: + return "" + + rows = [] + for r in range(row_count): + row = [] + for c in range(col_count): + idx = r * col_count + c + if idx < len(cell_ids): + cell_id = cell_ids[idx] + cell_block = block_index.get(cell_id, {}) + cell_text = render_cell_content(cell_block, block_index, image_map, image_dir_name) + row.append(cell_text if cell_text else " ") + else: + row.append(" ") + rows.append(row) + + # 构建 Markdown 表格 + md_lines = [] + if rows: + md_lines.append("| " + " | ".join(rows[0]) + " |") + md_lines.append("| " + " | ".join(["---"] * col_count) + " |") + for row in rows[1:]: + md_lines.append("| " + " | ".join(row) + " |") + + return "\n".join(md_lines) + + +LANG_MAP = {1: "plaintext", 2: "abap", 3: "ada", 4: "apache", 5: "apex", + 6: "assembly", 7: "bash", 8: "c", 9: "csharp", 10: "cpp", + 11: "clojure", 12: "cmake", 13: "coffeescript", 14: "css", + 15: "d", 16: "dart", 17: "delphi", 18: "django", 19: "dockerfile", + 20: "elixir", 21: "elm", 22: "erlang", 23: "fortran", + 24: "fsharp", 25: "go", 26: "graphql", 27: "groovy", 28: "haskell", + 29: "html", 30: "http", 31: "java", 32: "javascript", + 33: "json", 34: "julia", 35: "kotlin", 36: "latex", 37: "lisp", + 38: "lua", 39: "makefile", 40: "markdown", 41: "matlab", + 42: "nginx", 43: "objectivec", 44: "ocaml", 45: "perl", + 46: "php", 47: "powershell", 48: "properties", 49: "protobuf", + 50: "python", 51: "r", 52: "ruby", 53: "rust", 54: "scala", + 55: "scheme", 56: "scss", 57: "shell", 58: "sql", 59: "swift", + 60: "thrift", 61: "toml", 62: "typescript", 63: "vbnet", + 64: "verilog", 65: "vhdl", 66: "visual_basic", 67: "vue", + 68: "xml", 69: "yaml"} + + +def detect_block_kind(block): + """基于实际数据 key 判断块类型,比 block_type 编号更可靠""" + keys = set(block.keys()) - {"block_id", "block_type", "parent_id", "children", "comment_ids"} + if "page" in keys: + return "page" + if "table" in keys: + return "table" + if "table_cell" in keys: + return "table_cell" + if "image" in keys: + return "image" + if "grid" in keys: + return "grid" + if "grid_column" in keys: + return "grid_column" + # heading1-9 + for i in range(1, 10): + if f"heading{i}" in keys: + return f"heading{i}" + if "text" in keys: + return "text" + if "bullet" in keys: + return "bullet" + if "ordered" in keys: + return "ordered" + if "code" in keys: + return "code" + if "quote" in keys: + return "quote" + if "equation" in keys: + return "equation" + if "todo" in keys: + return "todo" + if "divider" in keys: + return "divider" + if "callout" in keys: + return "callout" + return "unknown" + + +def blocks_to_markdown(blocks, image_map=None, image_dir_name=None): """将飞书 blocks 转为 Markdown""" lines = [] ordered_list_counter = {} # parent_id -> counter + # 建立 block_id -> block 索引,用于表格渲染 + block_index = {b.get("block_id"): b for b in blocks if b.get("block_id")} + # 收集所有表格/分栏子块 ID,避免重复渲染 + container_child_ids = set() + for b in blocks: + kind = detect_block_kind(b) + if kind in ("table", "grid"): + container_child_ids.update(b.get("children", [])) + for child_id in b.get("children", []): + child = block_index.get(child_id, {}) + container_child_ids.update(child.get("children", [])) + for block in blocks: - block_type = block.get("block_type") + block_id = block.get("block_id", "") parent_id = block.get("parent_id", "") - # 1 = Page, 2 = Text, 3 = Heading1, 4 = Heading2, ..., 9 = Heading7+ - # 10 = BulletList, 11 = OrderedList, 12 = Code, 13 = Quote - # 14 = Equation, 15 = Todo, 16 = Divider - # 17 = Image, 18 = TableCell, 19 = Table - # 22 = Callout, 23 = ChatCard, 27 = Grid, 28 = GridColumn + # 跳过已作为容器子块处理的 block + if block_id in container_child_ids: + continue - if block_type == 2: # Text + kind = detect_block_kind(block) + + if kind == "text": text_data = block.get("text", {}) text = extract_text_from_elements(text_data.get("elements", [])) if text.strip(): @@ -144,92 +319,80 @@ def blocks_to_markdown(blocks): else: lines.append("") - elif block_type in (3, 4, 5, 6, 7, 8, 9): # Heading 1-7 - level = block_type - 2 - heading_data = block.get("heading" + str(level), {}) or block.get("heading", {}) - # Try multiple key formats - for key in [f"heading{level}", "heading"]: - if key in block: - heading_data = block[key] - break + elif kind.startswith("heading"): + level = int(kind[-1]) # heading1 -> 1 + heading_data = block.get(kind, {}) text = extract_text_from_elements(heading_data.get("elements", [])) lines.append(f"{'#' * level} {text}") - elif block_type == 10: # Bullet list + elif kind == "bullet": text_data = block.get("bullet", {}) text = extract_text_from_elements(text_data.get("elements", [])) lines.append(f"- {text}") - elif block_type == 11: # Ordered list + elif kind == "ordered": text_data = block.get("ordered", {}) text = extract_text_from_elements(text_data.get("elements", [])) counter = ordered_list_counter.get(parent_id, 0) + 1 ordered_list_counter[parent_id] = counter lines.append(f"{counter}. {text}") - elif block_type == 12: # Code block + elif kind == "code": code_data = block.get("code", {}) text = extract_text_from_elements(code_data.get("elements", [])) lang = code_data.get("style", {}).get("language", "") - # Map language codes - lang_map = {1: "plaintext", 2: "abap", 3: "ada", 4: "apache", 5: "apex", - 6: "assembly", 7: "bash", 8: "c", 9: "csharp", 10: "cpp", - 11: "clojure", 12: "cmake", 13: "coffeescript", 14: "css", - 15: "d", 16: "dart", 17: "delphi", 18: "django", 19: "dockerfile", - 20: "elixir", 21: "elm", 22: "erlang", 23: "fortran", - 24: "fsharp", 25: "go", 26: "graphql", 27: "groovy", 28: "haskell", - 29: "html", 30: "http", 31: "java", 32: "javascript", - 33: "json", 34: "julia", 35: "kotlin", 36: "latex", 37: "lisp", - 38: "lua", 39: "makefile", 40: "markdown", 41: "matlab", - 42: "nginx", 43: "objectivec", 44: "ocaml", 45: "perl", - 46: "php", 47: "powershell", 48: "properties", 49: "protobuf", - 50: "python", 51: "r", 52: "ruby", 53: "rust", 54: "scala", - 55: "scheme", 56: "scss", 57: "shell", 58: "sql", 59: "swift", - 60: "thrift", 61: "toml", 62: "typescript", 63: "vbnet", - 64: "verilog", 65: "vhdl", 66: "visual_basic", 67: "vue", - 68: "xml", 69: "yaml"} - lang_str = lang_map.get(lang, "") if isinstance(lang, int) else str(lang) + lang_str = LANG_MAP.get(lang, "") if isinstance(lang, int) else str(lang) lines.append(f"```{lang_str}") lines.append(text) lines.append("```") - elif block_type == 13: # Quote + elif kind == "quote": text_data = block.get("quote", {}) text = extract_text_from_elements(text_data.get("elements", [])) lines.append(f"> {text}") - elif block_type == 14: # Equation block + elif kind == "equation": eq_data = block.get("equation", {}) text = extract_text_from_elements(eq_data.get("elements", [])) lines.append(f"$$\n{text}\n$$") - elif block_type == 15: # Todo + elif kind == "todo": todo_data = block.get("todo", {}) text = extract_text_from_elements(todo_data.get("elements", [])) done = todo_data.get("style", {}).get("done", False) checkbox = "[x]" if done else "[ ]" lines.append(f"- {checkbox} {text}") - elif block_type == 16: # Divider + elif kind == "divider": lines.append("---") - elif block_type == 17: # Image + elif kind == "image": image_data = block.get("image", {}) token_val = image_data.get("token", "") - lines.append(f"![image](feishu-image://{token_val})") + if image_map and token_val in image_map: + lines.append(f"![image]({image_dir_name}/{image_map[token_val]})") + elif token_val: + lines.append(f"![image](feishu-image://{token_val})") + + elif kind == "table_cell": + pass # 由 table 统一渲染 - elif block_type == 22: # Callout + elif kind == "table": + table_md = render_table(block, block_index, image_map, image_dir_name) + if table_md: + lines.append(table_md) + + elif kind == "callout": callout_data = block.get("callout", {}) - # Callout is a container, children will be processed separately emoji = callout_data.get("emoji_id", "") if emoji: lines.append(f"> {emoji}") - elif block_type == 1: # Page (root), skip - pass + elif kind in ("page", "grid", "grid_column"): + pass # 容器块,跳过 else: - # Unknown block type, try to extract any text + # Unknown block, try to extract any text for key in block: if isinstance(block[key], dict) and "elements" in block[key]: text = extract_text_from_elements(block[key]["elements"]) @@ -240,8 +403,8 @@ def blocks_to_markdown(blocks): return "\n\n".join(lines) -def fetch_feishu_doc(url_or_id): - """主函数:获取飞书文档并转为 Markdown""" +def fetch_feishu_doc(url_or_id, save_dir=None): + """主函数:获取飞书文档并转为 Markdown。save_dir 非空时下载图片到本地。""" # 解析 URL doc_id, doc_type = parse_feishu_url(url_or_id) if not doc_id: @@ -272,8 +435,16 @@ def fetch_feishu_doc(url_or_id): if err: return {"error": err} + # 下载图片(save_dir 非空时) + image_map = {} + image_dir_name = None + if save_dir and title: + image_dir_name = f"{title}_images" + image_save_dir = os.path.join(save_dir, image_dir_name) + image_map = download_all_images(blocks, image_save_dir, token) + # 转换为 Markdown - content = blocks_to_markdown(blocks) + content = blocks_to_markdown(blocks, image_map=image_map, image_dir_name=image_dir_name) return { "title": title, @@ -312,7 +483,8 @@ def format_as_markdown(result): url = sys.argv[1] use_json = "--json" in sys.argv - result = fetch_feishu_doc(url) + save_dir = os.path.expanduser("~/Downloads") if not use_json else None + result = fetch_feishu_doc(url, save_dir=save_dir) if use_json: print(json.dumps(result, ensure_ascii=False, indent=2))