Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 224 additions & 52 deletions scripts/fetch_feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,47 @@
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"


def download_image(file_token, save_dir, access_token):
"""下载飞书图片到本地,返回文件名或 None"""
url = f"{FEISHU_API_BASE}/drive/v1/medias/{file_token}/download"
headers = {"Authorization": f"Bearer {access_token}"}
try:
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code == 200 and resp.content:
content_type = resp.headers.get("Content-Type", "")
ext = "png"
if "jpeg" in content_type or "jpg" in content_type:
ext = "jpg"
elif "gif" in content_type:
ext = "gif"
elif "webp" in content_type:
ext = "webp"

os.makedirs(save_dir, exist_ok=True)
filename = f"{file_token}.{ext}"
filepath = os.path.join(save_dir, filename)
with open(filepath, "wb") as f:
f.write(resp.content)
return filename
except Exception as e:
print(f"图片下载失败 {file_token}: {e}", file=sys.stderr)
return None


def download_all_images(blocks, save_dir, access_token):
"""下载所有图片,返回 {token: filename} 映射"""
image_map = {}
for block in blocks:
if "image" in block:
image_data = block["image"]
file_token = image_data.get("token", "")
if file_token and file_token not in image_map:
filename = download_image(file_token, save_dir, access_token)
if filename:
image_map[file_token] = filename
return image_map


def get_tenant_access_token():
"""获取 tenant_access_token"""
app_id = os.environ.get("FEISHU_APP_ID")
Expand Down Expand Up @@ -121,115 +162,237 @@ def extract_text_from_elements(elements):
return "".join(parts)


def blocks_to_markdown(blocks):
def render_cell_content(cell_block, block_index, image_map=None, image_dir_name=None):
"""渲染表格单元格内容,将子块拼接为单行文本"""
children_ids = cell_block.get("children", [])
parts = []
for child_id in children_ids:
child = block_index.get(child_id)
if not child:
continue
# 检查是否有图片数据
if "image" in child:
image_data = child["image"]
token_val = image_data.get("token", "")
if image_map and token_val in image_map:
parts.append(f"![image]({image_dir_name}/{image_map[token_val]})")
elif token_val:
parts.append(f"![image](feishu-image://{token_val})")
else:
# 尝试提取文本
for key in child:
if isinstance(child[key], dict) and "elements" in child[key]:
text = extract_text_from_elements(child[key]["elements"])
if text.strip():
parts.append(text)
break
return " ".join(parts).replace("|", "\\|")


def render_table(table_block, block_index, image_map=None, image_dir_name=None):
"""将飞书 Table block 渲染为 Markdown 表格"""
table_data = table_block.get("table", {})
table_prop = table_data.get("property", {})
row_count = table_prop.get("row_size", 0)
col_count = table_prop.get("column_size", 0)
# 使用 cells 列表(按行优先排列)
cell_ids = table_data.get("cells", []) or table_block.get("children", [])

if not cell_ids or row_count == 0 or col_count == 0:
return ""

rows = []
for r in range(row_count):
row = []
for c in range(col_count):
idx = r * col_count + c
if idx < len(cell_ids):
cell_id = cell_ids[idx]
cell_block = block_index.get(cell_id, {})
cell_text = render_cell_content(cell_block, block_index, image_map, image_dir_name)
row.append(cell_text if cell_text else " ")
else:
row.append(" ")
rows.append(row)

# 构建 Markdown 表格
md_lines = []
if rows:
md_lines.append("| " + " | ".join(rows[0]) + " |")
md_lines.append("| " + " | ".join(["---"] * col_count) + " |")
for row in rows[1:]:
md_lines.append("| " + " | ".join(row) + " |")

return "\n".join(md_lines)


LANG_MAP = {1: "plaintext", 2: "abap", 3: "ada", 4: "apache", 5: "apex",
6: "assembly", 7: "bash", 8: "c", 9: "csharp", 10: "cpp",
11: "clojure", 12: "cmake", 13: "coffeescript", 14: "css",
15: "d", 16: "dart", 17: "delphi", 18: "django", 19: "dockerfile",
20: "elixir", 21: "elm", 22: "erlang", 23: "fortran",
24: "fsharp", 25: "go", 26: "graphql", 27: "groovy", 28: "haskell",
29: "html", 30: "http", 31: "java", 32: "javascript",
33: "json", 34: "julia", 35: "kotlin", 36: "latex", 37: "lisp",
38: "lua", 39: "makefile", 40: "markdown", 41: "matlab",
42: "nginx", 43: "objectivec", 44: "ocaml", 45: "perl",
46: "php", 47: "powershell", 48: "properties", 49: "protobuf",
50: "python", 51: "r", 52: "ruby", 53: "rust", 54: "scala",
55: "scheme", 56: "scss", 57: "shell", 58: "sql", 59: "swift",
60: "thrift", 61: "toml", 62: "typescript", 63: "vbnet",
64: "verilog", 65: "vhdl", 66: "visual_basic", 67: "vue",
68: "xml", 69: "yaml"}


def detect_block_kind(block):
"""基于实际数据 key 判断块类型,比 block_type 编号更可靠"""
keys = set(block.keys()) - {"block_id", "block_type", "parent_id", "children", "comment_ids"}
if "page" in keys:
return "page"
if "table" in keys:
return "table"
if "table_cell" in keys:
return "table_cell"
if "image" in keys:
return "image"
if "grid" in keys:
return "grid"
if "grid_column" in keys:
return "grid_column"
# heading1-9
for i in range(1, 10):
if f"heading{i}" in keys:
return f"heading{i}"
if "text" in keys:
return "text"
if "bullet" in keys:
return "bullet"
if "ordered" in keys:
return "ordered"
if "code" in keys:
return "code"
if "quote" in keys:
return "quote"
if "equation" in keys:
return "equation"
if "todo" in keys:
return "todo"
if "divider" in keys:
return "divider"
if "callout" in keys:
return "callout"
return "unknown"


def blocks_to_markdown(blocks, image_map=None, image_dir_name=None):
"""将飞书 blocks 转为 Markdown"""
lines = []
ordered_list_counter = {} # parent_id -> counter

# 建立 block_id -> block 索引,用于表格渲染
block_index = {b.get("block_id"): b for b in blocks if b.get("block_id")}
# 收集所有表格/分栏子块 ID,避免重复渲染
container_child_ids = set()
for b in blocks:
kind = detect_block_kind(b)
if kind in ("table", "grid"):
container_child_ids.update(b.get("children", []))
for child_id in b.get("children", []):
child = block_index.get(child_id, {})
container_child_ids.update(child.get("children", []))

for block in blocks:
block_type = block.get("block_type")
block_id = block.get("block_id", "")
parent_id = block.get("parent_id", "")

# 1 = Page, 2 = Text, 3 = Heading1, 4 = Heading2, ..., 9 = Heading7+
# 10 = BulletList, 11 = OrderedList, 12 = Code, 13 = Quote
# 14 = Equation, 15 = Todo, 16 = Divider
# 17 = Image, 18 = TableCell, 19 = Table
# 22 = Callout, 23 = ChatCard, 27 = Grid, 28 = GridColumn
# 跳过已作为容器子块处理的 block
if block_id in container_child_ids:
continue

if block_type == 2: # Text
kind = detect_block_kind(block)

if kind == "text":
text_data = block.get("text", {})
text = extract_text_from_elements(text_data.get("elements", []))
if text.strip():
lines.append(text)
else:
lines.append("")

elif block_type in (3, 4, 5, 6, 7, 8, 9): # Heading 1-7
level = block_type - 2
heading_data = block.get("heading" + str(level), {}) or block.get("heading", {})
# Try multiple key formats
for key in [f"heading{level}", "heading"]:
if key in block:
heading_data = block[key]
break
elif kind.startswith("heading"):
level = int(kind[-1]) # heading1 -> 1
heading_data = block.get(kind, {})
text = extract_text_from_elements(heading_data.get("elements", []))
lines.append(f"{'#' * level} {text}")

elif block_type == 10: # Bullet list
elif kind == "bullet":
text_data = block.get("bullet", {})
text = extract_text_from_elements(text_data.get("elements", []))
lines.append(f"- {text}")

elif block_type == 11: # Ordered list
elif kind == "ordered":
text_data = block.get("ordered", {})
text = extract_text_from_elements(text_data.get("elements", []))
counter = ordered_list_counter.get(parent_id, 0) + 1
ordered_list_counter[parent_id] = counter
lines.append(f"{counter}. {text}")

elif block_type == 12: # Code block
elif kind == "code":
code_data = block.get("code", {})
text = extract_text_from_elements(code_data.get("elements", []))
lang = code_data.get("style", {}).get("language", "")
# Map language codes
lang_map = {1: "plaintext", 2: "abap", 3: "ada", 4: "apache", 5: "apex",
6: "assembly", 7: "bash", 8: "c", 9: "csharp", 10: "cpp",
11: "clojure", 12: "cmake", 13: "coffeescript", 14: "css",
15: "d", 16: "dart", 17: "delphi", 18: "django", 19: "dockerfile",
20: "elixir", 21: "elm", 22: "erlang", 23: "fortran",
24: "fsharp", 25: "go", 26: "graphql", 27: "groovy", 28: "haskell",
29: "html", 30: "http", 31: "java", 32: "javascript",
33: "json", 34: "julia", 35: "kotlin", 36: "latex", 37: "lisp",
38: "lua", 39: "makefile", 40: "markdown", 41: "matlab",
42: "nginx", 43: "objectivec", 44: "ocaml", 45: "perl",
46: "php", 47: "powershell", 48: "properties", 49: "protobuf",
50: "python", 51: "r", 52: "ruby", 53: "rust", 54: "scala",
55: "scheme", 56: "scss", 57: "shell", 58: "sql", 59: "swift",
60: "thrift", 61: "toml", 62: "typescript", 63: "vbnet",
64: "verilog", 65: "vhdl", 66: "visual_basic", 67: "vue",
68: "xml", 69: "yaml"}
lang_str = lang_map.get(lang, "") if isinstance(lang, int) else str(lang)
lang_str = LANG_MAP.get(lang, "") if isinstance(lang, int) else str(lang)
lines.append(f"```{lang_str}")
lines.append(text)
lines.append("```")

elif block_type == 13: # Quote
elif kind == "quote":
text_data = block.get("quote", {})
text = extract_text_from_elements(text_data.get("elements", []))
lines.append(f"> {text}")

elif block_type == 14: # Equation block
elif kind == "equation":
eq_data = block.get("equation", {})
text = extract_text_from_elements(eq_data.get("elements", []))
lines.append(f"$$\n{text}\n$$")

elif block_type == 15: # Todo
elif kind == "todo":
todo_data = block.get("todo", {})
text = extract_text_from_elements(todo_data.get("elements", []))
done = todo_data.get("style", {}).get("done", False)
checkbox = "[x]" if done else "[ ]"
lines.append(f"- {checkbox} {text}")

elif block_type == 16: # Divider
elif kind == "divider":
lines.append("---")

elif block_type == 17: # Image
elif kind == "image":
image_data = block.get("image", {})
token_val = image_data.get("token", "")
lines.append(f"![image](feishu-image://{token_val})")
if image_map and token_val in image_map:
lines.append(f"![image]({image_dir_name}/{image_map[token_val]})")
elif token_val:
lines.append(f"![image](feishu-image://{token_val})")

elif kind == "table_cell":
pass # 由 table 统一渲染

elif block_type == 22: # Callout
elif kind == "table":
table_md = render_table(block, block_index, image_map, image_dir_name)
if table_md:
lines.append(table_md)

elif kind == "callout":
callout_data = block.get("callout", {})
# Callout is a container, children will be processed separately
emoji = callout_data.get("emoji_id", "")
if emoji:
lines.append(f"> {emoji}")

elif block_type == 1: # Page (root), skip
pass
elif kind in ("page", "grid", "grid_column"):
pass # 容器块,跳过

else:
# Unknown block type, try to extract any text
# Unknown block, try to extract any text
for key in block:
if isinstance(block[key], dict) and "elements" in block[key]:
text = extract_text_from_elements(block[key]["elements"])
Expand All @@ -240,8 +403,8 @@ def blocks_to_markdown(blocks):
return "\n\n".join(lines)


def fetch_feishu_doc(url_or_id):
"""主函数:获取飞书文档并转为 Markdown"""
def fetch_feishu_doc(url_or_id, save_dir=None):
"""主函数:获取飞书文档并转为 Markdown。save_dir 非空时下载图片到本地。"""
# 解析 URL
doc_id, doc_type = parse_feishu_url(url_or_id)
if not doc_id:
Expand Down Expand Up @@ -272,8 +435,16 @@ def fetch_feishu_doc(url_or_id):
if err:
return {"error": err}

# 下载图片(save_dir 非空时)
image_map = {}
image_dir_name = None
if save_dir and title:
image_dir_name = f"{title}_images"
image_save_dir = os.path.join(save_dir, image_dir_name)
image_map = download_all_images(blocks, image_save_dir, token)

# 转换为 Markdown
content = blocks_to_markdown(blocks)
content = blocks_to_markdown(blocks, image_map=image_map, image_dir_name=image_dir_name)

return {
"title": title,
Expand Down Expand Up @@ -312,7 +483,8 @@ def format_as_markdown(result):
url = sys.argv[1]
use_json = "--json" in sys.argv

result = fetch_feishu_doc(url)
save_dir = os.path.expanduser("~/Downloads") if not use_json else None
result = fetch_feishu_doc(url, save_dir=save_dir)

if use_json:
print(json.dumps(result, ensure_ascii=False, indent=2))
Expand Down