feat: VSJoin mechanisms, Owner-Computes dedup, StorageManager shard refactoring #136
Mechanism I (Bounded-Staleness R/W Decoupling):
- Add VSJoinSnapshotFilterPolicy enum and max_staleness_ms config
- Track rebuild duration, timestamp, count in globalIndexRebuildLoop
- Staleness-aware record filtering in snapshot rebuild

Mechanism II (Budgeted Boundary Coverage Routing):
- Add VSJoinRouteMode enum (UNICAST/BUDGETED/BROADCAST)
- Add vsjoin_fanout_budget config knob
- Implement deterministic top-k partition selection in BUDGETED mode
- BROADCAST mode routes to all logical partitions

Mechanism III (Predictable Skew Control Plane):
- Add vsjoin_rebalance_cooldown_ms config with enforcement
- Add vsjoin_use_smoothed_load config (EWMA latency vs delta records)
- Add getSmoothedLoad() to VSJoinLoadMonitor
- Verify RCU-style atomic publish in PartitionAssignment

All existing tests pass. Analysis doc added.
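The cooldown enforcement and EWMA-smoothed load listed under Mechanism III could look roughly like the following. This is a minimal sketch, not the PR's VSJoinLoadMonitor: the class names `SmoothedLoad` and `RebalanceCooldown`, the `alpha` smoothing factor, and the caller-supplied millisecond clock are all assumptions made for illustration.

```cpp
#include <cstdint>

// Hypothetical EWMA tracker: value = alpha * sample + (1 - alpha) * previous.
class SmoothedLoad {
public:
    explicit SmoothedLoad(double alpha) : alpha_(alpha) {}

    // The first sample seeds the average; later samples are blended in.
    void observe(double sample) {
        value_ = initialized_ ? alpha_ * sample + (1.0 - alpha_) * value_ : sample;
        initialized_ = true;
    }

    double value() const { return value_; }

private:
    double alpha_;
    double value_ = 0.0;
    bool initialized_ = false;
};

// Hypothetical cooldown gate: allows at most one rebalance per cooldown_ms.
class RebalanceCooldown {
public:
    explicit RebalanceCooldown(std::int64_t cooldown_ms) : cooldown_ms_(cooldown_ms) {}

    // Returns true (and records now_ms) if enough time has passed since the
    // last accepted rebalance; returns false while still cooling down.
    bool tryAcquire(std::int64_t now_ms) {
        if (has_last_ && now_ms - last_ms_ < cooldown_ms_) return false;
        last_ms_ = now_ms;
        has_last_ = true;
        return true;
    }

private:
    std::int64_t cooldown_ms_;
    std::int64_t last_ms_ = 0;
    bool has_last_ = false;
};
```

Passing the timestamp in explicitly keeps the gate deterministic under test; a real control loop would presumably read a monotonic clock instead.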
…x Knn index isolation

Key changes:
1. VSJoinMethod::ExecuteEager now collects candidates directly from ConcurrencyManager::query_for_join as shared_ptr<VectorRecord>, eliminating the costly resolveUidsToRecords that scanned all WindowState partitions per query.
2. Knn (BruteForce) index now tracks inserted UIDs and filters query_for_join results to only return records belonging to this index instance. This prevents cross-stream contamination where a right-stream Local Index would return left-stream records via the shared StorageManager.

Performance impact (size=500, para=1):
- candidate_fetch: 9713ms -> 2934ms (3.3x improvement)
- Precision: maintained at 1.0000
- Recall: maintained at 1.0000 for para=1,2

Known limitation:
- Para>=4 recall drops to ~0.80-0.85 due to LSH partition splitting similar vectors across partitions. Global Index rebuild compensates but has latency.

Future work: increase multicast_k or local_num_probes.
…imization roadmap
…d routing, predictable skew control

Closes #130, #131, #132
- Mechanism I: Add staleness knobs, validity filtering, lock-free foreground path
- Mechanism II: Support unicast/budgeted/broadcast route modes with dedup
- Mechanism III: Complete RCU AssignmentTable, EMA LoadMonitor, bounded rebalance
- Add comprehensive metrics for all three mechanisms
- Include gap analysis and optimization report docs
Eliminate cross-subtask and IQ bidirectional duplicate matches inside JoinOperator, reducing Sink dedup pressure to zero.
- Add routing_mask_ (uint64_t bitmask) to VectorRecord
- Set routing_mask in ResultPartition::emit() for broadcast/multicast/unicast
- Owner-Computes rule in JoinOperator emit: only lowest-indexed subtask in intersection(query_mask, cand_mask) emits a match
- Propagate candidate routing_mask through JoinFunction result
- Add owner_dedup_count metric

Results (ClusteredJoin p=8, k=8):
Before: total_emits=3,091,648 sink_dedup=2,705,192
After: total_emits=386,456 sink_dedup=0
Recall=1.0000 Precision=1.0000
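The Owner-Computes rule above (only the lowest-indexed subtask in the intersection of the two routing masks emits) can be sketched as follows. The helper names `lowestSetBit` and `ownsMatch` are hypothetical and are not taken from the PR; the behavior on an empty intersection (emit defensively) is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: index of the lowest set bit, or -1 if mask is zero.
inline int lowestSetBit(std::uint64_t mask) {
    if (mask == 0) return -1;
    int idx = 0;
    while ((mask & 1ULL) == 0) {
        mask >>= 1;
        ++idx;
    }
    return idx;
}

// Returns true if subtask `self` is the owner of the match between a query
// record and a candidate record, given the routing masks of both records.
inline bool ownsMatch(std::uint64_t query_mask, std::uint64_t cand_mask, unsigned self) {
    assert(self < 64);  // a uint64_t mask can only describe subtasks 0..63
    const int owner = lowestSetBit(query_mask & cand_mask);
    // Assumption: with no shared subtask recorded, emit rather than drop,
    // so a match is never lost because of an incomplete mask.
    if (owner < 0) return true;
    return static_cast<unsigned>(owner) == self;
}
```

For example, with a query held by subtasks {1, 2} (mask 0x6) and a candidate held by subtasks {2, 3} (mask 0xC), the intersection is {2}, so only subtask 2 emits the pair.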
Pull request overview
This PR introduces routing-mask–based “owner-computes” deduplication to eliminate duplicate join match emissions in multicast/broadcast scenarios (reducing downstream Sink dedup overhead), while also extending VSJoin routing/configuration, metrics, and related tests/docs.
Changes:
- Add routing_mask_ propagation across routing and join execution, and apply owner-computes dedup at emit time.
- Extend VSJoin routing (route modes + metrics), load monitoring (EWMA + totals), and assignment-table concurrency tests.
- Add/validate new VSJoin rebalance config knobs and sync sample configs/docs.
Reviewed changes
Copilot reviewed 59 out of 61 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| include/common/data_types.h | Adds routing_mask_ bitmask field to VectorRecord for dedup ownership. |
| src/execution/result_partition.cpp | Sets routing_mask_ during unicast/multicast/broadcast emits. |
| src/operator/join_operator.cpp | Implements owner-computes dedup and propagates masks through join results. |
| include/utils/metrics/join_metrics.h | Adds VSJoin mechanism metrics + owner_dedup_count + probe latency ring buffer. |
| src/operator/join_operator_vsjoin_routing.cpp | Adds VSJoin route-mode dispatch and routing metrics instrumentation. |
| include/operator/utils/join_strategy_config.h / src/operator/utils/join_strategy_config.cpp | Adds VSJoin route/snapshot policy enums + parsing + new config validations/summary. |
| src/operator/utils/join_config_validator.cpp + tests | Validates new VSJoin rebalance knobs; updates LSH recommended window states. |
| test/test_utils/join_config_loader.cpp + tests | Merges/saves new VSJoin knobs; validates loader behavior. |
| src/operator/utils/join_strategy_factory.cpp | Uses method registry preferentially; config-driven VSJoin probe config; removes VSJoin-forced window-state override. |
| src/operator/join_operator_methods/vsjoin_method.* | Adds local multi-probe candidate retrieval + dedup + self-registration. |
| src/operator/join_operator_methods/vsjoin_components/* + tests | Adds assignment-table validity guards; load-monitor EWMA and new test coverage. |
| src/index/knn.* | Adds per-index UID isolation to avoid cross-index leakage. |
| config/*.toml | Adds VSJoin rebalance knobs; adds ClusteredJoin scaling config. |
| PR_DESCRIPTION.md / docs/* / .gitignore | Updates repo documentation and adds research paper assets. |
| // Broadcast mode: send the data to all channels | ||
| // Build routing_mask: all channels are targets | ||
| uint64_t route_mask = 0; | ||
| for (size_t i = 0; i < output_channels_.size() && i < 64; ++i) { | ||
| route_mask |= (uint64_t{1} << i); | ||
| } | ||
|
|
routing_mask_ only tracks the first 64 output channels (bits 0..63), but emit() will still broadcast/multicast to channels >= 64. If owner-computes dedup relies on routing_mask_, records routed to subtasks >= 64 will have incomplete/zero masks, which can reintroduce duplicates or inconsistent ownership decisions. Consider enforcing parallelism/output_channels <= 64 when routing_mask_ is used (fail fast), or switch routing_mask_ to a dynamically sized bitset representation and populate all targets.
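One way to realize the fail-fast option mentioned in this comment is to build the broadcast mask through a helper that rejects channel counts a 64-bit mask cannot represent. This is a sketch under assumptions: `buildBroadcastMask` and `kMaxMaskedChannels` are hypothetical names, and throwing `std::invalid_argument` is only one possible failure policy.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Width of a uint64_t routing mask: one bit per output channel.
constexpr std::size_t kMaxMaskedChannels = 64;

// Hypothetical helper: returns a mask with bits [0, num_channels) set.
// Throws instead of silently truncating when the mask would be incomplete.
inline std::uint64_t buildBroadcastMask(std::size_t num_channels) {
    if (num_channels > kMaxMaskedChannels) {
        throw std::invalid_argument(
            "routing mask supports at most 64 output channels, got " +
            std::to_string(num_channels));
    }
    if (num_channels == kMaxMaskedChannels) {
        return ~std::uint64_t{0};  // shifting a 64-bit value by 64 is undefined
    }
    return (std::uint64_t{1} << num_channels) - 1;
}
```

Performing the check once at setup time (for example when the output channels are created) would surface the misconfiguration before any record is routed.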
| // Build routing bitmask for Owner-Computes dedup. | ||
| // Bit i set ⇒ record lives in subtask i's WindowState. | ||
| uint64_t route_mask = 0; | ||
| for (size_t t : target_subtasks) { | ||
| if (t < 64) route_mask |= (uint64_t{1} << t); | ||
| } | ||
| data_ptr->routing_mask_ = route_mask; | ||
| data_for_join->routing_mask_ = route_mask; |
Owner-computes mask construction truncates targets to bits < 64. If parallelism > 64 (or VSJoin routes to a subtask index >= 64), routing_mask_ becomes incomplete and the dedup logic can silently fall back or emit defensively, reintroducing duplicates. Please add a runtime guard (e.g., throw/log+disable owner-computes) when parallelism_ > 64, or widen routing_mask_ representation.
| enum class VSJoinRouteMode { | ||
| UNICAST, ///< Route to single best partition only | ||
| BUDGETED, ///< Route to up to fanout_budget partitions (deterministic top-k by distance) | ||
| BROADCAST ///< Route to all partitions | ||
| }; |
The comment for VSJoinRouteMode::BUDGETED says “deterministic top-k by distance”, but the current routing implementation selects the first fanout_budget LSH candidates without any distance scoring/ranking. Either update the comment to match the behavior (first-k in candidate order), or implement actual distance-based ranking to avoid misleading config semantics.
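If the second option (actual distance-based ranking) were chosen, a deterministic top-k selection might look like the sketch below. It assumes each candidate partition already has a distance score available; the `PartitionScore` type and `selectTopK` function are hypothetical and do not appear in the PR.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical pairing of a logical partition with its distance to the record.
struct PartitionScore {
    std::size_t partition;  // logical partition id
    float distance;         // smaller means closer (assumed to be precomputed)
};

// Returns up to fanout_budget partition ids, closest first.
// Ties on distance are broken by partition id so the result is deterministic.
inline std::vector<std::size_t> selectTopK(std::vector<PartitionScore> scored,
                                           std::size_t fanout_budget) {
    const std::size_t k = std::min(fanout_budget, scored.size());
    std::partial_sort(
        scored.begin(), scored.begin() + static_cast<std::ptrdiff_t>(k), scored.end(),
        [](const PartitionScore& a, const PartitionScore& b) {
            if (a.distance != b.distance) return a.distance < b.distance;
            return a.partition < b.partition;
        });
    std::vector<std::size_t> selected;
    selected.reserve(k);
    for (std::size_t i = 0; i < k; ++i) selected.push_back(scored[i].partition);
    return selected;
}
```

The explicit tie-break matters: without it, two partitions at equal distance could be picked in a different order on different runs, which would make the routing (and therefore the routing mask) non-reproducible.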
| private: | ||
| mutable std::shared_mutex local_mutex_; | ||
| // Local record cache: uid -> VectorRecord copy (owned) | ||
| std::unordered_map<uint64_t, std::shared_ptr<const VectorRecord>> local_records_; |
The header comment says local_records_ stores “VectorRecord copy (owned)”, but the implementation currently inserts nullptr placeholders and only uses the keys for UID membership filtering. Please update the comment (and/or the container type) to reflect actual behavior so future changes don’t assume stored record copies are available.
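If only UID membership is needed, the container type itself could state that intent. The following is a rough sketch, not the PR's Knn index: `UidMembershipFilter` is a hypothetical class, and the locking that the real header does with local_mutex_ is deliberately left out to keep the example short.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical per-index UID tracker. It stores keys only, so no caller can
// mistakenly assume that record copies are available.
// Note: not thread-safe; synchronization is omitted in this sketch.
class UidMembershipFilter {
public:
    // Remember that this index instance inserted the given UID.
    void track(std::uint64_t uid) { uids_.insert(uid); }

    bool contains(std::uint64_t uid) const { return uids_.count(uid) != 0; }

    // Keep only the candidates that belong to this index instance,
    // preserving their original order.
    std::vector<std::uint64_t> filter(const std::vector<std::uint64_t>& candidates) const {
        std::vector<std::uint64_t> kept;
        for (std::uint64_t uid : candidates) {
            if (contains(uid)) kept.push_back(uid);
        }
        return kept;
    }

private:
    std::unordered_set<std::uint64_t> uids_;
};
```

Compared with a map of nullptr placeholders, a set makes the "keys only" contract visible in the type and avoids storing an unused pointer per entry.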
| # vsjoin_rebalance_imbalance_ratio: load imbalance ratio (max/avg) that triggers a rebalance | ||
| # Type: double | ||
| # Range: [1.0, 5.0] | ||
| # Default: 1.35 | ||
| # vsjoin_rebalance_max_moves: maximum number of logical partitions migrated per rebalance round | ||
| # Type: int | ||
| # Range: [1, 1024] | ||
| # Default: 8 | ||
This schema still documents vsjoin_async_threads and vsjoin_allowed_lateness earlier in the VSJoin section, but those fields no longer exist in JoinStrategyConfig / TOML parsing (they are silently ignored). Please either remove them from the schema or explicitly mark them as deprecated/ignored to avoid users thinking they are effective knobs.
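To stop such knobs from being ignored silently, a loader could report deprecated keys it encounters. The sketch below assumes the loader can enumerate the key names present in a strategy block; `findDeprecatedKeys` is a hypothetical helper and is not part of the PR or of any TOML library.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical check: returns the keys from present_keys that are known to be
// deprecated, in the order they were given, so the caller can log a warning.
inline std::vector<std::string> findDeprecatedKeys(
    const std::vector<std::string>& present_keys) {
    // The two knobs named in this review comment.
    static const std::set<std::string> kDeprecated = {
        "vsjoin_async_threads",
        "vsjoin_allowed_lateness",
    };
    std::vector<std::string> hits;
    for (const std::string& key : present_keys) {
        if (kDeprecated.count(key) != 0) hits.push_back(key);
    }
    return hits;
}
```

A warning per hit keeps old sample configs loadable while making it clear that the value has no effect.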
| vsjoin_async_threads = 2 | ||
| vsjoin_allowed_lateness = 1000 |
The VSJoin strategy blocks still set vsjoin_async_threads / vsjoin_allowed_lateness, but these knobs are no longer present in JoinStrategyConfig / TOML parsing (they’re silently ignored). Please remove them from this sample config or mark them as deprecated to avoid confusing users and tests that rely on this file.
| vsjoin_async_threads = 2 | |
| vsjoin_allowed_lateness = 1000 | |
| # vsjoin_async_threads = 2 # deprecated: ignored by JoinStrategyConfig | |
| # vsjoin_allowed_lateness = 1000 # deprecated: ignored by JoinStrategyConfig |
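| # chore/vsjoin-factory-paper-sync-20260303 | ||
| ## 📋 Overview | ||
| ## Change Overview | ||
| This PR implements SageFlow's **VSJoin streaming vector similarity join engine**, including a complete multithreaded architecture refactoring and several baseline algorithm implementations. It is a large-scale refactoring with **195 changed files** and roughly **50,000+ new lines of code**. | ||
| This PR focuses on three consolidation tasks: | ||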
| --- | ||
| ## 🏗️ Architecture Changes | ||
| ### 1. RuntimeContext Injection and Parallel Execution Model | ||
| - **RuntimeContext**: provides the task identifier (`subtask_index`) and the parallelism (`parallelism`) | ||
| - **ExecutionVertex**: each parallel instance holds its own `RuntimeContext` | ||
| - **Operator interface**: `open()`, `apply()`, and `close()` take an additional `RuntimeContext` parameter | ||
| ### 2. Window State Abstraction Layer | ||
| Adds a `WindowState` interface that supports multiple state implementations: | ||
| | State type | Description | Use case | | ||
| |---------|------|---------| | ||
| | `SharedWindowState` | All instances share one state | RoundRobin partitioning | | ||
| | `PartitionedWindowState` | Independent state per subtask | Key/VectorHash partitioning | | ||
| | `TwoTierWindowState` | Two-tier architecture (write-friendly tier + compact tier) | High-throughput scenarios | | ||
| | `PartitionedVectorState` | Vector-space partitioned state | VSJoin/S3J | | ||
| ### 3. Connection Strategy | ||
| A unified connection strategy built on a matrix of SPSC queues: | ||
| - **ConnectionStrategy**: unified connection strategy that uses `upstream × downstream` SPSC queues | ||
| - Supports different partitioners (RoundRobin, KeyHash, VectorHash, LSH, Centroid) | ||
| --- | ||
| ## 🔧 Core Components | ||
| ### Join Strategy Factory Pattern (new) | ||
| ``` | ||
| JoinStrategyConfig (configuration) | ||
| ↓ | ||
| JoinConfigValidator (validation) | ||
| ↓ | ||
| JoinStrategyFactory (creation) | ||
| ↓ | ||
| StrategyComponents { | ||
| JoinMethod, | ||
| WindowState (left/right), | ||
| Partitioner, | ||
| Index (shared/partitioned), | ||
| VSJoin/S3J-specific components | ||
| } | ||
| ``` | ||
| #### New files: | ||
| - `include/operator/join_strategy_config.h` - unified configuration structure | ||
| - `include/operator/join_strategy_factory.h` - strategy factory | ||
| - `include/operator/join_method_registry.h` - method registry | ||
| - `include/operator/join_config_validator.h` - configuration validator | ||
| - `include/execution/partitioner_factory.h` - partitioner factory | ||
| - `include/state/window_state_factory.h` - state factory | ||
| - `config/join_strategies.toml` - strategy configuration file | ||
| ### Vector-Space Partitioners | ||
| - **VectorSpacePartitioner**: abstract interface | ||
| - `LSHPartitioner`: locality-sensitive hashing partitioning (VSJoin) | ||
| - `KMeansPartitioner`: K-Means clustering partitioning | ||
| - `CentroidPartitioner`: centroid partitioning (S3J) | ||
| ### Concurrency and Index Management | ||
| - **ConcurrencyManager**: thread-safe index management | ||
| - **ConcurrencyController**: index-level concurrency control | ||
| - **PartitionedIndex**: vector-space partitioned index | ||
| ### VSJoin-Specific Components | ||
| - `PartitionCoordinator`: cross-partition coordination | ||
| - `BoundaryTracker`: boundary vector tracking | ||
| - `AsyncCandidateGenerator`: asynchronous candidate generation | ||
| - `LateArrivalHandler`: late data handling | ||
| - `DistanceVerifier`: distance verification | ||
| --- | ||
| ## 📊 Supported Join Algorithms | ||
| | Algorithm | Type | Partitioning strategy | Window state | Notes | | ||
| |-----|------|---------|---------|------| | ||
| | **BruteForce** | Baseline | RoundRobin | Shared | Ground Truth | | ||
| | **IVF** | Approximate | RoundRobin/KeyHash | Shared/Partitioned | Inverted file index | | ||
| | **HNSW** | Approximate | RoundRobin | Shared | Hierarchical navigable small world graph | | ||
| | **S3J** | Baseline | Centroid | PartitionedVector | DEBS'23 | | ||
| | **ClusteredJoin** | Baseline | Centroid | PartitionedVector | VectraFlow | | ||
| | **HDRTree** | Baseline | VectorHash | Partitioned | HDR-Tree | | ||
| | **VSJoin** | Our Method | LSH | PartitionedVector | This project's core method | | ||
| --- | ||
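| ## 🧪 Test Coverage | ||
| 1. **Factory-based main path consolidation** | ||
| - Join method creation is now wired into the main execution path, using a **registry-first, switch-fallback** strategy. | ||
| - VSJoin now self-registers its method and can be created directly through the registry. | ||
| ### New unit tests (19 test files) | ||
| 2. **Config contract consistency fixes (VSJoin)** | ||
| - The VSJoin runtime partitioner is now created strictly according to the config contract and is no longer implicitly rewritten to `Centroid`. | ||
| - `WindowState` creation is no longer forcibly overridden by VSJoin; the configured value takes effect. | ||
| - The set of `WindowState` types recommended for LSH has been updated accordingly. | ||
| - `test_join_strategy_factory.cpp` - strategy factory tests | ||
| - `test_join_config_validator.cpp` - config validation tests | ||
| - `test_join_method_registry.cpp` - method registry tests | ||
| - `test_partitioner_factory.cpp` - partitioner factory tests | ||
| - `test_window_state_factory.cpp` - state factory tests | ||
| - `test_window_state.cpp` - basic window state tests | ||
| - `test_two_tier_window_state.cpp` - two-tier state tests | ||
| - `test_partitioned_vector_state.cpp` - partitioned vector state tests | ||
| - `test_vector_space_partitioner.cpp` - vector-space partitioner tests | ||
| - `test_partitioned_index.cpp` - partitioned index tests | ||
| - `test_partition_coordinator.cpp` - coordinator tests | ||
| - `test_late_arrival_handler.cpp` - late arrival handling tests | ||
| - `test_join_operator_state.cpp` - join state tests | ||
| - `test_bruteforce_method.cpp` / `test_ivf_method.cpp` / `test_s3j_method.cpp` - method tests | ||
| - `test_pca.cpp` / `test_simd_distance.cpp` - compute optimization tests | ||
| 3. **Paper assets merged** | ||
| - The renamed paper directory has been restored and committed: | ||
| `docs/research-paper-High_Throughput_Streaming_Vector_Similarity_Joins_on_Multicore_Processors/` | ||
| - Includes core files such as `main.tex`, `References.bib`, and `Sections/*`. | ||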
| ### Integration tests | ||
| - `test_join_pipeline.cpp` - end-to-end join pipeline test | ||
| - `test_join_datasource_modes.cpp` - recall test across multiple parallelism levels | ||
| ### Performance tests | ||
| - `perf_join_with_datasource.cpp` - data source performance test | ||
| - Supports performance evaluation across different dataset modes | ||
| --- | ||
| ## 📚 Documentation Updates | ||
| - `docs/ARCHITECTURE_REFACTORING.md` - architecture refactoring guide | ||
| - `docs/CONNECTION_STRATEGIES.md` - connection strategies in detail | ||
| - `docs/JOIN_PIPELINE_GUIDE.md` - join pipeline guide | ||
| - `docs/VSJOIN_IMPLEMENTATION_ROADMAP.md` - VSJoin implementation roadmap | ||
| - `.github/copilot-instructions.md` - AI-assisted development guide | ||
| > Constraint preserved: the VSJoin integration test placeholder file `test/IntegrationTest/test_vsjoin_integration.cpp` was not modified. |
This file’s new header/title (“chore/vsjoin-factory-paper-sync-20260303”) and change summary focus on factory/paper sync rather than the PR’s stated purpose (owner-computes dedup to eliminate multicast duplicates). Please align PR_DESCRIPTION.md with the current PR intent, or move this content to a more appropriate doc file to avoid confusion for reviewers/users.
| #include <unordered_set> | ||
| #include <unordered_map> | ||
| #include <mutex> | ||
|
|
This header includes <mutex> twice. Removing the duplicate include will reduce noise and avoid confusion when scanning dependencies.
75aa564 to 275e89a
…solation, config updates
- StorageManager: partition-aware shard storage for per-subtask data isolation
- DirectController: new concurrency controller for direct index access
- KNN index: simplify API, remove redundant uid-resolution paths
- JoinOperator: add combinedMatchId + SubtaskDedupSet for multicast dedup
- JoinStrategyConfig: add vsjoin_use_smoothed_load flag
- JoinConfigValidator: relax validation for new config fields
- JoinStrategyFactory: update factory wiring for new config
- Add unified comparison config and algorithm comparison script
- Add refactoring documentation for storage shard and vsjoin optimization
275e89a to 5e46515
…RecommendedConfigShouldBeValid

The test was missing index_strategy when validating recommended configs. VSJoin/ClusteredJoin require PARTITIONED index strategy, but the test left it at the default (SHARED), causing validation failure.
Summary
This PR includes all recent development on the feat/vsjoin-exec-mechanisms branch, covering:
Owner-Computes Dedup
Problem
In multicast/broadcast join modes, the same match pair (A,B) was discovered by multiple subtasks and emitted redundantly. Sink (single-threaded) had to deduplicate via hash-set lookup, causing:
Solution
Each VectorRecord carries a routing_mask_ bitmask indicating which subtasks hold it:
Results
Comprehensive Benchmark
Shared-Index Algorithms (data=2000)
ClusteredJoin Multicast-k Tradeoff (data=2000, P=8)
VSJoin Fanout Budget Tradeoff (data=2000, P=8)
Files Changed
Core Changes
- include/common/data_types.h: routing_mask_ field on VectorRecord
- src/execution/result_partition.cpp: set routing_mask on broadcast/multicast/unicast
- src/operator/join_operator.cpp: owner-computes dedup, VSJoin mechanisms, routing
- include/utils/metrics/join_metrics.h: owner_dedup_count metric
StorageManager & Index
- include/storage/storage_manager.h / src/storage/storage_manager.cpp: shard refactoring
- include/concurrency/direct_controller.h / src/concurrency/direct_controller.cpp: new controller
- include/index/knn.h / src/index/knn.cpp: API simplification
Config & Validation
- include/operator/utils/join_strategy_config.h: new vsjoin config fields
- src/operator/utils/join_config_validator.cpp: updated validation
- test/UnitTest/test_join_method_registry.cpp: fix RecommendedConfigShouldBeValid test
Next Steps