Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,87 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
}

@TestTemplate
public void testJoinWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException {
// Regression test for #15349: join keys that are a subset of partition keys
// with bucket transforms on String columns
String createTableStmt =
"CREATE TABLE %s (user_id STRING, post_id STRING, "
+ "collected_date STRING, category_id STRING, payload STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES "
+ "('user1', 'post1', '2026-05-01', 'catA', 'p1'), "
+ "('user2', 'post2', '2026-05-02', 'catB', 'p2'), "
+ "('user3', 'post3', '2026-05-01', 'catA', 'p3')",
tableName);
sql(
"INSERT INTO %s VALUES "
+ "('user1', 'post1', '2026-05-01', 'catA', 'p1'), "
+ "('user2', 'post2', '2026-05-02', 'catB', 'p2'), "
+ "('user4', 'post4', '2026-05-03', 'catC', 'p4')",
tableName(OTHER_TABLE_NAME));

// SELECT join on bucket source columns only (subset of partition keys)
assertPartitioningAwarePlan(
1,
3,
"SELECT t1.user_id, t1.category_id, t1.collected_date, t1.payload "
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.user_id = t2.user_id AND t1.category_id = t2.category_id "
+ "ORDER BY t1.user_id, t1.category_id",
tableName,
tableName(OTHER_TABLE_NAME));
}

@TestTemplate
public void testMergeIntoWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException {
// Regression test for #15349: MERGE INTO with join keys as a subset of partition keys
// with bucket transforms on String columns
String createTableStmt =
"CREATE TABLE %s (user_id STRING, post_id STRING, "
+ "collected_date STRING, category_id STRING, payload STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES "
+ "('user1', 'post1', '2026-05-01', 'catA', 'p1'), "
+ "('user2', 'post2', '2026-05-02', 'catB', 'p2'), "
+ "('user3', 'post3', '2026-05-01', 'catA', 'p3')",
tableName);
sql(
"INSERT INTO %s VALUES "
+ "('user1', 'post1', '2026-05-01', 'catA', 'updated1'), "
+ "('user2', 'post2', '2026-05-02', 'catB', 'updated2'), "
+ "('user4', 'post4', '2026-05-03', 'catC', 'new4')",
tableName(OTHER_TABLE_NAME));

withSQLConf(
ENABLED_SPJ_SQL_CONF,
() ->
sql(
"MERGE INTO %s t USING %s s "
+ "ON t.user_id = s.user_id AND t.category_id = s.category_id "
+ "WHEN MATCHED THEN UPDATE SET * "
+ "WHEN NOT MATCHED THEN INSERT *",
tableName, tableName(OTHER_TABLE_NAME)));

List<Object[]> result = sql("SELECT * FROM %s ORDER BY user_id", tableName);
assertThat(result).hasSize(4);
}

@TestTemplate
public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
Expand Down
Loading