Skip to content

feat: VSJoin mechanisms, Owner-Computes dedup, StorageManager shard refactoring#136

Merged
ZeroJustMe merged 16 commits into
main-devfrom
feature/owner-computes-dedup
Apr 2, 2026
Merged

feat: VSJoin mechanisms, Owner-Computes dedup, StorageManager shard refactoring#136
ZeroJustMe merged 16 commits into
main-devfrom
feature/owner-computes-dedup

Conversation

@ZeroJustMe
Copy link
Copy Markdown
Collaborator

@ZeroJustMe ZeroJustMe commented Apr 2, 2026

Summary

This PR includes all recent development on feat/vsjoin-exec-mechanisms branch, covering:

  1. VSJoin Three Core Mechanisms — Bounded-Staleness, Budgeted Routing, Predictable Skew Control
  2. Owner-Computes Dedup — Zero Sink overhead for multicast join via routing_mask bitmask
  3. StorageManager Shard Refactoring — Partition-aware shard storage for per-subtask data isolation
  4. KNN Index Simplification — Removed redundant uid-resolution paths
  5. DirectController — New concurrency controller for direct index access

Owner-Computes Dedup

Problem

In multicast/broadcast join modes, the same match pair (A,B) was discovered by multiple subtasks and emitted redundantly. Sink (single-threaded) had to deduplicate via hash-set lookup, causing:

  • O(P) redundant emissions (P = parallelism)
  • Sink becoming pipeline bottleneck at high parallelism
  • P=4 case: ExecTime=39s (30s stuck in Sink wait)

Solution

Each VectorRecord carries a routing_mask_ bitmask indicating which subtasks hold it:

intersection = query.routing_mask_ & candidate.routing_mask_
owner = __builtin_ctzll(intersection)  // lowest common subtask
if (owner != current_subtask) skip;

Results

Config Para Old Emits New Emits Reduction Old Dedup New Dedup Recall
CJ k=2,p=2 2 772,912 386,456 -50% 386,456 0 1.0
CJ k=4,p=4 4 1,545,824 386,456 -75% 1,159,368 0 1.0
CJ k=8,p=8 8 3,091,648 386,456 -87.5% 2,705,192 0 1.0
CJ k=16,p=16 16 6,183,296 386,456 -93.75% 5,796,840 0 1.0

Comprehensive Benchmark

Shared-Index Algorithms (data=2000)

Algorithm P=1 P=2 P=4 P=8 Recall
BruteForce 6,148ms 5,702ms 14,474ms 17,525ms 1.0000
IVF 6,878ms 6,270ms 12,222ms 17,875ms 1.0000

ClusteredJoin Multicast-k Tradeoff (data=2000, P=8)

k Recall JoinTime Throughput
1 0.441 2,268ms 2,904/s
2 1.000 3,734ms 3,103/s
3 1.000 6,096ms 2,722/s
4 1.000 5,787ms 3,731/s

VSJoin Fanout Budget Tradeoff (data=2000, P=8)

Fanout Recall JoinTime Throughput
1 0.371 1,628ms 6,390/s
2 0.362 1,731ms 6,010/s
3 0.388 1,720ms 6,046/s
4 0.385 1,720ms 6,046/s

VSJoin recall at high parallelism is limited by LSH partition quality. Increasing fanout doesn't help significantly — future work should improve LSH hash functions or switch to learned partitioning.

Files Changed

Core Changes

  • include/common/data_types.hrouting_mask_ field on VectorRecord
  • src/execution/result_partition.cpp — set routing_mask on broadcast/multicast/unicast
  • src/operator/join_operator.cpp — owner-computes dedup, VSJoin mechanisms, routing
  • include/utils/metrics/join_metrics.h — owner_dedup_count metric

StorageManager & Index

  • include/storage/storage_manager.h / src/storage/storage_manager.cpp — shard refactoring
  • include/concurrency/direct_controller.h / src/concurrency/direct_controller.cpp — new controller
  • include/index/knn.h / src/index/knn.cpp — API simplification

Config & Validation

  • include/operator/utils/join_strategy_config.h — new vsjoin config fields
  • src/operator/utils/join_config_validator.cpp — updated validation
  • test/UnitTest/test_join_method_registry.cpp — fix RecommendedConfigShouldBeValid test

Next Steps

  1. Recall-aware load balancing — Current rebalancer migrates routing only, not WindowState data
  2. Per-logical-partition load tracking — LoadMonitor needs finer granularity
  3. LSH partition quality — VSJoin high-parallelism recall limited by hash quality
  4. Bounded-Staleness tuning — Global Index rebuild interval vs recall systematic evaluation

Mechanism I (Bounded-Staleness R/W Decoupling):
- Add VSJoinSnapshotFilterPolicy enum and max_staleness_ms config
- Track rebuild duration, timestamp, count in globalIndexRebuildLoop
- Staleness-aware record filtering in snapshot rebuild

Mechanism II (Budgeted Boundary Coverage Routing):
- Add VSJoinRouteMode enum (UNICAST/BUDGETED/BROADCAST)
- Add vsjoin_fanout_budget config knob
- Implement deterministic top-k partition selection in BUDGETED mode
- BROADCAST mode routes to all logical partitions

Mechanism III (Predictable Skew Control Plane):
- Add vsjoin_rebalance_cooldown_ms config with enforcement
- Add vsjoin_use_smoothed_load config (EWMA latency vs delta records)
- Add getSmoothedLoad() to VSJoinLoadMonitor
- Verify RCU-style atomic publish in PartitionAssignment

All existing tests pass. Analysis doc added.
…x Knn index isolation

Key changes:
1. VSJoinMethod::ExecuteEager now collects candidates directly from
   ConcurrencyManager::query_for_join as shared_ptr<VectorRecord>,
   eliminating the costly resolveUidsToRecords that scanned all
   WindowState partitions per query.

2. Knn (BruteForce) index now tracks inserted UIDs and filters
   query_for_join results to only return records belonging to this
   index instance. This prevents cross-stream contamination where
   a right-stream Local Index would return left-stream records
   via the shared StorageManager.

Performance impact (size=500, para=1):
- candidate_fetch: 9713ms -> 2934ms (3.3x improvement)
- Precision: maintained at 1.0000
- Recall: maintained at 1.0000 for para=1,2

Known limitation:
- Para>=4 recall drops to ~0.80-0.85 due to LSH partition splitting
  similar vectors across partitions. Global Index rebuild compensates
  but has latency. Future work: increase multicast_k or local_num_probes.
…d routing, predictable skew control

Closes #130, #131, #132

- Mechanism I: Add staleness knobs, validity filtering, lock-free foreground path
- Mechanism II: Support unicast/budgeted/broadcast route modes with dedup
- Mechanism III: Complete RCU AssignmentTable, EMA LoadMonitor, bounded rebalance
- Add comprehensive metrics for all three mechanisms
- Include gap analysis and optimization report docs
Eliminate cross-subtask and IQ bidirectional duplicate matches inside
JoinOperator, reducing Sink dedup pressure to zero.

- Add routing_mask_ (uint64_t bitmask) to VectorRecord
- Set routing_mask in ResultPartition::emit() for broadcast/multicast/unicast
- Owner-Computes rule in JoinOperator emit: only lowest-indexed subtask
  in intersection(query_mask, cand_mask) emits a match
- Propagate candidate routing_mask through JoinFunction result
- Add owner_dedup_count metric

Results (ClusteredJoin p=8, k=8):
  Before: total_emits=3,091,648  sink_dedup=2,705,192
  After:  total_emits=386,456    sink_dedup=0
  Recall=1.0000  Precision=1.0000
Copilot AI review requested due to automatic review settings April 2, 2026 13:10
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces routing-mask–based “owner-computes” deduplication to eliminate duplicate join match emissions in multicast/broadcast scenarios (reducing downstream Sink dedup overhead), while also extending VSJoin routing/configuration, metrics, and related tests/docs.

Changes:

  • Add routing_mask_ propagation across routing and join execution, and apply owner-computes dedup at emit time.
  • Extend VSJoin routing (route modes + metrics), load monitoring (EWMA + totals), and assignment-table concurrency tests.
  • Add/validate new VSJoin rebalance config knobs and sync sample configs/docs.

Reviewed changes

Copilot reviewed 59 out of 61 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
include/common/data_types.h Adds routing_mask_ bitmask field to VectorRecord for dedup ownership.
src/execution/result_partition.cpp Sets routing_mask_ during unicast/multicast/broadcast emits.
src/operator/join_operator.cpp Implements owner-computes dedup and propagates masks through join results.
include/utils/metrics/join_metrics.h Adds VSJoin mechanism metrics + owner_dedup_count + probe latency ring buffer.
src/operator/join_operator_vsjoin_routing.cpp Adds VSJoin route-mode dispatch and routing metrics instrumentation.
include/operator/utils/join_strategy_config.h / src/operator/utils/join_strategy_config.cpp Adds VSJoin route/snapshot policy enums + parsing + new config validations/summary.
src/operator/utils/join_config_validator.cpp + tests Validates new VSJoin rebalance knobs; updates LSH recommended window states.
test/test_utils/join_config_loader.cpp + tests Merges/saves new VSJoin knobs; validates loader behavior.
src/operator/utils/join_strategy_factory.cpp Uses method registry preferentially; config-driven VSJoin probe config; removes VSJoin-forced window-state override.
src/operator/join_operator_methods/vsjoin_method.* Adds local multi-probe candidate retrieval + dedup + self-registration.
src/operator/join_operator_methods/vsjoin_components/* + tests Adds assignment-table validity guards; load-monitor EWMA and new test coverage.
src/index/knn.* Adds per-index UID isolation to avoid cross-index leakage.
config/*.toml Adds VSJoin rebalance knobs; adds ClusteredJoin scaling config.
PR_DESCRIPTION.md / docs/* / .gitignore Updates repo documentation and adds research paper assets.

Comment on lines 49 to +55
// 广播模式:将数据发送到所有通道
// Build routing_mask: all channels are targets
uint64_t route_mask = 0;
for (size_t i = 0; i < output_channels_.size() && i < 64; ++i) {
route_mask |= (uint64_t{1} << i);
}

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routing_mask_ only tracks the first 64 output channels (bits 0..63), but emit() will still broadcast/multicast to channels >= 64. If owner-computes dedup relies on routing_mask_, records routed to subtasks >= 64 will have incomplete/zero masks, which can reintroduce duplicates or inconsistent ownership decisions. Consider enforcing parallelism/output_channels <= 64 when routing_mask_ is used (fail fast), or switch routing_mask_ to a dynamically sized bitset representation and populate all targets.

Copilot uses AI. Check for mistakes.
Comment on lines +1515 to +1522
// Build routing bitmask for Owner-Computes dedup.
// Bit i set ⇒ record lives in subtask i's WindowState.
uint64_t route_mask = 0;
for (size_t t : target_subtasks) {
if (t < 64) route_mask |= (uint64_t{1} << t);
}
data_ptr->routing_mask_ = route_mask;
data_for_join->routing_mask_ = route_mask;
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Owner-computes mask construction truncates targets to bits < 64. If parallelism > 64 (or VSJoin routes to a subtask index >= 64), routing_mask_ becomes incomplete and the dedup logic can silently fall back or emit defensively, reintroducing duplicates. Please add a runtime guard (e.g., throw/log+disable owner-computes) when parallelism_ > 64, or widen routing_mask_ representation.

Copilot uses AI. Check for mistakes.
Comment on lines +82 to +86
enum class VSJoinRouteMode {
UNICAST, ///< Route to single best partition only
BUDGETED, ///< Route to up to fanout_budget partitions (deterministic top-k by distance)
BROADCAST ///< Route to all partitions
};
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment for VSJoinRouteMode::BUDGETED says “deterministic top-k by distance”, but the current routing implementation selects the first fanout_budget LSH candidates without any distance scoring/ranking. Either update the comment to match the behavior (first-k in candidate order), or implement actual distance-based ranking to avoid misleading config semantics.

Copilot uses AI. Check for mistakes.
Comment thread include/index/knn.h Outdated
Comment on lines +20 to +23
private:
mutable std::shared_mutex local_mutex_;
// Local record cache: uid -> VectorRecord copy (owned)
std::unordered_map<uint64_t, std::shared_ptr<const VectorRecord>> local_records_;
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The header comment says local_records_ stores “VectorRecord copy (owned)”, but the implementation currently inserts nullptr placeholders and only uses the keys for UID membership filtering. Please update the comment (and/or the container type) to reflect actual behavior so future changes don’t assume stored record copies are available.

Copilot uses AI. Check for mistakes.
Comment on lines +182 to +191
# vsjoin_rebalance_imbalance_ratio: 触发重平衡的负载失衡比(max/avg)
# 类型: double
# 范围: [1.0, 5.0]
# 默认: 1.35

# vsjoin_rebalance_max_moves: 每轮重平衡最多迁移的 logical partition 数
# 类型: int
# 范围: [1, 1024]
# 默认: 8

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This schema still documents vsjoin_async_threads and vsjoin_allowed_lateness earlier in the VSJoin section, but those fields no longer exist in JoinStrategyConfig / TOML parsing (they are silently ignored). Please either remove them from the schema or explicitly mark them as deprecated/ignored to avoid users thinking they are effective knobs.

Copilot uses AI. Check for mistakes.
Comment on lines 200 to 201
vsjoin_async_threads = 2
vsjoin_allowed_lateness = 1000
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The VSJoin strategy blocks still set vsjoin_async_threads / vsjoin_allowed_lateness, but these knobs are no longer present in JoinStrategyConfig / TOML parsing (they’re silently ignored). Please remove them from this sample config or mark them as deprecated to avoid confusing users and tests that rely on this file.

Suggested change
vsjoin_async_threads = 2
vsjoin_allowed_lateness = 1000
# vsjoin_async_threads = 2 # deprecated: ignored by JoinStrategyConfig
# vsjoin_allowed_lateness = 1000 # deprecated: ignored by JoinStrategyConfig

Copilot uses AI. Check for mistakes.
Comment thread PR_DESCRIPTION.md
Comment on lines +1 to +21
# chore/vsjoin-factory-paper-sync-20260303

## 📋 概述
## 变更概览

本 PR 实现了 SageFlow 的 **VSJoin 流式向量相似性连接引擎**,包含完整的多线程架构重构和多种 Baseline 算法实现。这是一个大规模重构,包含 **195 个文件变更**,新增约 **50,000+ 行代码**。
本 PR 聚焦三项收敛工作:

---

## 🏗️ 架构变更

### 1. RuntimeContext 注入与并行执行模型

- **RuntimeContext**: 提供任务标识 (`subtask_index`) 和并行度 (`parallelism`) 信息
- **ExecutionVertex**: 每个并行实例持有独立的 `RuntimeContext`
- **Operator 接口**: `open()`, `apply()`, `close()` 方法增加 `RuntimeContext` 参数

### 2. 窗口状态抽象层

新增 `WindowState` 接口,支持多种状态实现:

| 状态类型 | 描述 | 适用场景 |
|---------|------|---------|
| `SharedWindowState` | 所有实例共享同一状态 | RoundRobin 分区 |
| `PartitionedWindowState` | 每个 subtask 独立状态 | Key/VectorHash 分区 |
| `TwoTierWindowState` | 两层架构(写友好+紧凑层) | 高吞吐场景 |
| `PartitionedVectorState` | 向量空间分区状态 | VSJoin/S3J |

### 3. 连接策略

统一的 SPSC 队列矩阵连接策略:

- **ConnectionStrategy**: 统一的连接策略,使用 `upstream × downstream` 个 SPSC 队列
- 支持不同的分区器(RoundRobin、KeyHash、VectorHash、LSH、Centroid)

---

## 🔧 核心组件

### Join 策略工厂模式 (新增)

```
JoinStrategyConfig (配置)
JoinConfigValidator (验证)
JoinStrategyFactory (创建)
StrategyComponents {
JoinMethod,
WindowState (left/right),
Partitioner,
Index (shared/partitioned),
VSJoin/S3J 专用组件
}
```

#### 新增文件:
- `include/operator/join_strategy_config.h` - 统一配置结构
- `include/operator/join_strategy_factory.h` - 策略工厂
- `include/operator/join_method_registry.h` - 方法注册中心
- `include/operator/join_config_validator.h` - 配置验证器
- `include/execution/partitioner_factory.h` - 分区器工厂
- `include/state/window_state_factory.h` - 状态工厂
- `config/join_strategies.toml` - 策略配置文件

### 向量空间分区器

- **VectorSpacePartitioner**: 抽象接口
- `LSHPartitioner`: 局部敏感哈希分区 (VSJoin)
- `KMeansPartitioner`: K-Means 聚类分区
- `CentroidPartitioner`: 质心分区 (S3J)

### 并发与索引管理

- **ConcurrencyManager**: 线程安全的索引管理
- **ConcurrencyController**: 索引级别的并发控制
- **PartitionedIndex**: 向量空间分区索引

### VSJoin 专用组件

- `PartitionCoordinator`: 跨分区协调
- `BoundaryTracker`: 边界向量追踪
- `AsyncCandidateGenerator`: 异步候选生成
- `LateArrivalHandler`: 延迟数据处理
- `DistanceVerifier`: 距离验证

---

## 📊 支持的 Join 算法

| 算法 | 类型 | 分区策略 | 窗口状态 | 说明 |
|-----|------|---------|---------|------|
| **BruteForce** | Baseline | RoundRobin | Shared | Ground Truth |
| **IVF** | Approximate | RoundRobin/KeyHash | Shared/Partitioned | 倒排索引 |
| **HNSW** | Approximate | RoundRobin | Shared | 分层可导航图 |
| **S3J** | Baseline | Centroid | PartitionedVector | DEBS'23 |
| **ClusteredJoin** | Baseline | Centroid | PartitionedVector | VectraFlow |
| **HDRTree** | Baseline | VectorHash | Partitioned | HDR-Tree |
| **VSJoin** | Our Method | LSH | PartitionedVector | 本项目核心方法 |

---

## 🧪 测试覆盖
1. **工厂化主链路收敛**
- Join 方法创建已接入主执行链路,策略为 **registry 优先 + switch 兜底**。
- VSJoin 已完成方法自注册,可通过注册中心直接创建。

### 新增单元测试 (19 个测试文件)
2. **配置契约一致性修复(VSJoin)**
- VSJoin 运行时分区器改为严格按配置契约创建,不再隐式改写为 `Centroid`。
- `WindowState` 创建不再被 VSJoin 强制覆盖,改为按配置生效。
- 已同步更新 LSH 对 `WindowState` 的推荐集合。

- `test_join_strategy_factory.cpp` - 策略工厂测试
- `test_join_config_validator.cpp` - 配置验证测试
- `test_join_method_registry.cpp` - 注册中心测试
- `test_partitioner_factory.cpp` - 分区器工厂测试
- `test_window_state_factory.cpp` - 状态工厂测试
- `test_window_state.cpp` - 窗口状态基础测试
- `test_two_tier_window_state.cpp` - 两层状态测试
- `test_partitioned_vector_state.cpp` - 分区向量状态测试
- `test_vector_space_partitioner.cpp` - 空间分区器测试
- `test_partitioned_index.cpp` - 分区索引测试
- `test_partition_coordinator.cpp` - 协调器测试
- `test_late_arrival_handler.cpp` - 延迟处理测试
- `test_join_operator_state.cpp` - Join 状态测试
- `test_bruteforce_method.cpp` / `test_ivf_method.cpp` / `test_s3j_method.cpp` - 方法测试
- `test_pca.cpp` / `test_simd_distance.cpp` - 计算优化测试
3. **论文资产并入**
- 已恢复并提交重命名后的论文目录:
`docs/research-paper-High_Throughput_Streaming_Vector_Similarity_Joins_on_Multicore_Processors/`
- 包含 `main.tex`、`References.bib`、`Sections/*` 等核心文件。

### 集成测试

- `test_join_pipeline.cpp` - 端到端 Join 流水线测试
- `test_join_datasource_modes.cpp` - 多并行度召回率测试

### 性能测试

- `perf_join_with_datasource.cpp` - 数据源性能测试
- 支持不同数据集模式的性能评估

---

## 📚 文档更新

- `docs/ARCHITECTURE_REFACTORING.md` - 架构重构指南
- `docs/CONNECTION_STRATEGIES.md` - 连接策略详解
- `docs/JOIN_PIPELINE_GUIDE.md` - Join 流水线指南
- `docs/VSJOIN_IMPLEMENTATION_ROADMAP.md` - VSJoin 实现路线图
- `.github/copilot-instructions.md` - AI 辅助开发指南
> 约束保持:未修改 VSJoin 集成测试占位文件 `test/IntegrationTest/test_vsjoin_integration.cpp`。
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file’s new header/title (“chore/vsjoin-factory-paper-sync-20260303”) and change summary focus on factory/paper sync rather than the PR’s stated purpose (owner-computes dedup to eliminate multicast duplicates). Please align PR_DESCRIPTION.md with the current PR intent, or move this content to a more appropriate doc file to avoid confusion for reviewers/users.

Copilot uses AI. Check for mistakes.
Comment on lines 14 to 17
#include <unordered_set>
#include <unordered_map>
#include <mutex>

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two redundant #include <mutex> directives in this header. Removing the duplicate include will reduce noise and avoid confusion when scanning dependencies.

Copilot uses AI. Check for mistakes.
@ZeroJustMe ZeroJustMe self-assigned this Apr 2, 2026
@ZeroJustMe ZeroJustMe force-pushed the feature/owner-computes-dedup branch from 75aa564 to 275e89a Compare April 2, 2026 13:55
…solation, config updates

- StorageManager: partition-aware shard storage for per-subtask data isolation
- DirectController: new concurrency controller for direct index access
- KNN index: simplify API, remove redundant uid-resolution paths
- JoinOperator: add combinedMatchId + SubtaskDedupSet for multicast dedup
- JoinStrategyConfig: add vsjoin_use_smoothed_load flag
- JoinConfigValidator: relax validation for new config fields
- JoinStrategyFactory: update factory wiring for new config
- Add unified comparison config and algorithm comparison script
- Add refactoring documentation for storage shard and vsjoin optimization
@ZeroJustMe ZeroJustMe force-pushed the feature/owner-computes-dedup branch from 275e89a to 5e46515 Compare April 2, 2026 15:03
…RecommendedConfigShouldBeValid

The test was missing index_strategy when validating recommended configs.
VSJoin/ClusteredJoin require PARTITIONED index strategy, but the test
left it at the default (SHARED), causing validation failure.
@ZeroJustMe ZeroJustMe changed the title feat: Owner-Computes Dedup — Zero Sink Overhead for Multicast Join feat: VSJoin mechanisms, Owner-Computes dedup, StorageManager shard refactoring Apr 2, 2026
@ZeroJustMe ZeroJustMe changed the base branch from main to main-dev April 2, 2026 15:51
@ZeroJustMe ZeroJustMe merged commit da99c0f into main-dev Apr 2, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants