Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/wren-core-base/manifest-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub fn model(python_binding: proc_macro::TokenStream) -> proc_macro::TokenStream
pub table_reference: Option<String>,
pub columns: Vec<Arc<Column>>,
#[serde(default)]
pub primary_key: Option<String>,
pub primary_key: Option<PrimaryKey>,
#[serde(default, with = "bool_from_int")]
pub cached: bool,
#[serde(default)]
Expand Down
30 changes: 27 additions & 3 deletions core/wren-core-base/src/mdl/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#![allow(dead_code)]

use crate::mdl::manifest::{
Column, Cube, CubeDimension, DataSource, JoinType, Manifest, Measure, Model, Relationship,
TimeDimension, View,
Column, Cube, CubeDimension, DataSource, JoinType, Manifest, Measure, Model, PrimaryKey,
Relationship, TimeDimension, View,
};
use crate::mdl::{ColumnLevelOperator, NormalizedExpr, RowLevelAccessControl, SessionProperty};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -144,7 +144,31 @@ impl ModelBuilder {
}

pub fn primary_key(mut self, primary_key: &str) -> Self {
self.model.primary_key = Some(primary_key.to_string());
assert!(
!primary_key.trim().is_empty(),
"primary_key must be a non-empty column name"
);
self.model.primary_key = Some(PrimaryKey::Single(primary_key.to_string()));
self
}

/// Set a composite primary key spanning multiple columns.
/// A single column collapses to [`PrimaryKey::Single`] so the serialized
/// form stays a plain string.
pub fn primary_keys(mut self, primary_keys: &[&str]) -> Self {
assert!(
!primary_keys.is_empty(),
"primary_keys must contain at least one column"
);
assert!(
primary_keys.iter().all(|k| !k.trim().is_empty()),
"primary_keys cannot contain empty column names"
);
self.model.primary_key = Some(if let [single] = primary_keys {
PrimaryKey::Single(single.to_string())
} else {
PrimaryKey::Composite(primary_keys.iter().map(|s| s.to_string()).collect())
});
self
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
112 changes: 109 additions & 3 deletions core/wren-core-base/src/mdl/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
mod manifest_impl {
use crate::mdl::manifest::bool_from_int;
use crate::mdl::manifest::table_reference;
use crate::mdl::manifest::PrimaryKey;
use manifest_macro::{
column, column_level_access_control, column_level_operator, cube, cube_dimension,
data_source, join_type, manifest, measure, model, normalized_expr, normalized_expr_type,
Expand Down Expand Up @@ -59,6 +60,7 @@ mod manifest_impl {
mod manifest_impl {
use crate::mdl::manifest::bool_from_int;
use crate::mdl::manifest::table_reference;
use crate::mdl::manifest::PrimaryKey;
use manifest_macro::{
column, column_level_access_control, column_level_operator, cube, cube_dimension,
data_source, join_type, manifest, measure, model, normalized_expr, normalized_expr_type,
Expand Down Expand Up @@ -93,7 +95,28 @@ mod manifest_impl {

pub use crate::mdl::manifest::manifest_impl::*;

pub const MAX_SUPPORTED_LAYOUT_VERSION: u32 = 2;
/// The primary key of a [Model]. A model may declare either a single column
/// (`"primaryKey": "id"`) or a composite key (`"primaryKey": ["a", "b"]`).
/// The `#[serde(untagged)]` representation keeps the legacy single-string form
/// fully backward compatible.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
#[serde(untagged)]
pub enum PrimaryKey {
Single(String),
Composite(Vec<String>),
}

impl PrimaryKey {
/// All primary key columns in declaration order.
pub fn columns(&self) -> Vec<&str> {
match self {
PrimaryKey::Single(s) => vec![s.as_str()],
PrimaryKey::Composite(v) => v.iter().map(String::as_str).collect(),
}
}
}

pub const MAX_SUPPORTED_LAYOUT_VERSION: u32 = 3;

impl Manifest {
pub fn validate_layout_version(&self) -> Result<(), LayoutVersionError> {
Expand Down Expand Up @@ -377,9 +400,21 @@ impl Model {
.map(Arc::clone)
}

/// Return the primary key of the model
/// Return the first primary key column of the model.
/// For a composite key this is the first declared column; use
/// [`Model::primary_keys`] to get every column.
pub fn primary_key(&self) -> Option<&str> {
self.primary_key.as_deref()
self.primary_key
.as_ref()
.and_then(|pk| pk.columns().into_iter().next())
}

/// Return all primary key columns of the model (empty if none declared).
pub fn primary_keys(&self) -> Vec<&str> {
self.primary_key
.as_ref()
.map(PrimaryKey::columns)
.unwrap_or_default()
}

/// Return the table reference of the model
Expand Down Expand Up @@ -557,4 +592,75 @@ mod tests {
model = ModelBuilder::new("empty_model").build();
assert!(matches!(model.source(), ModelSource::Invalid(_)));
}

#[test]
fn test_primary_key_serde() {
use crate::mdl::manifest::{Model, PrimaryKey};

// Legacy single-column form deserializes to Single and serializes back to a string.
let single: Model =
serde_json::from_str(r#"{"name":"customer","columns":[],"primaryKey":"c_custkey"}"#)
.unwrap();
assert_eq!(
single.primary_key,
Some(PrimaryKey::Single("c_custkey".into()))
);
assert_eq!(single.primary_key(), Some("c_custkey"));
assert_eq!(single.primary_keys(), vec!["c_custkey"]);
assert_eq!(
serde_json::to_value(&single.primary_key).unwrap(),
serde_json::json!("c_custkey")
);

// Composite form deserializes to Composite and serializes back to an array.
let composite: Model = serde_json::from_str(
r#"{"name":"partsupp","columns":[],"primaryKey":["ps_partkey","ps_suppkey"]}"#,
)
.unwrap();
assert_eq!(
composite.primary_key,
Some(PrimaryKey::Composite(vec![
"ps_partkey".into(),
"ps_suppkey".into()
]))
);
assert_eq!(composite.primary_key(), Some("ps_partkey"));
assert_eq!(composite.primary_keys(), vec!["ps_partkey", "ps_suppkey"]);
assert_eq!(
serde_json::to_value(&composite.primary_key).unwrap(),
serde_json::json!(["ps_partkey", "ps_suppkey"])
);

// Absent primary key.
let none: Model = serde_json::from_str(r#"{"name":"m","columns":[]}"#).unwrap();
assert_eq!(none.primary_key(), None);
assert!(none.primary_keys().is_empty());

// Builder produces the composite form.
let model = ModelBuilder::new("partsupp")
.primary_keys(&["ps_partkey", "ps_suppkey"])
.build();
assert_eq!(model.primary_keys(), vec!["ps_partkey", "ps_suppkey"]);

// A single-column `primary_keys` collapses to `Single` (serializes to a string).
let model = ModelBuilder::new("customer")
.primary_keys(&["c_custkey"])
.build();
assert_eq!(
model.primary_key,
Some(PrimaryKey::Single("c_custkey".into()))
);
}

#[test]
#[should_panic(expected = "non-empty")]
fn test_builder_rejects_empty_primary_key() {
ModelBuilder::new("m").primary_key(" ").build();
}

#[test]
#[should_panic(expected = "at least one")]
fn test_builder_rejects_empty_primary_keys() {
ModelBuilder::new("m").primary_keys(&[]).build();
}
}
29 changes: 29 additions & 0 deletions core/wren-core-base/src/mdl/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub fn migrate_manifest(
for version in current..target_version {
match version {
1 => migrate_v1_to_v2(&mut value),
2 => migrate_v2_to_v3(&mut value),
_ => {
return Err(MigrationError::UnsupportedTargetVersion {
target: target_version,
Expand All @@ -102,6 +103,14 @@ fn migrate_v1_to_v2(_value: &mut Value) {
// so existing manifests deserialize correctly without changes.
}

/// v2→v3: No data transformation needed.
/// `primaryKey` accepts a composite array in addition to a single string;
/// existing single-string primary keys remain valid.
fn migrate_v2_to_v3(_value: &mut Value) {
// No-op: `primaryKey` is an untagged `string | array` enum, so existing
// single-column manifests deserialize correctly without changes.
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -114,6 +123,26 @@ mod tests {
assert_eq!(value["layoutVersion"], 2);
}

#[test]
fn test_migrate_v2_to_v3() {
let v2_json = r#"{"layoutVersion":2,"catalog":"wren","schema":"public","models":[]}"#;
let result = migrate_manifest(v2_json, 3).unwrap();
let value: Value = serde_json::from_str(&result).unwrap();
assert_eq!(value["layoutVersion"], 3);
}

#[test]
fn test_migrate_v1_to_v3_preserves_composite_pk() {
let v1_json = r#"{"catalog":"wren","schema":"public","models":[{"name":"partsupp","columns":[],"primaryKey":["ps_partkey","ps_suppkey"]}]}"#;
let result = migrate_manifest(v1_json, 3).unwrap();
let value: Value = serde_json::from_str(&result).unwrap();
assert_eq!(value["layoutVersion"], 3);
assert_eq!(
value["models"][0]["primaryKey"],
serde_json::json!(["ps_partkey", "ps_suppkey"])
);
}

#[test]
fn test_migrate_already_at_target() {
let v2_json = r#"{"layoutVersion":2,"catalog":"wren","schema":"public","models":[]}"#;
Expand Down
17 changes: 10 additions & 7 deletions core/wren-core/core/src/logical_plan/analyze/model_generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,18 @@ impl ModelGenerationRule {
let name = alias.name.clone();
let ident =
ident(rebased_measure.to_string()).alias(name.clone());
let rebased_dimension =
rebase_column(&calculation_plan.dimensions[0], &plan_alias)?;
let project = vec![rebased_dimension.clone(), ident];
// Group by every primary key dimension so composite-key
// calculations expose all key columns for the join back.
let rebased_dimensions = calculation_plan
.dimensions
.iter()
.map(|dimension| rebase_column(dimension, &plan_alias))
.collect::<Result<Vec<_>>>()?;
let mut project = rebased_dimensions.clone();
project.push(ident);
let result = match source_plan {
Some(plan) => LogicalPlanBuilder::from(plan)
.aggregate(
vec![rebased_dimension],
vec![rebased_measure],
)?
.aggregate(rebased_dimensions, vec![rebased_measure])?
.project(project)?
.build()?,
_ => {
Expand Down
Loading
Loading