English | 简体中文
高性能 DAG 流水线引擎。Python 声明,Go/Java/C++ 三引擎执行,JSON 解耦。
算子只需声明输入/输出字段,引擎自动推导依赖、构建 DAG、并行调度——你专注业务逻辑,Pineapple 负责把它跑快。
适用于任何需要多步骤数据处理流水线的场景:搜索/推荐/广告排序、特征工程、实时数据加工、规则引擎、ML 推理前后处理,等等。
⚠️ Pre-1.0 阶段:API 和行为语义可能在版本间发生不兼容变更。生产环境使用请锁定具体版本。
Python DSL (Apple) ──compile──> JSON Config
│
┌───────────┼───────────┐
v v v
Pine-Go (Go) Pine-Java Pine-C++
构建 DAG 构建 DAG 构建 DAG
并行执行 并行执行 per-node 并行
| 组件 | 语言 | 职责 |
|---|---|---|
| Apple | Python | 声明式 DSL,编译输出 JSON 配置 |
| Pine-Go | Go | 主执行引擎:解析配置、构建 DAG、并行调度 |
| Pine-Java | Java | 第二执行引擎,与 Pine-Go 行为一致 |
| Pine-C++ | C++23 | 第三执行引擎(标杆运行时),完全 parity + 性能上限探索 |
工程团队用 Go/Java/C++ 开发高性能算子;业务团队用 Python DSL 编排逻辑。两侧通过 JSON 配置彻底解耦。
曾经存在的 Pine-Python 运行时引擎已于 v0.9.7 后移除。仓库中的 Python 代码仅为 Apple DSL 声明层(编译器),不是运行时。
- 隐式构图 — 算子声明输入/输出字段,引擎自动推导 DAG 依赖并执行传递性归约
- 无锁并行 — DAG 中无依赖的算子自动并行执行
- 编译期校验 — 死代码、字段缺失、写后未读等问题在部署前拦截
- Lua 嵌入 — 内置 Lua 算子支持轻量自定义计算。pine-go 默认 wangshu(纯 Go Lua 5.1 VM,NaN-boxing + arena GC),可通过
-tags=lua_gopher切回 gopher-lua;pine-java 用 LuaJC(字节码编译),pine-cpp 用 LuaJIT。端到端开销约 1.2-2x;隔离算子级开销随运行时与计算复杂度变化(C++/LuaJIT 约 3-5x、Java 约 2-9x、Go 约 6-17x),计算密集型热路径建议写原生算子 - 配置热加载 — 服务运行时自动无停机重载引擎配置
- 动态资源 — 双通道资源管理:数据型(如静态 dict / 实时 feature store,snapshot 导出后无锁读)+ 句柄型(如
redis_connection,borrow 借用 + RAII 拆除);后台定时刷新 - Redis cascade-safety —
redis_connection资源暴露{dial,read,write,pool}_timeout_ms+pool_size五参数,per-command 指标pine_redis_command_*(4-state status:ok / timeout / pool_timeout / error),fail-on-error 静默降级契约 - 白盒可观测 — 算子级 trace;
/stats组合响应含/stats.http(请求级 4-state 指标)+/stats.resources(资源池连接池/探针/per-command 4 状态分类);可插拔 Prometheus 接口 - 行存/列存可切换 — DataFrame 支持两种存储模式
- 三引擎一致性 — Go/Java/C++ 引擎通过 CI 交叉验证保证 schema、DAG、执行结果、错误消息字节级一致(19 section cross-validate + 三引擎差分 fuzz + 每日 ASan/TSan sanitized fuzz)
- Pine-C++ 标杆运行时 — 完整第三运行时,内置算子与 Go/Java 完全对等、HTTP server(热加载/graceful shutdown)、ColumnFrame/RowFrame 双物理实现、OperatorInput lazy 投影、LuaJIT 集成、metrics/resource 对等
- Go 1.26+(Pine-Go)
- Java 25+(Pine-Java)
- Python 3.11+(Apple DSL)
- CMake 3.20+ / C++23 编译器 / LuaJIT(Pine-C++,可选)
from apple.flow import Flow
flow = Flow(
name="demo",
common_input=["user_age"],
item_output=["item_id", "item_final_price"],
)
flow.recall_static(
item_output=["item_id", "item_price"],
items=[
{"item_id": "a", "item_price": 100.0},
{"item_id": "b", "item_price": 200.0},
],
)
flow.transform_by_lua(
common_input=["user_age"],
item_input=["item_price"],
item_output=["item_final_price"],
lua_script="""
function discount()
if user_age < 18 then return item_price * 0.8
else return item_price end
end
""",
function_for_item="discount",
)
flow.reorder_sort(
item_input=["item_final_price"],
field="item_final_price",
order="desc",
)
with open("pipeline.json", "w") as f:
f.write(flow.compile())go run ./pine-go/cmd/pineapple-server -config pipeline.json -addr :8080curl -s -X POST http://localhost:8080/execute \
-H "Content-Type: application/json" \
-d '{"common": {"user_age": 16}, "items": []}' | python3 -m json.tool修改 Python 后重新编译,服务自动热加载,无需重启。
pineapple/
├── apple/ # Python DSL (Apple)
│ ├── flow.py # Flow/SubFlow 声明
│ ├── compiler.py # 编译器:DSL → JSON
│ ├── validator.py # 静态校验器
│ └── tests/ # Python 测试
├── apple_generated/ # codegen 自动生成的 Python 绑定
├── pine-go/ # Go 执行引擎 (Pine-Go)
│ ├── cmd/ # CLI 工具
│ │ ├── pineapple-server/ # HTTP 服务
│ │ ├── pineapple-codegen/ # 代码 & 文档生成
│ │ ├── pineapple-dag/ # DAG 渲染
│ │ └── pineapple-run/ # 单次执行
│ ├── internal/ # 内部包(config/dag/dataframe/runtime)
│ ├── operators/ # 内置算子
│ ├── pkg/ # 可复用库(server/codegen/metrics/resource)
│ ├── integration/ # 集成测试
│ └── benchmarks/ # 性能基准测试
├── pine-java/ # Java 执行引擎 (Pine-Java)
│ ├── src/main/java/ # 引擎实现 + CLI 工具
│ └── src/test/java/ # 测试 + 基准 + fuzz
├── pine-cpp/ # C++ 执行引擎 (Pine-C++)
│ ├── include/pine/ # 公共头文件
│ ├── src/ # config/dag/dataframe/runtime/server/lua/redis/http/resource
│ ├── operators/ # 内置算子(与 Go/Java 对等)+ bench stubs(编译开关 PINE_BUILD_BENCH_STUBS)
│ ├── cmd/ # pineapple-run / pineapple-render-dag / pineapple-server / pineapple-codegen / pineapple-cause-chain-probe
│ └── tests/ # doctest 单测套件
├── fixtures/ # 共享测试 fixtures(三引擎公用)
│ ├── operators/ # 算子级单元 fixtures
│ ├── pipelines/ # Pipeline 级端到端 fixtures
│ ├── errors/ # 错误路径 fixtures
│ ├── error_chain/ # ExecutionError 因果链 fixtures
│ ├── server_byte_exact/ # server 响应字节级一致 fixtures
│ └── benchmarks/ # benchmark 配置/请求(含 calibrated 生产 proxy)
├── scripts/ # 开发者脚本
├── design_doc/ # 设计文档
└── doc/ # 生成的算子文档 & 报告
pine-cpp/ 是完整的第三运行时(C++23),定位为在完全 parity 前提下的标杆实现。
当前能力:
- 内置算子与 Go/Java 完全对等(清单见
pine-cpp/CMakeLists.txt与doc/operators/) - HTTP server:
pineapple-server,含热加载、graceful shutdown、HTTP/1.1 keep-alive、客户端断连取消、/health//execute//stats//dag端点 - CLI:
pineapple-run、pineapple-render-dag、pineapple-codegen、pineapple-cause-chain-probe - Frame 多态:ColumnFrame(列存)+ RowFrame(行存),OperatorInput lazy 投影;锁形态(per-call
shared_mutex)与 Go/Java 完全镜像 - LuaJIT:StatePool、沙箱隔离、
_G["..."]变量注入 - 可观测:
metrics::Provider、resource::Manager、/stats.http、/stats.resources、cause chain - CI:cpp-build、cpp-test(doctest)、cpp-sanitizer(ASan/UBSan)、cpp-tsan(ThreadSanitizer)、cpp-lint(-Werror)
- Cross-validate:全 section 接入,三引擎一致性验证
跨四语言的 fmt / lint / test / bench / codegen / 版本管理统一通过顶层 Makefile + pine-go/Makefile 暴露,CI 与本地共用同一命令序列。常用 verb:
| Make 目标 | 用途 |
|---|---|
make fmt |
四语言格式化(gofmt / google-java-format / clang-format / ruff) |
make lint |
四语言 lint(含 checkstyle failOnViolation=true、-Werror) |
make test |
全引擎测试 |
make bench |
默认 pine_bench tag |
make bench-cross-runtime |
跨引擎 fixture 驱动 benchmark(cgroup 隔离) |
make bench-lua-backends |
wangshu vs gopher-lua 同机串行连跑 + benchstat |
make differential-fuzz |
三引擎差分 fuzz |
make cross-validate |
跨引擎一致性验证 |
make codegen |
从 pine-go Registry 生成 apple_generated/ + doc/operators/ |
make codegen-check |
CI 用:codegen 后 git diff --exit-code,确保产物新鲜 |
make check-pr-ci |
watch 当前分支 PR 的 CI 状态(pre-push hook 也会自动调用) |
scripts/ 下的脚本是 Make 目标的具体实现,可单独调用:
| 脚本 | 用途 |
|---|---|
scripts/go-test.sh |
Go 全量测试 |
scripts/java-test.sh |
Java 全量测试 |
scripts/test-all.sh |
Go + Apple(Python) + Java 全量测试 |
scripts/lint.sh |
Go + Java + Python lint |
scripts/go-bench.sh |
Go 性能基准 |
scripts/java-bench.sh |
Java 性能基准 |
scripts/bench-cross-runtime.sh |
跨引擎 HTTP server benchmark(fixture 驱动,cgroup 资源隔离) |
scripts/bench-lua-backends.sh |
wangshu vs gopher-lua 后端对比(benchstat delta) |
scripts/go-fuzz.sh |
Go fuzz 测试 |
scripts/java-fuzz.sh |
Java fuzz 测试 |
scripts/differential-fuzz.sh |
三引擎差异模糊测试(随机生成 pipeline 比对输出) |
scripts/cross-validate.sh |
三引擎交叉验证(schema + DAG + 执行 + 错误 + server + metrics 等) |
scripts/cpp-sanitizer-smoke.sh |
C++ ASan/UBSan 冒烟 |
scripts/cpp-tsan-smoke.sh |
C++ ThreadSanitizer 高并发压测 |
scripts/codegen.sh |
代码生成(--backend go|java) |
scripts/render-dag.sh |
DAG 可视化(--backend go|java) |
scripts/apple-compile.sh |
Apple DSL 编译为 JSON |
scripts/run-pipeline.sh |
单次执行 pipeline |
scripts/bump-version.sh |
版本号同步更新(含 pine-cpp kVersion) |
scripts/check-pr-ci.sh |
watch 当前分支 PR 的 CI 状态(pre-push hook 自动调用) |
仓库内置 .githooks/ 用 git config core.hooksPath .githooks 挂载即生效(首次 clone 后建议配一次):
pre-commit— staged-only 格式 gate(gofmt / clang-format / ruff),不动未 staged 改动pre-push— 工程级 lint(四语言 fail-on-violation)+ 自包装 CI watch(push 完成后自动起check-pr-ci.sh等终态)+ 自动--set-upstream接力(首次 push 新分支无需手动-u)
CI 在每次 push/PR 时自动运行:
- Lint — Go (golangci-lint)、Java (checkstyle, failOnViolation=true)、Python (ruff)、C++ (clang-format -Werror)
- Test — Go/Java/Apple/C++ 全量测试 + 覆盖率
- Sanitizer — C++ ASan/UBSan 冒烟 + ThreadSanitizer 高并发压测
- Fuzz — Go/Java fuzz + 三引擎差异模糊测试
- Daily sanitized fuzz — 每日(北京时间 12:00)跑 ASan/TSan 加持的差分 fuzz 3000+2000 轮,专门面向 race / memory bug 的 deep-diagnostic(独立于每次 push 的 fast 路径)
- Benchmark — Go/Java 性能基准
- Cross-validation — 三引擎 schema/DAG/执行/错误/server/metrics 一致性
- Codegen check — 确保生成代码与源码同步
scripts/cross-validate.sh 验证三引擎的一致性,当前 19 个 section(具体以 scripts/cross-validate/ 目录为准):
- Schema parity — 三端 codegen 导出的算子 schema 与 apple_generated 产物字节级一致
- DAG parity — 相同配置,三端渲染的 DAG(DOT + Mermaid,含 collapse)必须一致
- Execution parity — 相同配置 + 请求,三端执行结果必须一致
- Column-store parity — 以列存模式重复执行验证
- Error parity — 非法配置/请求,三端返回相同的错误分类和消息
- Server parity — HTTP 端点的 status code、body 结构、Content-Type 一致
- Cancellation parity — 超时、运行时错误与客户端断连的取消行为一致
- Concurrent parity — 并发请求下的行为与计数一致
- Raw-byte parity — 不归一化的原始字节输出比对(key 顺序)
- Hot-reload parity — 配置与
resource_config热加载行为一致 - Redis integration — redis 算子在三端的真实 redis 行为一致
- Extensibility parity — 下游扩展模式(middleware、未注册路径等负空间)一致
- Metrics parity —
/stats结构和数值一致(含 lua_pool 计数器、data_parallel 并发不变量) - Byte-exact execute — server
/execute响应字节级一致 - Error cause chain — ExecutionError 因果链可解包一致
- Resource metrics —
/stats.resources子树结构与三态(无流量/有量/不可达)一致 - Templated params —
{{field}}模板参数解析一致 - SubFlow contract stderr — Apple 编译期 SubFlow 契约报错文案稳定
- Bench-stub parity — bench 构建下
reorder_topn_boost字节级一致
如果你在 Go 和 Java 中同时实现了自定义算子并需要保证跨语言一致性,可以复用 Pineapple 的 parity 校验框架。
- Fixture 驱动 — 所有验证基于共享 JSON fixture 文件,而非各语言硬编码 expected 值
- CLI 接口统一 — 每个引擎提供相同的 CLI 工具(
-config、-request),输出 JSON 结果 - JSON 归一化比对 — 通过
sort_keys+ 数值类型统一消除平台差异(Go map 无序、float64/Double 表示差异) - 增量友好 — 新引擎只需实现 CLI 接口即可纳入验证
算子级 fixture(单算子行为验证):
{
"operator": "your_operator_name",
"cases": [
{
"name": "描述性测试名",
"params": { "param1": "value" },
"metadata": {
"common_input": [], "common_output": [],
"item_input": ["field"], "item_output": ["result"]
},
"input": { "common": {}, "items": [{"field": 1}] },
"expected": { "items": [{"result": 2}] }
}
]
}Pipeline 级 fixture(端到端执行验证):
{
"name": "fixture 描述",
"config": { "pipeline_config": {...}, "pipeline_group": {...}, "flow_contract": {...} },
"cases": [
{
"name": "case 描述",
"request": { "common": {...}, "items": [...] },
"expected": { "common": {...}, "items": [...] }
}
]
}错误路径 fixture:
{
"name": "error 描述",
"config": { ... },
"expected_error": { "type": "ConfigError", "message_contains": "关键词" }
}比对两端输出时,必须消除以下平台固有差异:
def normalize_json(text):
"""Go map 顺序不确定,数值类型表示不同"""
import json
obj = json.loads(text)
# 递归将所有 int 统一为 float(消除 Go int vs Java Double)
def unify(v):
if isinstance(v, int): return float(v)
if isinstance(v, list): return [unify(x) for x in v]
if isinstance(v, dict): return {k: unify(x) for k, x in v.items()}
return v
return json.dumps(unify(obj), sort_keys=True)- 在两侧各实现算子,保证参数名和
$metadata声明一致 - 创建 fixture 文件,放入共享目录
- 编写验证脚本:分别调用两端 CLI,归一化输出后逐字节比对
- 纳入 CI:失败即阻断合并
参考 scripts/cross-validate.sh 的完整实现了解实战细节。
跨引擎性能对比(HTTP server 模式,scripts/bench-cross-runtime.sh,10000 请求 × 16 并发,server 以 2C/4G cgroup 隔离,2026-06-25 / v0.10.9 复测)。realistic_*_calibrated* 系列为按真实流量校准的生产 proxy fixture,其余为合成压测。
| Fixture | Go | Java | C++ |
|---|---|---|---|
| small_010 (10 items) | 36298 | 6318 | 20756 |
| small_050 (50 items) | 27270 | 5336 | 17227 |
| small_100 (100 items) | 19658 | 4607 | 13812 |
| medium_0100 (100 items) | 12514 | 3589 | 8542 |
| medium_0500 (500 items) | 3026 | 1965 | 2941 |
| medium_1000 (1000 items) | 1513 | 1295 | 1656 |
| large_0100 (100 items) | 7243 | 3064 | 5120 |
| large_0500 (500 items) | 1684 | 1508 | 1773 |
| large_1000 (1000 items) | 825 | 966 | 951 |
| large_5000 (5000 items) | 155 | 213 | 175 |
| realistic_for_you | 483 | 303 | 349 |
| realistic_for_you_latency | 250 | 141 | 212 |
| realistic_for_you_calibrated (生产校准) | 121 | 127 | 237 |
| realistic_for_you_calibrated_2c4g | 121 | 124 | 224 |
| realistic_for_you_calibrated_itemlua | 127 | 126 | 233 |
| Fixture | Go | Java | C++ |
|---|---|---|---|
| small_010 | 0.4 | 1.5 | 0.6 |
| medium_0500 | 4.9 | 6.8 | 5.3 |
| large_1000 | 18.2 | 14.3 | 15.3 |
| large_5000 | 94.3 | 68.6 | 83.4 |
| realistic_for_you_calibrated | 122.3 | 117.7 | 60.8 |
| realistic_for_you_calibrated_itemlua | 117.1 | 119.5 | 61.5 |
要点:
- 生产校准场景下 C++ 领先约 1.9x(calibrated QPS 237 vs 121/127;P50 60ms vs 117/122ms),这是"标杆运行时"定位的体现
- 合成 small/medium 场景 Go 吞吐最高(轻量请求路径开销最低);大行数场景(large_1000+)Java 的 JIT 热循环优化反超
- itemlua(3000 调用/请求的 boundary-dominated 形状)与 calibrated 在三引擎都统计持平,符合"per-item 边界主导 + 端到端稀释"的校准事实(详见
llmdoc/memory/decisions/perf-evolution-roadmap.md) - 各引擎数字会随版本演进,复现方式:
make bench-cross-runtime或scripts/bench-cross-runtime.sh --requests 10000 --concurrency 16,报告落在bench-results/
| 类别 | 链接 |
|---|---|
| 设计文档 | design_doc/ — 架构、数据模型、算子注册、可观测性等 |
| 算子参考 | doc/operators/ — 所有内置算子详细说明 |
| Pipeline 编写 | doc/guide_pipeline.md — Apple DSL 使用指南 |
| 算子开发 | doc/guide_operator.md — Go 算子开发指南 |
| 第三方扩展 | design_doc/12_distribution.md — 不修改源码添加自定义算子 |
| API 参考 | doc/api.md — HTTP 接口说明 |
| LLM 检索文档 | llmdoc/ — 面向 AI 协作的稳定知识地图(架构 / 决策 / 反思 / 索引) |