From 5147ba0acb059058fe279370bdf606bef5863b6b Mon Sep 17 00:00:00 2001 From: Ammar Chalifah Date: Tue, 19 May 2026 16:32:19 +0200 Subject: [PATCH 1/4] Spark: Fix type mismatch in SPJ with bucket partition key on string column Co-authored-by: Cursor --- .../spark/source/StructInternalRow.java | 4 +- .../sql/TestStoragePartitionedJoins.java | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java index 074f04d03468..7b9f8d702b46 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java @@ -180,8 +180,8 @@ public UTF8String getUTF8String(int ordinal) { } private UTF8String getUTF8StringInternal(int ordinal) { - CharSequence seq = struct.get(ordinal, CharSequence.class); - return UTF8String.fromString(seq.toString()); + Object value = struct.get(ordinal, Object.class); + return UTF8String.fromString(value.toString()); } @Override diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java index 8788dff15806..269458a78878 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java @@ -175,6 +175,44 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)"); } + @TestTemplate + public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException { + checkJoin("string_col", "STRING", "bucket(8, string_col)"); + } + + @TestTemplate + public void 
testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException { + // Regression test for GitHub issue #15349: + // bucket transform on a String column produces Integer partition values, + // but StructInternalRow.getUTF8StringInternal assumed CharSequence + String createTableStmt = + "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep, bucket(8, user_id))" + + "TBLPROPERTIES (%s)"; + + sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); + sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES)); + + sql( + "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')", + tableName); + sql( + "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')", + tableName(OTHER_TABLE_NAME)); + + assertPartitioningAwarePlan( + 1, /* expected num of shuffles with SPJ */ + 3, /* expected num of shuffles without SPJ */ + "SELECT t1.id, t1.dep, t1.user_id " + + "FROM %s t1 " + + "INNER JOIN %s t2 " + + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id " + + "ORDER BY t1.id, t1.dep, t1.user_id", + tableName, + tableName(OTHER_TABLE_NAME)); + } + @TestTemplate public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException { checkJoin("binary_col", "BINARY", "bucket(8, binary_col)"); From 7ac314f8a759144e622d181c2d082d47e7370d6a Mon Sep 17 00:00:00 2001 From: Ammar Chalifah Date: Wed, 27 May 2026 23:15:08 +0300 Subject: [PATCH 2/4] Adding SPJ test case for join on subset of partition key --- .../sql/TestStoragePartitionedJoins.java | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java index 269458a78878..94cf5dc5cd30 100644 --- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java @@ -182,7 +182,6 @@ public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException { @TestTemplate public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException { - // Regression test for GitHub issue #15349: // bucket transform on a String column produces Integer partition values, // but StructInternalRow.getUTF8StringInternal assumed CharSequence String createTableStmt = @@ -213,6 +212,43 @@ public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableExc tableName(OTHER_TABLE_NAME)); } + @TestTemplate + public void testJoinsWithMultipleBucketPartitionsOnStringColumns() throws NoSuchTableException { + String createTableStmt = + "CREATE TABLE %s (id BIGINT, dep STRING, category STRING, user_id STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep, bucket(8, user_id), bucket(4, category))" + + "TBLPROPERTIES (%s)"; + + sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); + sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES)); + + sql( + "INSERT INTO %s VALUES " + + "(1, 'software', 'catA', 'user1'), " + + "(2, 'hr', 'catB', 'user2'), " + + "(3, 'software', 'catA', 'user3')", + tableName); + sql( + "INSERT INTO %s VALUES " + + "(1, 'software', 'catA', 'user1'), " + + "(2, 'hr', 'catB', 'user2'), " + + "(4, 'software', 'catC', 'user4')", + tableName(OTHER_TABLE_NAME)); + + assertPartitioningAwarePlan( + 1, /* expected num of shuffles with SPJ */ + 3, /* expected num of shuffles without SPJ */ + "SELECT t1.id, t1.dep, t1.category, t1.user_id " + + "FROM %s t1 " + + "INNER JOIN %s t2 " + + "ON t1.id = t2.id AND t1.dep = t2.dep " + + "AND t1.user_id = t2.user_id AND t1.category = t2.category " + + "ORDER BY t1.id, t1.dep, t1.category, t1.user_id", + tableName, + 
tableName(OTHER_TABLE_NAME)); + } + @TestTemplate public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException { checkJoin("binary_col", "BINARY", "bucket(8, binary_col)"); From 789b3b010791e2f371362d6d2fa7818e7de7126d Mon Sep 17 00:00:00 2001 From: Ammar Chalifah Date: Thu, 28 May 2026 12:34:01 +0300 Subject: [PATCH 3/4] Revert the fix until the issue is reproducible from Iceberg repo --- .../org/apache/iceberg/spark/source/StructInternalRow.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java index 7b9f8d702b46..074f04d03468 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java @@ -180,8 +180,8 @@ public UTF8String getUTF8String(int ordinal) { } private UTF8String getUTF8StringInternal(int ordinal) { - Object value = struct.get(ordinal, Object.class); - return UTF8String.fromString(value.toString()); + CharSequence seq = struct.get(ordinal, CharSequence.class); + return UTF8String.fromString(seq.toString()); } @Override From a56776186e218b0a4f34a0242162f676f71b2ef8 Mon Sep 17 00:00:00 2001 From: Ammar Chalifah Date: Thu, 28 May 2026 12:54:18 +0300 Subject: [PATCH 4/4] Change test suite for SPJ on subset of partition key --- .../sql/TestStoragePartitionedJoins.java | 81 ++++++++++--------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java index 94cf5dc5cd30..16096f283296 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java +++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java @@ -176,48 +176,54 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException } @TestTemplate - public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException { - checkJoin("string_col", "STRING", "bucket(8, string_col)"); - } - - @TestTemplate - public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException { - // bucket transform on a String column produces Integer partition values, - // but StructInternalRow.getUTF8StringInternal assumed CharSequence + public void testJoinWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException { + // Regression test for #15349: join keys that are a subset of partition keys + // with bucket transforms on String columns String createTableStmt = - "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)" + "CREATE TABLE %s (user_id STRING, post_id STRING, " + + "collected_date STRING, category_id STRING, payload STRING)" + "USING iceberg " - + "PARTITIONED BY (dep, bucket(8, user_id))" + + "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))" + "TBLPROPERTIES (%s)"; sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES)); sql( - "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')", + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user3', 'post3', '2026-05-01', 'catA', 'p3')", tableName); sql( - "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')", + "INSERT INTO %s VALUES " + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user4', 'post4', '2026-05-03', 'catC', 'p4')", 
tableName(OTHER_TABLE_NAME)); + // SELECT join on bucket source columns only (subset of partition keys) assertPartitioningAwarePlan( - 1, /* expected num of shuffles with SPJ */ - 3, /* expected num of shuffles without SPJ */ - "SELECT t1.id, t1.dep, t1.user_id " + 1, + 3, + "SELECT t1.user_id, t1.category_id, t1.collected_date, t1.payload " + "FROM %s t1 " + "INNER JOIN %s t2 " - + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id " - + "ORDER BY t1.id, t1.dep, t1.user_id", + + "ON t1.user_id = t2.user_id AND t1.category_id = t2.category_id " + + "ORDER BY t1.user_id, t1.category_id", tableName, tableName(OTHER_TABLE_NAME)); } @TestTemplate - public void testJoinsWithMultipleBucketPartitionsOnStringColumns() throws NoSuchTableException { + public void testMergeIntoWithBucketStringSubsetOfPartitionKeys() throws NoSuchTableException { + // Regression test for #15349: MERGE INTO with join keys as a subset of partition keys + // with bucket transforms on String columns String createTableStmt = - "CREATE TABLE %s (id BIGINT, dep STRING, category STRING, user_id STRING)" + "CREATE TABLE %s (user_id STRING, post_id STRING, " + + "collected_date STRING, category_id STRING, payload STRING)" + "USING iceberg " - + "PARTITIONED BY (dep, bucket(8, user_id), bucket(4, category))" + + "PARTITIONED BY (collected_date, bucket(32, user_id), bucket(32, category_id))" + "TBLPROPERTIES (%s)"; sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES)); @@ -225,28 +231,29 @@ public void testJoinsWithMultipleBucketPartitionsOnStringColumns() throws NoSuch sql( "INSERT INTO %s VALUES " - + "(1, 'software', 'catA', 'user1'), " - + "(2, 'hr', 'catB', 'user2'), " - + "(3, 'software', 'catA', 'user3')", + + "('user1', 'post1', '2026-05-01', 'catA', 'p1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'p2'), " + + "('user3', 'post3', '2026-05-01', 'catA', 'p3')", tableName); sql( "INSERT INTO %s VALUES " - + "(1, 'software', 'catA', 'user1'), " - + "(2, 'hr', 
'catB', 'user2'), " - + "(4, 'software', 'catC', 'user4')", + + "('user1', 'post1', '2026-05-01', 'catA', 'updated1'), " + + "('user2', 'post2', '2026-05-02', 'catB', 'updated2'), " + + "('user4', 'post4', '2026-05-03', 'catC', 'new4')", tableName(OTHER_TABLE_NAME)); - assertPartitioningAwarePlan( - 1, /* expected num of shuffles with SPJ */ - 3, /* expected num of shuffles without SPJ */ - "SELECT t1.id, t1.dep, t1.category, t1.user_id " - + "FROM %s t1 " - + "INNER JOIN %s t2 " - + "ON t1.id = t2.id AND t1.dep = t2.dep " - + "AND t1.user_id = t2.user_id AND t1.category = t2.category " - + "ORDER BY t1.id, t1.dep, t1.category, t1.user_id", - tableName, - tableName(OTHER_TABLE_NAME)); + withSQLConf( + ENABLED_SPJ_SQL_CONF, + () -> + sql( + "MERGE INTO %s t USING %s s " + + "ON t.user_id = s.user_id AND t.category_id = s.category_id " + + "WHEN MATCHED THEN UPDATE SET * " + + "WHEN NOT MATCHED THEN INSERT *", + tableName, tableName(OTHER_TABLE_NAME))); + + List result = sql("SELECT * FROM %s ORDER BY user_id", tableName); + assertThat(result).hasSize(4); } @TestTemplate