diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java index 8788dff15806..16096f283296 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java @@ -175,6 +175,87 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)"); } + @TestTemplate + public void testJoinWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException { + // Regression test for #15349: join keys that are a subset of partition keys + // with bucket transforms on String columns + String createTableStmt = + "CREATE TABLE %s (user_id STRING, post_id STRING, " + + "collected_date STRING, category_id STRING, payload STRING)" + + "USING iceberg " + + "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))" + + "TBLPROPERTIES (%s)"; + + sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); + sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES)); + + sql( + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user3', 'post3', '2026-05-01', 'catA', 'p3')", + tableName); + sql( + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user4', 'post4', '2026-05-03', 'catC', 'p4')", + tableName(OTHER_TABLE_NAME)); + + // SELECT join on bucket source columns only (subset of partition keys) + assertPartitioningAwarePlan( + 1, + 3, + "SELECT t1.user_id, t1.category_id, t1.collected_date, t1.payload " + + "FROM %s t1 " + + "INNER JOIN %s t2 " + + "ON t1.user_id = t2.user_id AND t1.category_id = t2.category_id " + + "ORDER BY t1.user_id, t1.category_id", + tableName, + tableName(OTHER_TABLE_NAME)); + } + + @TestTemplate + public void testMergeIntoWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException { + // Regression test for #15349: MERGE INTO with join keys as a subset of partition keys + // with bucket transforms on String columns + String createTableStmt = + "CREATE TABLE %s (user_id STRING, post_id STRING, " + + "collected_date STRING, category_id STRING, payload STRING)" + + "USING iceberg " + + "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))" + + "TBLPROPERTIES (%s)"; + + sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); + sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES)); + + sql( + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user3', 'post3', '2026-05-01', 'catA', 'p3')", + tableName); + sql( + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'updated1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'updated2'), " + + "('user4', 'post4', '2026-05-03', 'catC', 'new4')", + tableName(OTHER_TABLE_NAME)); + + withSQLConf( + ENABLED_SPJ_SQL_CONF, + () -> + sql( + "MERGE INTO %s t USING %s s " + + "ON t.user_id = s.user_id AND t.category_id = s.category_id " + + "WHEN MATCHED THEN UPDATE SET * " + + "WHEN NOT MATCHED THEN INSERT *", + tableName, tableName(OTHER_TABLE_NAME))); + + List result = sql("SELECT * FROM %s ORDER BY user_id", tableName); + assertThat(result).hasSize(4); + } + @TestTemplate public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException { checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");