From 3b3473332f424973b50ed2251b097e8a1cb3bc02 Mon Sep 17 00:00:00 2001 From: WU Leizhi Date: Mon, 20 Oct 2025 08:55:45 +0800 Subject: [PATCH 1/4] feat: bootstrap ProjectAI MVP --- .gitignore | 3 + README.md | 56 +++++++++++++- config/sample_config.json | 16 ++++ data/document_tasks.json | 24 ++++++ data/messages.json | 12 +++ data/sheet_tasks.json | 24 ++++++ run.py | 19 +++++ src/project_ai/__init__.py | 5 ++ src/project_ai/cli.py | 67 +++++++++++++++++ src/project_ai/data_sources/__init__.py | 0 src/project_ai/data_sources/feishu_client.py | 74 +++++++++++++++++++ src/project_ai/models/__init__.py | 0 src/project_ai/models/project_state.py | 51 +++++++++++++ src/project_ai/models/risk.py | 32 ++++++++ src/project_ai/parsers/__init__.py | 0 src/project_ai/services/__init__.py | 0 src/project_ai/services/ingestion_service.py | 19 +++++ .../services/notification_service.py | 57 ++++++++++++++ src/project_ai/services/report_service.py | 73 ++++++++++++++++++ .../services/risk_analysis_service.py | 60 +++++++++++++++ .../templates/daily_report_template.txt | 12 +++ src/project_ai/utils/__init__.py | 0 22 files changed, 602 insertions(+), 2 deletions(-) create mode 100644 .gitignore create mode 100644 config/sample_config.json create mode 100644 data/document_tasks.json create mode 100644 data/messages.json create mode 100644 data/sheet_tasks.json create mode 100644 run.py create mode 100644 src/project_ai/__init__.py create mode 100644 src/project_ai/cli.py create mode 100644 src/project_ai/data_sources/__init__.py create mode 100644 src/project_ai/data_sources/feishu_client.py create mode 100644 src/project_ai/models/__init__.py create mode 100644 src/project_ai/models/project_state.py create mode 100644 src/project_ai/models/risk.py create mode 100644 src/project_ai/parsers/__init__.py create mode 100644 src/project_ai/services/__init__.py create mode 100644 src/project_ai/services/ingestion_service.py create mode 100644 
src/project_ai/services/notification_service.py create mode 100644 src/project_ai/services/report_service.py create mode 100644 src/project_ai/services/risk_analysis_service.py create mode 100644 src/project_ai/templates/daily_report_template.txt create mode 100644 src/project_ai/utils/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f849cca --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.pyc +output/ diff --git a/README.md b/README.md index 9178279..d6a5e0a 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,54 @@ -# Program-Management-System -用于座舱项目管理风险汇报 +# ProjectAI 自动项目管控工具 + +ProjectAI 是面向智能座舱项目管理团队的 AI 自动项目管控工具样例,实现了数据采集、风险识别、日报生成与通知提醒的最小可行产品(MVP)。 + +## 功能概览 + +- **信息抓取**:从示例飞书文档、表格、群消息数据中提取任务与事件。 +- **风险识别**:通过规则 + AI 推理思路(此处以规则示例实现)识别延期、阻塞、进度滞后等风险。 +- **日报生成**:根据模板自动生成日报文本并保存到本地文件。 +- **通知提醒**:对高风险任务生成待发送的飞书提醒内容。 + +## 目录结构 + +``` +├── config/ # 示例配置 +├── data/ # 示例飞书导出数据 +├── output/ # 日报输出目录 +├── src/project_ai/ # 核心源码 +│ ├── data_sources/ # 数据抓取与解析 +│ ├── models/ # 数据模型与风险模型 +│ ├── services/ # 风险识别、报告、通知等服务 +│ └── templates/ # 日报模板 +``` + +## 快速开始 + +1. 创建虚拟环境并安装依赖(本示例使用 Python 标准库,无需额外依赖)。 +2. 执行以下命令运行完整流程: + + ```bash + python run.py run --config config/sample_config.json + ``` + +3. 
运行结束后,在 `output/daily_report.txt` 中查看生成的日报,同时命令行会输出需要发送的高风险提醒。 + +## 配置说明 + +`config/sample_config.json` 包含以下配置项: + +- `feishu`:指定示例数据文件路径,可替换为真实的 Feishu API 数据抓取逻辑。 +- `report.output_path`:日报输出路径。 +- `report.template_path`:日报模板,可根据团队格式自定义。 +- `notifications`:通知开关、接收人列表与高风险提醒阈值。 + +## 后续扩展建议 + +- 接入真实的 Feishu API,实现定时同步项目数据。 +- 增加 NLP 模型解析非结构化群聊/文档内容,提取更多任务语义。 +- 结合历史数据和机器学习模型,优化风险识别准确率。 +- 构建 Web 看板展示进度、风险趋势,并加入甘特图等可视化能力。 + +## 许可证 + +本示例项目用于演示 AI 自动项目管控的基础能力,可根据团队需要自由扩展与修改。 diff --git a/config/sample_config.json b/config/sample_config.json new file mode 100644 index 0000000..f3768ee --- /dev/null +++ b/config/sample_config.json @@ -0,0 +1,16 @@ +{ + "feishu": { + "documents": ["data/document_tasks.json"], + "sheets": ["data/sheet_tasks.json"], + "messages": ["data/messages.json"] + }, + "report": { + "output_path": "output/daily_report.txt", + "template_path": "src/project_ai/templates/daily_report_template.txt" + }, + "notifications": { + "enabled": true, + "recipients": ["project.manager@feishu"], + "high_risk_threshold": 3 + } +} diff --git a/data/document_tasks.json b/data/document_tasks.json new file mode 100644 index 0000000..93f6174 --- /dev/null +++ b/data/document_tasks.json @@ -0,0 +1,24 @@ +{ + "tasks": [ + { + "id": "DOC-101", + "title": "语音交互需求评审", + "owner": "张伟", + "due_date": "2024-05-10", + "status": "completed", + "progress": 100, + "description": "完成语音交互模块的需求评审", + "blockers": null + }, + { + "id": "DOC-102", + "title": "驾驶舱 UI 设计验收", + "owner": "李娜", + "due_date": "2024-05-14", + "status": "in progress", + "progress": 45, + "description": "与设计团队确认交互细节", + "blockers": null + } + ] +} diff --git a/data/messages.json b/data/messages.json new file mode 100644 index 0000000..a76b98b --- /dev/null +++ b/data/messages.json @@ -0,0 +1,12 @@ +{ + "events": [ + { + "timestamp": "2024-05-11 09:30", + "summary": "产品经理在群内确认语音交互需求已冻结。" + }, + { + "timestamp": "2024-05-11 15:00", + "summary": "传感器团队预计 5 月 13 日提供新固件。" + } + ] +} diff --git 
a/data/sheet_tasks.json b/data/sheet_tasks.json new file mode 100644 index 0000000..03acb37 --- /dev/null +++ b/data/sheet_tasks.json @@ -0,0 +1,24 @@ +{ + "tasks": [ + { + "id": "SHEET-201", + "title": "智能导航算法联调", + "owner": "王强", + "due_date": "2024-05-12", + "status": "blocked", + "progress": 30, + "description": "等待传感器团队提供最新固件", + "blockers": "传感器固件未发布" + }, + { + "id": "SHEET-202", + "title": "驾驶行为模型验证", + "owner": "陈晨", + "due_date": "2024-05-16", + "status": "not started", + "progress": 10, + "description": "搭建仿真环境并验证模型数据", + "blockers": null + } + ] +} diff --git a/run.py b/run.py new file mode 100644 index 0000000..dce6c06 --- /dev/null +++ b/run.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import sys +from pathlib import Path + + +def main() -> None: + project_root = Path(__file__).parent + src_path = project_root / "src" + if str(src_path) not in sys.path: + sys.path.insert(0, str(src_path)) + + from project_ai.cli import main as cli_main # noqa: WPS433 - runtime import for path setup + + cli_main() + + +if __name__ == "__main__": + main() diff --git a/src/project_ai/__init__.py b/src/project_ai/__init__.py new file mode 100644 index 0000000..75652e7 --- /dev/null +++ b/src/project_ai/__init__.py @@ -0,0 +1,5 @@ +"""ProjectAI - AI 驱动的项目风险识别与日报生成工具。""" + +from .cli import main + +__all__ = ["main"] diff --git a/src/project_ai/cli.py b/src/project_ai/cli.py new file mode 100644 index 0000000..d7c9360 --- /dev/null +++ b/src/project_ai/cli.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any, Dict + +from .services.ingestion_service import IngestionService +from .services.notification_service import NotificationService +from .services.report_service import ReportService +from .services.risk_analysis_service import RiskAnalysisService + + +def load_config(path: Path) -> Dict[str, Any]: + with path.open("r", encoding="utf-8") as handle: + return 
json.load(handle) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="ProjectAI - AI 驱动的项目风险与日报工具") + parser.add_argument("command", choices=["run"], help="要执行的操作") + parser.add_argument("--config", required=True, help="配置文件路径 (JSON)") + parser.add_argument("--output", help="日报输出路径,覆盖配置文件设置") + parser.add_argument("--template", help="日报模板路径,覆盖配置文件设置") + return parser + + +def main(argv: list[str] | None = None) -> None: + parser = build_parser() + args = parser.parse_args(argv) + + config_path = Path(args.config) + config = load_config(config_path) + + if args.command != "run": # pragma: no cover - defensive + parser.error(f"暂不支持的命令:{args.command}") + + ingestion = IngestionService.from_config(config) + state = ingestion.load_state() + + risk_service = RiskAnalysisService() + risks = risk_service.analyze(state) + + report_template = Path(args.template) if args.template else None + if not report_template and config.get("report", {}).get("template_path"): + report_template = Path(config["report"]["template_path"]) + + report_service = ReportService(template_path=report_template) + report_content = report_service.render_report(state, risks) + + output_path = args.output or config.get("report", {}).get("output_path", "output/daily_report.txt") + report_service.save_report(report_content, Path(output_path)) + + notification_service = NotificationService.from_config(config) + notifications = notification_service.build_notifications(risks) + + print("日报已生成:", output_path) + if notifications: + print("需要发送的提醒:") + for notification in notifications: + print(f"- 发送给 {notification.target}: {notification.message}") + else: + print("暂无需要发送的高风险提醒。") + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/src/project_ai/data_sources/__init__.py b/src/project_ai/data_sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/project_ai/data_sources/feishu_client.py 
b/src/project_ai/data_sources/feishu_client.py new file mode 100644 index 0000000..7c7624c --- /dev/null +++ b/src/project_ai/data_sources/feishu_client.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict, Iterable, List + +from ..models.project_state import ProjectEvent, ProjectState, Task + + +@dataclass +class FeishuSourceConfig: + documents: List[Path] + sheets: List[Path] + messages: List[Path] + + @classmethod + def from_dict(cls, data: Dict[str, Iterable[str]]) -> "FeishuSourceConfig": + return cls( + documents=[Path(path) for path in data.get("documents", [])], + sheets=[Path(path) for path in data.get("sheets", [])], + messages=[Path(path) for path in data.get("messages", [])], + ) + + +class FeishuClient: + """Light-weight client that loads sample Feishu exports from JSON files.""" + + def __init__(self, config: FeishuSourceConfig) -> None: + self._config = config + + def load_project_state(self) -> ProjectState: + state = ProjectState() + for path in self._config.documents + self._config.sheets: + self._load_tasks(path, state) + for path in self._config.messages: + self._load_events(path, state) + return state + + def _load_tasks(self, path: Path, state: ProjectState) -> None: + raw = self._read_json(path) + for entry in raw.get("tasks", []): + try: + task = Task( + id=str(entry["id"]), + title=entry["title"], + owner=entry.get("owner", "unknown"), + due_date=datetime.fromisoformat(entry["due_date"]).date(), + status=entry.get("status", "unknown"), + progress=int(entry.get("progress", 0)), + source=str(path), + description=entry.get("description", ""), + blockers=entry.get("blockers"), + ) + except (KeyError, ValueError) as exc: # pragma: no cover - defensive + raise ValueError(f"Invalid task entry in {path}: {entry}") from exc + state.add_task(task) + + def _load_events(self, path: Path, state: ProjectState) -> None: + 
raw = self._read_json(path) + for entry in raw.get("events", []): + event = ProjectEvent( + timestamp=entry.get("timestamp", ""), + summary=entry.get("summary", ""), + source=str(path), + ) + state.add_event(event) + + def _read_json(self, path: Path) -> Dict: + if not path.exists(): # pragma: no cover - defensive + raise FileNotFoundError(f"Sample data file not found: {path}") + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) diff --git a/src/project_ai/models/__init__.py b/src/project_ai/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/project_ai/models/project_state.py b/src/project_ai/models/project_state.py new file mode 100644 index 0000000..7a20db3 --- /dev/null +++ b/src/project_ai/models/project_state.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import date +from typing import List, Optional + + +@dataclass +class Task: + """Represents a unit of work tracked by the project management system.""" + + id: str + title: str + owner: str + due_date: date + status: str + progress: int + source: str + description: str = "" + blockers: Optional[str] = None + + def normalized_status(self) -> str: + """Return a lower-case status to simplify comparisons.""" + + return self.status.lower().strip() + + +@dataclass +class ProjectEvent: + """Represents a notable event extracted from project communications.""" + + timestamp: str + summary: str + source: str + + +@dataclass +class ProjectState: + """Container for all of the project's structured data.""" + + tasks: List[Task] = field(default_factory=list) + events: List[ProjectEvent] = field(default_factory=list) + + def add_task(self, task: Task) -> None: + self.tasks.append(task) + + def add_event(self, event: ProjectEvent) -> None: + self.events.append(event) + + def incomplete_tasks(self) -> List[Task]: + return [task for task in self.tasks if task.normalized_status() != "completed"] diff --git 
a/src/project_ai/models/risk.py b/src/project_ai/models/risk.py new file mode 100644 index 0000000..99a07e3 --- /dev/null +++ b/src/project_ai/models/risk.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class RiskLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +@dataclass +class Risk: + """Represents a risk identified within the project.""" + + title: str + description: str + level: RiskLevel + owner: Optional[str] = None + related_task_id: Optional[str] = None + + +@dataclass +class Notification: + """Notification produced when a risk warrants human attention.""" + + message: str + target: str + urgency: RiskLevel + related_risk: Risk diff --git a/src/project_ai/parsers/__init__.py b/src/project_ai/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/project_ai/services/__init__.py b/src/project_ai/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/project_ai/services/ingestion_service.py b/src/project_ai/services/ingestion_service.py new file mode 100644 index 0000000..6ceac2e --- /dev/null +++ b/src/project_ai/services/ingestion_service.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from ..data_sources.feishu_client import FeishuClient, FeishuSourceConfig +from ..models.project_state import ProjectState + + +class IngestionService: + """High level orchestrator for loading project data from Feishu.""" + + def __init__(self, client: FeishuClient) -> None: + self._client = client + + @classmethod + def from_config(cls, config: dict) -> "IngestionService": + source = FeishuSourceConfig.from_dict(config.get("feishu", {})) + return cls(FeishuClient(source)) + + def load_state(self) -> ProjectState: + return self._client.load_project_state() diff --git a/src/project_ai/services/notification_service.py b/src/project_ai/services/notification_service.py new file mode 100644 
index 0000000..f425ddd --- /dev/null +++ b/src/project_ai/services/notification_service.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, List + +from ..models.risk import Notification, Risk, RiskLevel + + +@dataclass +class NotificationConfig: + enabled: bool + recipients: List[str] + high_risk_threshold: int = 1 + + @classmethod + def from_dict(cls, data: dict) -> "NotificationConfig": + return cls( + enabled=data.get("enabled", False), + recipients=list(data.get("recipients", [])), + high_risk_threshold=int(data.get("high_risk_threshold", 1)), + ) + + +class NotificationService: + """Generate Feishu-ready notifications for high priority risks.""" + + def __init__(self, config: NotificationConfig) -> None: + self._config = config + + @classmethod + def from_config(cls, data: dict) -> "NotificationService": + return cls(NotificationConfig.from_dict(data.get("notifications", {}))) + + def build_notifications(self, risks: Iterable[Risk]) -> List[Notification]: + if not self._config.enabled: + return [] + + notifications: List[Notification] = [] + high_risks = [risk for risk in risks if risk.level == RiskLevel.HIGH] + + for risk in high_risks[: self._config.high_risk_threshold]: + for recipient in self._config.recipients or [risk.owner or "未指定"]: + message = ( + f"[高风险提醒] {risk.title}\n" + f"责任人:{risk.owner or '未指定'}\n" + f"详情:{risk.description}" + ) + notifications.append( + Notification( + message=message, + target=recipient, + urgency=RiskLevel.HIGH, + related_risk=risk, + ) + ) + + return notifications diff --git a/src/project_ai/services/report_service.py b/src/project_ai/services/report_service.py new file mode 100644 index 0000000..b4b455b --- /dev/null +++ b/src/project_ai/services/report_service.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from datetime import date +from pathlib import Path +from typing import Iterable, List + +from ..models.project_state import 
ProjectEvent, ProjectState, Task +from ..models.risk import Risk + + +class ReportService: + """Generate a structured daily report summarising project status.""" + + def __init__(self, template_path: Path | None = None) -> None: + self._template_path = template_path + + def render_report(self, state: ProjectState, risks: Iterable[Risk]) -> str: + tasks = state.tasks + events = state.events + risk_list = list(risks) + + if self._template_path and self._template_path.exists(): + template = self._template_path.read_text(encoding="utf-8") + return template.format( + date=date.today().isoformat(), + total_tasks=len(tasks), + completed_tasks=len([t for t in tasks if t.normalized_status() == "completed"]), + risks=self._format_risks(risk_list), + highlights=self._format_events(events), + ) + + return self._default_template(tasks, events, risk_list) + + def save_report(self, content: str, output_path: Path) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(content, encoding="utf-8") + + def _default_template(self, tasks: List[Task], events: List[ProjectEvent], risks: List[Risk]) -> str: + lines = ["=== ProjectAI 日报 ===", f"日期:{date.today().isoformat()}"] + lines.append("\n任务概览:") + lines.append(f" 总任务数:{len(tasks)}") + lines.append(f" 已完成:{len([t for t in tasks if t.normalized_status() == 'completed'])}") + lines.append(f" 进行中:{len([t for t in tasks if t.normalized_status() == 'in progress'])}") + + lines.append("\n重点风险:") + if risks: + for risk in risks: + lines.append(f"- [{risk.level.value.upper()}] {risk.title} (责任人:{risk.owner or '未指定'})") + lines.append(f" {risk.description}") + else: + lines.append("- 当前无识别到的风险") + + lines.append("\n关键事件:") + if events: + for event in events: + lines.append(f"- {event.timestamp}:{event.summary}") + else: + lines.append("- 无新的沟通记录") + + return "\n".join(lines) + + def _format_risks(self, risks: List[Risk]) -> str: + if not risks: + return "- 当前无识别到的风险" + return "\n".join( + f"- 
[{risk.level.value.upper()}] {risk.title} (责任人:{risk.owner or '未指定'})\n {risk.description}" # noqa: E501 + for risk in risks + ) + + def _format_events(self, events: List[ProjectEvent]) -> str: + if not events: + return "- 无新的沟通记录" + return "\n".join(f"- {event.timestamp}:{event.summary}" for event in events) diff --git a/src/project_ai/services/risk_analysis_service.py b/src/project_ai/services/risk_analysis_service.py new file mode 100644 index 0000000..334b0c8 --- /dev/null +++ b/src/project_ai/services/risk_analysis_service.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from datetime import date +from typing import Iterable, List + +from ..models.project_state import ProjectState, Task +from ..models.risk import Risk, RiskLevel + + +class RiskAnalysisService: + """Apply simple rule-based heuristics to detect project risks.""" + + def __init__(self, today: date | None = None) -> None: + self._today = today or date.today() + + def analyze(self, state: ProjectState) -> List[Risk]: + risks: List[Risk] = [] + for task in state.tasks: + risks.extend(self._evaluate_task(task)) + return risks + + def _evaluate_task(self, task: Task) -> Iterable[Risk]: + status = task.normalized_status() + + if status in {"blocked", "on hold"}: + yield Risk( + title=f"任务阻塞:{task.title}", + description=(task.blockers or "任务被标记为阻塞,需要协调解除。"), + level=RiskLevel.HIGH, + owner=task.owner, + related_task_id=task.id, + ) + + if task.due_date < self._today and status != "completed": + yield Risk( + title=f"任务延期:{task.title}", + description="任务超过截止日期仍未完成。", + level=RiskLevel.HIGH, + owner=task.owner, + related_task_id=task.id, + ) + + days_to_due = (task.due_date - self._today).days + if 0 <= days_to_due <= 2 and task.progress < 50 and status != "completed": + yield Risk( + title=f"进度滞后:{task.title}", + description="任务临近截止但进度不足 50%。", + level=RiskLevel.MEDIUM, + owner=task.owner, + related_task_id=task.id, + ) + + if task.progress < 25 and status == "not started": + yield Risk( + 
title=f"启动延迟:{task.title}", + description="任务标记未开始且进度极低,需确认原因。", + level=RiskLevel.LOW, + owner=task.owner, + related_task_id=task.id, + ) diff --git a/src/project_ai/templates/daily_report_template.txt b/src/project_ai/templates/daily_report_template.txt new file mode 100644 index 0000000..28b3e8f --- /dev/null +++ b/src/project_ai/templates/daily_report_template.txt @@ -0,0 +1,12 @@ +=== ProjectAI 日报 === +日期:{date} + +任务概览: +- 总任务数:{total_tasks} +- 已完成:{completed_tasks} + +重点风险: +{risks} + +关键事件: +{highlights} diff --git a/src/project_ai/utils/__init__.py b/src/project_ai/utils/__init__.py new file mode 100644 index 0000000..e69de29 From 970673b39d196b76954570a2e780030bf0544a5f Mon Sep 17 00:00:00 2001 From: WU Leizhi Date: Mon, 20 Oct 2025 09:49:43 +0800 Subject: [PATCH 2/4] design: outline Feishu API integration path --- README.md | 6 + config/sample_config.json | 17 +- docs/feishu_api_integration.md | 131 ++++++++++++++++ src/project_ai/data_sources/__init__.py | 12 ++ .../data_sources/feishu_api_client.py | 145 ++++++++++++++++++ src/project_ai/services/ingestion_service.py | 21 ++- 6 files changed, 324 insertions(+), 8 deletions(-) create mode 100644 docs/feishu_api_integration.md create mode 100644 src/project_ai/data_sources/feishu_api_client.py diff --git a/README.md b/README.md index d6a5e0a..acb5034 100644 --- a/README.md +++ b/README.md @@ -52,3 +52,9 @@ ProjectAI 是面向智能座舱项目管理团队的 AI 自动项目管控工具 ## 许可证 本示例项目用于演示 AI 自动项目管控的基础能力,可根据团队需要自由扩展与修改。 + +## 飞书 API 集成设计 + +- 当前仓库默认使用本地 JSON 示例数据(`mode=local_files`)。 +- 若需接入真实飞书接口,请参考 [docs/feishu_api_integration.md](docs/feishu_api_integration.md),其中包含鉴权、数据抓取、配置结构与迁移步骤的完整设计。 +- API 模式需要额外安装 `requests` 等网络依赖,并在配置中提供企业自建应用的凭据。 diff --git a/config/sample_config.json b/config/sample_config.json index f3768ee..7406f39 100644 --- a/config/sample_config.json +++ b/config/sample_config.json @@ -1,8 +1,15 @@ { "feishu": { - "documents": ["data/document_tasks.json"], - "sheets": ["data/sheet_tasks.json"], - "messages": 
["data/messages.json"] + "documents": [ + "data/document_tasks.json" + ], + "sheets": [ + "data/sheet_tasks.json" + ], + "messages": [ + "data/messages.json" + ], + "mode": "local_files" }, "report": { "output_path": "output/daily_report.txt", @@ -10,7 +17,9 @@ }, "notifications": { "enabled": true, - "recipients": ["project.manager@feishu"], + "recipients": [ + "project.manager@feishu" + ], "high_risk_threshold": 3 } } diff --git a/docs/feishu_api_integration.md b/docs/feishu_api_integration.md new file mode 100644 index 0000000..f85de04 --- /dev/null +++ b/docs/feishu_api_integration.md @@ -0,0 +1,131 @@ +# 飞书 API 集成设计方案 + +本方案阐述如何将当前依赖本地 JSON 示例数据的 ProjectAI 项目接入真实的飞书开放平台接口,实现项目数据的自动抓取、风险分析、日报生成与通知提醒的闭环。 + +## 设计目标 + +1. **统一数据装载接口**:保留现有的 `FeishuClient` 以支持本地调试,同时新增基于飞书开放平台的 `FeishuAPIClient`,二者实现相同的 `load_project_state()` 接口,便于在配置中切换。 +2. **覆盖核心数据源**:支持从多维表格(Bitable)、文档(Docs)、群聊(IM)读取任务与事件信息。 +3. **安全合规**:按照飞书平台要求安全存储凭据,管理访问令牌,并记录关键操作日志。 +4. **可扩展性**:为后续扩展(例如甘特图、看板、更多数据源)预留结构。 + +## 系统架构调整 + +``` +┌────────────────────┐ +│ run.py / CLI │ +└─────────┬──────────┘ + │ +┌─────────▼──────────┐ +│ IngestionService │ 根据配置选择数据客户端(本地 / API) +└─────────┬──────────┘ + │ + ┌───────▼───────────────────────────────────────┐ + │ Data Sources │ + │ • FeishuClient ← 本地 JSON 示例数据 │ + │ • FeishuAPIClient ← 飞书开放平台接口 │ + └───────┬───────────────────────────────────────┘ + │ ProjectState +┌─────────▼──────────┐ +│ Risk / Report / ...│ +└────────────────────┘ +``` + +`FeishuAPIClient` 负责: + +- 统一鉴权(Tenant Access Token 与 User Access Token 管理)。 +- 调用 Docs、Bitable、IM 等接口并抽取数据。 +- 数据清洗、字段映射、增量同步与缓存控制。 + +## 关键模块设计 + +### 1. 
配置结构 + +在 `config` 中新增 `mode` 与 `api` 子配置,示例: + +```json +{ + "feishu": { + "mode": "api", + "api": { + "app_id": "cli_xxx", + "app_secret": "xxx", + "tenant_key": "xxx", + "base_url": "https://open.feishu.cn/open-apis", + "document_ids": ["doccnxxxxxxxx"], + "bitable": [ + { + "app_token": "bascnxxx", + "table_id": "tblxxx", + "view_id": "vewxxx" + } + ], + "chat_ids": ["oc_xxx"], + "sync_window_days": 7 + } + } +} +``` + +### 2. 认证与令牌管理 + +- 使用**企业自建应用**凭据 (`app_id`, `app_secret`) 通过 `/auth/v3/tenant_access_token/internal` 获取租户令牌。 +- 令牌需缓存至内存或可选的 Redis/本地文件,并在过期前自动刷新(默认有效期 2 小时)。 +- 如需访问用户维度接口(例如读取私有文档),需集成 OAuth2.0 用户授权流程并持久化 `user_access_token`。 +- 在 `FeishuAPIClient` 中实现 `_get_tenant_access_token()` 与 `_ensure_token()`,统一管理请求头。 + +### 3. 数据抓取 + +| 数据类型 | API | 解析要点 | +|----------|-----|----------| +| 文档任务 | [Docs Blocks API](https://open.feishu.cn/document/server-docs/docs/docs-docs) | 通过 `/docx/v1/documents/{document_id}/blocks` 遍历段落,使用标记符(例如 `TODO` 列表、表格)提取任务字段。 | +| 多维表格 | [Bitable API](https://open.feishu.cn/document/server-docs/docs/bitable-v1/overview) | `/bitable/v1/apps/{app_token}/tables/{table_id}/records`;字段映射到 `Task`(负责人、截止日期、进度、状态等)。 | +| 群聊消息 | [IM Message API](https://open.feishu.cn/document/server-docs/im-v1/message/list) | 支持按群 ID + 时间窗口分页查询;解析文本、富文本中的关键事件,写入 `ProjectEvent`。 | + +实现步骤: + +1. 在 `FeishuAPIClient.load_project_state()` 中: + - 调用 `self._fetch_docs_tasks()`、`self._fetch_bitable_tasks()`、`self._fetch_chat_events()`。 + - 将结果合并到 `ProjectState`。 +2. 针对不同数据源,实现可插拔的解析器: + - `DocTaskParser`:识别段落、清单、表格格式。 + - `BitableTaskParser`:根据字段配置映射。 + - `ChatEventParser`:基于关键词或正则提取事件摘要。 +3. 考虑增量同步,使用 `sync_window_days` 控制查询时间范围,并记录上次同步时间戳。 + +### 4. 错误处理与重试 + +- 封装统一的 `_request()` 方法: + - 注入鉴权头、幂等 ID(`X-Request-ID`)。 + - 对 429/5xx 状态码进行指数退避重试。 + - 记录请求与响应摘要,便于审计。 +- 对解析失败的记录写入告警日志,同时继续处理其他数据,避免单条数据阻塞流程。 + +### 5. 安全与合规 + +- 所有敏感信息通过环境变量或密钥管理服务传入,避免硬编码在仓库中。 +- 如果部署在服务器,需要限制日志中出现的个人信息;对导出的任务/消息内容进行脱敏处理。 +- 结合企业内部安全规范,评估权限最小化策略(仅授予必要的 API 权限)。 + +### 6. 
测试策略 + +1. **单元测试**:为 `FeishuAPIClient` 的解析函数编写 Mock 响应,确保字段映射准确。 +2. **集成测试**:使用飞书提供的沙箱或测试租户进行端到端验证,确认鉴权、分页、重试逻辑正确。 +3. **回归测试**:保留原有本地 JSON 数据模式,确保 `mode=local_files` 时行为不变。 +4. **监控验证**:对接后在生产环境添加成功率、延迟、错误码等监控指标。 + +### 7. 迁移步骤 + +1. **准备环境**:创建飞书企业自建应用,配置回调与权限,获取 `app_id`/`app_secret`。 +2. **实现 API 客户端**:按照本文设计补全 `FeishuAPIClient`,并接入解析器。 +3. **本地验证**:在测试租户中配置 `config`,运行 `python run.py run --config ...` 检查输出。 +4. **部署密钥管理**:将凭据注入运行环境,添加日志与监控。 +5. **灰度发布**:小范围启用 API 模式,观察日报与提醒准确性,再逐步覆盖所有项目。 + +### 8. 后续扩展 + +- 支持事件回调(Webhook)触发增量同步,减少轮询成本。 +- 引入缓存层(Redis)保存原始响应,支持调试回放。 +- 为 IM 消息推送实现真正的机器人 Webhook 调用,实现自动提醒发送。 + +通过以上设计,即可在保持现有代码结构的前提下,无缝切换到真实的飞书 API,实现项目数据的自动化获取与处理。 diff --git a/src/project_ai/data_sources/__init__.py b/src/project_ai/data_sources/__init__.py index e69de29..ced6638 100644 --- a/src/project_ai/data_sources/__init__.py +++ b/src/project_ai/data_sources/__init__.py @@ -0,0 +1,12 @@ +"""Data source clients for ProjectAI.""" + +from .feishu_client import FeishuClient, FeishuSourceConfig +from .feishu_api_client import BitableConfig, FeishuAPIClient, FeishuAPIConfig + +__all__ = [ + "FeishuClient", + "FeishuSourceConfig", + "FeishuAPIClient", + "FeishuAPIConfig", + "BitableConfig", +] diff --git a/src/project_ai/data_sources/feishu_api_client.py b/src/project_ai/data_sources/feishu_api_client.py new file mode 100644 index 0000000..761e6b0 --- /dev/null +++ b/src/project_ai/data_sources/feishu_api_client.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +try: # pragma: no cover - optional dependency for API mode + import requests +except ImportError: # pragma: no cover - optional dependency + requests = None + +if TYPE_CHECKING: # pragma: no cover - typing aid + from requests import Session +else: # pragma: no cover - runtime fallback for type hints + Session = Any + +from ..models.project_state import ProjectState + + 
+@dataclass +class BitableConfig: + app_token: str + table_id: str + view_id: Optional[str] = None + field_mapping: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class FeishuAPIConfig: + app_id: str + app_secret: str + base_url: str + tenant_key: Optional[str] = None + document_ids: List[str] = field(default_factory=list) + bitable: List[BitableConfig] = field(default_factory=list) + chat_ids: List[str] = field(default_factory=list) + sync_window_days: int = 7 + + @classmethod + def from_dict(cls, data: Dict[str, object]) -> "FeishuAPIConfig": + bitable_configs = [ + BitableConfig( + app_token=entry["app_token"], + table_id=entry["table_id"], + view_id=entry.get("view_id"), + field_mapping=entry.get("field_mapping", {}), + ) + for entry in data.get("bitable", []) + ] + return cls( + app_id=data["app_id"], + app_secret=data["app_secret"], + base_url=data.get("base_url", "https://open.feishu.cn/open-apis"), + tenant_key=data.get("tenant_key"), + document_ids=list(data.get("document_ids", [])), + bitable=bitable_configs, + chat_ids=list(data.get("chat_ids", [])), + sync_window_days=int(data.get("sync_window_days", 7)), + ) + + +class FeishuAPIClient: + """Placeholder client outlining integration with the Feishu Open Platform.""" + + def __init__(self, config: FeishuAPIConfig, session: Optional["Session"] = None) -> None: + self._config = config + if session is not None: + self._session = session + else: + if requests is None: # pragma: no cover - defensive for local mode + raise RuntimeError( + "The 'requests' package is required for Feishu API integration. Install it to enable mode=api." + ) + self._session = requests.Session() + self._tenant_token: Optional[str] = None + self._tenant_token_expire_at: float = 0.0 + + def load_project_state(self) -> ProjectState: # pragma: no cover - network integration required + raise NotImplementedError( + "FeishuAPIClient is a design stub. Implement API calls as described in docs/feishu_api_integration.md." 
+ ) + + # --- Authentication helpers ------------------------------------------------- + + def _ensure_tenant_token(self) -> str: + if self._tenant_token and time.time() < self._tenant_token_expire_at - 60: + return self._tenant_token + if requests is None: # pragma: no cover - defensive + raise RuntimeError("requests dependency missing; cannot fetch tenant token.") + payload = { + "app_id": self._config.app_id, + "app_secret": self._config.app_secret, + } + url = f"{self._config.base_url}/auth/v3/tenant_access_token/internal" + response = self._session.post(url, json=payload, timeout=10) + response.raise_for_status() + data = response.json() + if data.get("code") != 0: + raise RuntimeError(f"Failed to obtain tenant token: {data}") + self._tenant_token = data["tenant_access_token"] + self._tenant_token_expire_at = time.time() + int(data.get("expire", 7200)) + return self._tenant_token + + def _authorized_headers(self) -> Dict[str, str]: + token = self._ensure_tenant_token() + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + if self._config.tenant_key: + headers["X-Tenant-Key"] = self._config.tenant_key + return headers + + # --- Request helper --------------------------------------------------------- + + def _request( + self, + method: str, + path: str, + *, + params: Optional[Dict[str, object]] = None, + json: Optional[Dict[str, object]] = None, + ) -> Dict[str, object]: # pragma: no cover - network integration required + if requests is None: # pragma: no cover - defensive + raise RuntimeError("requests dependency missing; cannot perform API requests.") + url = f"{self._config.base_url}{path}" + for attempt in range(3): + response = self._session.request( + method, + url, + params=params, + json=json, + headers=self._authorized_headers(), + timeout=15, + ) + if response.status_code in {429, 500, 502, 503, 504} and attempt < 2: + backoff = 2 ** attempt + time.sleep(backoff) + continue + response.raise_for_status() + 
payload = response.json() + if payload.get("code") != 0: + raise RuntimeError(f"Feishu API error: {payload}") + return payload + raise RuntimeError(f"Feishu API request failed after retries: {method} {path}") diff --git a/src/project_ai/services/ingestion_service.py b/src/project_ai/services/ingestion_service.py index 6ceac2e..a1d4ad3 100644 --- a/src/project_ai/services/ingestion_service.py +++ b/src/project_ai/services/ingestion_service.py @@ -1,19 +1,32 @@ from __future__ import annotations -from ..data_sources.feishu_client import FeishuClient, FeishuSourceConfig +from typing import Protocol + +from ..data_sources import FeishuAPIClient, FeishuAPIConfig, FeishuClient, FeishuSourceConfig from ..models.project_state import ProjectState +class _StateLoader(Protocol): + def load_project_state(self) -> ProjectState: + ... + class IngestionService: """High level orchestrator for loading project data from Feishu.""" - def __init__(self, client: FeishuClient) -> None: + def __init__(self, client: "_StateLoader") -> None: self._client = client @classmethod def from_config(cls, config: dict) -> "IngestionService": - source = FeishuSourceConfig.from_dict(config.get("feishu", {})) - return cls(FeishuClient(source)) + feishu_config = config.get("feishu", {}) + mode = feishu_config.get("mode", "local_files") + if mode == "api": + api_config = FeishuAPIConfig.from_dict(feishu_config.get("api", {})) + client: "_StateLoader" = FeishuAPIClient(api_config) + else: + source = FeishuSourceConfig.from_dict(feishu_config) + client = FeishuClient(source) + return cls(client) def load_state(self) -> ProjectState: return self._client.load_project_state() From 6fbe1b0a0737814fbbba0f69370eae8ea3d66e13 Mon Sep 17 00:00:00 2001 From: WU Leizhi Date: Mon, 20 Oct 2025 09:49:47 +0800 Subject: [PATCH 3/4] docs: detail feishu docs api requirements --- docs/feishu_api_integration.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/feishu_api_integration.md 
b/docs/feishu_api_integration.md index f85de04..ff7e8e2 100644 --- a/docs/feishu_api_integration.md +++ b/docs/feishu_api_integration.md @@ -93,6 +93,25 @@ - `ChatEventParser`:基于关键词或正则提取事件摘要。 3. 考虑增量同步,使用 `sync_window_days` 控制查询时间范围,并记录上次同步时间戳。 +#### 云文档 API 权限与端点明细 + +当前可以开放的飞书云文档接口包括: + +| 能力 | 接口 | 作用 | 所需权限 Scope | +|------|------|------|----------------| +| 获取文档基本信息 | `GET /docx/v1/documents/{document_id}` | 获取文档标题、拥有者、`revision_id`、最后更新时间等元信息,可用于判断文档是否更新、定位负责人。 | `docx:document:read` | +| 获取文档结构化内容 | `GET /docx/v1/documents/{document_id}/content` | 返回完整的块结构(段落、清单、表格等),便于按层级解析任务列表、关键节点。 | `docx:document:read` | +| 获取文档块列表(分页) | `GET /docx/v1/documents/{document_id}/blocks` | 对于大文档分批获取块列表,可结合 `page_size`/`page_token` 做增量加载,避免一次性下载过大内容。 | `docx:document:read` | +| 获取指定块详情 | `GET /docx/v1/documents/{document_id}/blocks/{block_id}` | 在需要解析表格、待办清单等复杂块时进一步展开子节点,提取任务字段。 | `docx:document:read` | +| 获取文档纯文本 | `GET /docx/v1/documents/{document_id}/raw_content` | 当结构化解析失败时的兜底方案,将全文转为纯文本再用正则/关键词识别任务。 | `docx:document:read` | + +> **实现建议** +> +> 1. `FeishuAPIClient` 在同步文档时,先调用“获取文档基本信息”保存 `revision_id`,若与上次同步一致可跳过解析。 +> 2. 默认使用 “结构化内容” 接口一次性获取文档树;当文档体量较大或需要定期增量更新时,切换至“块列表 + 指定块详情”组合,基于 `page_token` 做分页。 +> 3. 解析过程中建议保留块的 `block_type` 与 `text`/`elements` 信息,方便映射到 `Task` 字段(如负责人、截止日期、状态标签)。 +> 4. 如果需要读取协作者或评论,需额外开通 `docx:comment:read` 等权限,此处按当前可开放的接口规划。 + ### 4. 
错误处理与重试 - 封装统一的 `_request()` 方法: From c5ba46ca5a4b27fbac75e5746268150ca8104ab2 Mon Sep 17 00:00:00 2001 From: WU Leizhi Date: Mon, 20 Oct 2025 11:00:11 +0800 Subject: [PATCH 4/4] feat: enable feishu docs ingestion via api --- README.md | 40 +++ docs/feishu_api_integration.md | 2 +- .../data_sources/feishu_api_client.py | 263 +++++++++++++++++- 3 files changed, 299 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index acb5034..97426cd 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,46 @@ ProjectAI 是面向智能座舱项目管理团队的 AI 自动项目管控工具 - `report.template_path`：日报模板，可根据团队格式自定义。 - `notifications`：通知开关、接收人列表与高风险提醒阈值。 +## 接入真实飞书云文档 + +要使用企业自建应用（例如 `APP ID: <YOUR_APP_ID>`、`APP Secret: <YOUR_APP_SECRET>`、`Tenant: <YOUR_TENANT_KEY>`；请勿将真实凭据提交到仓库，已泄露的密钥应立即轮换）访问真实的飞书云文档内容，请按照以下步骤操作： + +1. 安装网络依赖： + + ```bash + pip install requests + ``` + +2. 在配置中启用 API 模式，例如复制一份 `config/sample_config.json`，并将 `feishu.mode` 设置为 `"api"`，同时在 `feishu.api` 中填写凭据与需要同步的文档 ID： + + ```json + { + "feishu": { + "mode": "api", + "api": { + "app_id": "<YOUR_APP_ID>", + "app_secret": "<YOUR_APP_SECRET>", + "tenant_key": "<YOUR_TENANT_KEY>", + "document_ids": ["doccnXXXXXXXXXXXXXX"] + } + } + } + ``` + + > 为保障安全，生产环境推荐通过环境变量或密钥服务注入上述敏感配置。 + +3. 执行命令： + + ```bash + python run.py run --config config/your_api_config.json + ``` + + `FeishuAPIClient` 会自动调用云文档的 `raw_content` 与 `content` 接口，解析其中的待办（`- [ ]` 或 `TODO`）条目并生成任务，同时同步文档元数据更新时间。 + +4. 如需同步多维表格、群聊消息，可在 `feishu.api.bitable`、`feishu.api.chat_ids` 中继续填充配置，客户端会自动调用相关接口。 + +有关鉴权流程、字段映射与分页策略的更多说明，请参阅 [docs/feishu_api_integration.md](docs/feishu_api_integration.md)。 + ## 后续扩展建议 - 接入真实的 Feishu API，实现定时同步项目数据。 diff --git a/docs/feishu_api_integration.md b/docs/feishu_api_integration.md index ff7e8e2..1bfdd9b 100644 --- a/docs/feishu_api_integration.md +++ b/docs/feishu_api_integration.md @@ -136,7 +136,7 @@ ### 7. 迁移步骤 1. **准备环境**：创建飞书企业自建应用，配置回调与权限，获取 `app_id`/`app_secret`。 -2. 
**实现 API 客户端**：按照本文设计补全 `FeishuAPIClient`，并接入解析器。 +2. **实现 API 客户端**：按照本文设计补全 `FeishuAPIClient`，并接入解析器（当前实现已对接云文档 `raw_content` 与 `content` 接口，可直接解析待办事项为任务数据）。 3. **本地验证**：在测试租户中配置 `config`，运行 `python run.py run --config ...` 检查输出。 4. **部署密钥管理**：将凭据注入运行环境，添加日志与监控。 5. **灰度发布**：小范围启用 API 模式，观察日报与提醒准确性，再逐步覆盖所有项目。 diff --git a/src/project_ai/data_sources/feishu_api_client.py b/src/project_ai/data_sources/feishu_api_client.py index 761e6b0..b3020ef 100644 --- a/src/project_ai/data_sources/feishu_api_client.py +++ b/src/project_ai/data_sources/feishu_api_client.py @@ -1,7 +1,10 @@ from __future__ import annotations +import json +import re import time from dataclasses import dataclass, field +from datetime import date, datetime from typing import TYPE_CHECKING, Any, Dict, List, Optional try: # pragma: no cover - optional dependency for API mode @@ -14,7 +17,7 @@ else: # pragma: no cover - runtime fallback for type hints Session = Any -from ..models.project_state import ProjectState +from ..models.project_state import ProjectEvent, ProjectState, Task @dataclass @@ -60,7 +63,17 @@ def from_dict(cls, data: Dict[str, object]) -> "FeishuAPIConfig": class FeishuAPIClient: - """Placeholder client outlining integration with the Feishu Open Platform.""" + """Client that retrieves project data from the Feishu Open Platform APIs.""" + + TODO_PATTERN = re.compile(r"^(?:[-*]\s*)?\[(?P<mark>[ xX])\]", re.IGNORECASE) + OWNER_PATTERN = re.compile(r"@(?P<owner>[A-Za-z0-9_\-\.\u4e00-\u9fa5]+)") + OWNER_FALLBACK_PATTERN = re.compile(r"(?:owner|负责人)[::]\s*(?P<owner>[^|]+)", re.IGNORECASE) + PROGRESS_PATTERN = re.compile(r"(?:progress|进度)[::]\s*(?P<progress>\d{1,3})%?", re.IGNORECASE) + BLOCKER_PATTERN = re.compile(r"(?:blocker|阻塞|risk)[::]\s*(?P<blocker>[^|]+)", re.IGNORECASE) + DATE_PATTERNS = ( + re.compile(r"(?:due|截止|到期)[::]\s*(?P<date>\d{4}[-/]\d{1,2}[-/]\d{1,2})", re.IGNORECASE), + re.compile(r"(?P<date>\d{4}[-/]\d{1,2}[-/]\d{1,2})"), + ) def __init__(self, config: FeishuAPIConfig, session: Optional["Session"] = None) -> None:
self._config = config @@ -76,9 +89,17 @@ def __init__(self, config: FeishuAPIConfig, session: Optional["Session"] = None) self._tenant_token_expire_at: float = 0.0 def load_project_state(self) -> ProjectState: # pragma: no cover - network integration required - raise NotImplementedError( - "FeishuAPIClient is a design stub. Implement API calls as described in docs/feishu_api_integration.md." - ) + state = ProjectState() + for document_id in self._config.document_ids: + for task in self._fetch_document_tasks(document_id): + state.add_task(task) + for event in self._fetch_document_events(document_id): + state.add_event(event) + for task in self._fetch_bitable_tasks(): + state.add_task(task) + for event in self._fetch_chat_events(): + state.add_event(event) + return state # --- Authentication helpers ------------------------------------------------- @@ -143,3 +164,235 @@ def _request( raise RuntimeError(f"Feishu API error: {payload}") return payload raise RuntimeError(f"Feishu API request failed after retries: {method} {path}") + + # --- Document ingestion ---------------------------------------------------- + + def _fetch_document_tasks(self, document_id: str) -> List[Task]: # pragma: no cover - network integration required + raw_text = self._fetch_document_raw_text(document_id) + if not raw_text: + content = self._fetch_document_structured_content(document_id) + raw_text = self._flatten_content_to_text(content) + tasks: List[Task] = [] + for index, line in enumerate(raw_text.splitlines(), start=1): + parsed = self._parse_task_from_line(document_id, line, index) + if parsed is not None: + tasks.append(parsed) + return tasks + + def _fetch_document_events(self, document_id: str) -> List[ProjectEvent]: # pragma: no cover - network integration required + payload = self._request("GET", f"/docx/v1/documents/{document_id}") + document = payload.get("data", {}).get("document", {}) + updated_at = document.get("updated_time", "") + title = document.get("title", "") + if not 
updated_at and not title: + return [] + summary = f"文档《{title}》已更新" + return [ + ProjectEvent( + timestamp=updated_at, + summary=summary, + source=f"feishu:doc:{document_id}", + ) + ] + + def _fetch_bitable_tasks(self) -> List[Task]: # pragma: no cover - network integration required + tasks: List[Task] = [] + for config in self._config.bitable: + records = self._request( + "GET", + f"/bitable/v1/apps/{config.app_token}/tables/{config.table_id}/records", + params={"view_id": config.view_id} if config.view_id else None, + ) + for record in records.get("data", {}).get("items", []): + task = self._parse_bitable_record(config, record) + if task is not None: + tasks.append(task) + return tasks + + def _fetch_chat_events(self) -> List[ProjectEvent]: # pragma: no cover - network integration required + return [] + + def _fetch_document_raw_text(self, document_id: str) -> str: # pragma: no cover - network integration required + try: + payload = self._request("GET", f"/docx/v1/documents/{document_id}/raw_content") + except Exception: # pragma: no cover - defensive fallback + return "" + return payload.get("data", {}).get("content", "") or "" + + def _fetch_document_structured_content(self, document_id: str) -> Dict[str, Any]: # pragma: no cover - network integration required + payload = self._request("GET", f"/docx/v1/documents/{document_id}/content") + content = payload.get("data", {}).get("content", {}) + if isinstance(content, str): + try: + return json.loads(content) + except json.JSONDecodeError: # pragma: no cover - defensive + return {} + return content + + def _flatten_content_to_text(self, content: Dict[str, Any]) -> str: + blocks = content.get("blocks", []) if isinstance(content, dict) else [] + lines: List[str] = [] + for block in blocks: + text = self._extract_text_from_block(block) + if text: + lines.append(text) + return "\n".join(lines) + + def _extract_text_from_block(self, block: Dict[str, Any]) -> str: + block_type = block.get("block_type") or 
block.get("type") + if block_type == "todo": + return self._extract_text_from_elements(block.get("elements", [])) + if block_type in {"paragraph", "heading1", "heading2", "heading3", "heading4", "heading5", "heading6"}: + return self._extract_text_from_elements(block.get("elements", [])) + if block_type in {"bullet_list", "ordered_list"}: + texts = [self._extract_text_from_block(child) for child in block.get("children", [])] + return "\n".join(filter(None, texts)) + if block_type == "table": + rows = [] + for row in block.get("table", {}).get("rows", []): + cells = [self._extract_text_from_elements(cell.get("elements", [])) for cell in row.get("cells", [])] + rows.append(" | ".join(filter(None, cells))) + return "\n".join(filter(None, rows)) + children = block.get("children", []) + if isinstance(children, list): + texts = [self._extract_text_from_block(child) for child in children] + return "\n".join(filter(None, texts)) + return "" + + def _extract_text_from_elements(self, elements: List[Dict[str, Any]]) -> str: + pieces: List[str] = [] + for element in elements or []: + text_run = element.get("text_run") or element.get("textRun") + if text_run and "content" in text_run: + pieces.append(str(text_run.get("content", ""))) + return "".join(pieces) + + def _parse_task_from_line(self, document_id: str, line: str, index: int) -> Optional[Task]: + stripped = line.strip() + if not stripped: + return None + todo_match = self.TODO_PATTERN.search(stripped) + if not todo_match and not stripped.lower().startswith("todo"): + return None + status = "completed" if todo_match and todo_match.group("mark").lower() == "x" else "in_progress" + normalized = stripped + if todo_match: + normalized = stripped[todo_match.end() :].strip() + elif normalized.lower().startswith("todo"): + normalized = normalized[4:].lstrip(":: ") + + segments = [segment.strip() for segment in re.split(r"\s*\|\s*", normalized) if segment.strip()] + title = segments[0] if segments else normalized + metadata = 
segments[1:] if len(segments) > 1 else [] + owner = self._extract_owner(stripped, metadata) + due = self._extract_due_date(stripped) + progress = self._extract_progress(stripped, status) + blockers = self._extract_blockers(metadata) + description = " | ".join(metadata) if metadata else "" + return Task( + id=f"{document_id}-{index}", + title=title or f"未命名任务 {index}", + owner=owner, + due_date=due, + status=status, + progress=progress, + source=f"feishu:doc:{document_id}", + description=description, + blockers=blockers, + ) + + def _extract_owner(self, line: str, metadata: List[str]) -> str: + owner_match = self.OWNER_PATTERN.search(line) + if owner_match: + return owner_match.group("owner").strip() + for segment in metadata: + match = self.OWNER_FALLBACK_PATTERN.search(segment) + if match: + return match.group("owner").strip() + return "unknown" + + def _extract_due_date(self, line: str) -> date: + for pattern in self.DATE_PATTERNS: + match = pattern.search(line) + if not match: + continue + value = match.group("date") + parsed = self._parse_date(value) + if parsed is not None: + return parsed + return date.today() + + def _parse_date(self, value: str) -> Optional[date]: + normalized = value.replace("/", "-") + try: + parsed = datetime.strptime(normalized, "%Y-%m-%d") + except ValueError: + return None + return parsed.date() + + def _extract_progress(self, line: str, status: str) -> int: + match = self.PROGRESS_PATTERN.search(line) + if match: + progress = int(match.group("progress")) + return max(0, min(progress, 100)) + if status == "completed": + return 100 + return 0 + + def _extract_blockers(self, metadata: List[str]) -> Optional[str]: + for segment in metadata: + match = self.BLOCKER_PATTERN.search(segment) + if match: + return match.group("blocker").strip() + return None + + # --- Bitable parsing ------------------------------------------------------- + + def _parse_bitable_record(self, config: BitableConfig, record: Dict[str, Any]) -> Optional[Task]: + 
fields = record.get("fields", {}) + if not fields: + return None + mapping = config.field_mapping or {} + title_field = mapping.get("title", "title") + owner_field = mapping.get("owner", "owner") + due_field = mapping.get("due_date", "due_date") + status_field = mapping.get("status", "status") + progress_field = mapping.get("progress", "progress") + description_field = mapping.get("description", "description") + blockers_field = mapping.get("blockers", "blockers") + title = self._first_text(fields.get(title_field)) + if not title: + return None + owner = self._first_text(fields.get(owner_field)) or "unknown" + due_value = self._first_text(fields.get(due_field)) + due = self._parse_date(due_value) if due_value else None + if due is None: + due = date.today() + status = self._first_text(fields.get(status_field)) or "in_progress" + progress_value = self._first_text(fields.get(progress_field)) + progress = int(progress_value) if progress_value and progress_value.isdigit() else 0 + description = self._first_text(fields.get(description_field)) or "" + blockers = self._first_text(fields.get(blockers_field)) or None + record_id = record.get("record_id") or record.get("id") or "unknown" + return Task( + id=str(record_id), + title=title, + owner=owner, + due_date=due, + status=status, + progress=max(0, min(progress, 100)), + source=f"feishu:bitable:{config.app_token}:{config.table_id}", + description=description, + blockers=blockers, + ) + + def _first_text(self, value: Any) -> str: + if value is None: + return "" + if isinstance(value, list) and value: + if isinstance(value[0], dict) and "text" in value[0]: + return str(value[0].get("text", "")) + return str(value[0]) + if isinstance(value, dict) and "text" in value: + return str(value.get("text", "")) + return str(value)