diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 29993380b50c..d0a90e6412f1 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -1084,6 +1085,37 @@ public void duplicateDataUnpartitionedAllowed() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @TestTemplate
+  public void violateNotNullConstraintFromTable() {
+    sql("CREATE TABLE %s STORED AS parquet AS SELECT CAST(NULL AS INT) AS id", sourceTableName);
+    createIcebergTable("id Integer NOT NULL");
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.add_files('%s', '%s')",
+                    catalogName, tableName, sourceTableName))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Column 'id' is required but contains 1 null value(s)");
+  }
+
+  @TestTemplate
+  public void violateNotNullConstraintFromFileTable() {
+    String createParquet =
+        "CREATE TABLE %s USING parquet LOCATION '%s' AS SELECT CAST(NULL AS INT) AS id";
+    sql(createParquet, sourceTableName, fileTableDir.getAbsolutePath());
+    createIcebergTable("id Integer NOT NULL");
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.add_files('%s', '`parquet`.`%s`')",
+                    catalogName, tableName, fileTableDir.getAbsolutePath()))
+        .cause()
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Column 'id' is required but contains 1 null value(s)");
+  }
+
   @TestTemplate
   public void testEmptyImportDoesNotThrow() {
     createIcebergTable("id Integer, name String, dept String, subdept String");
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 96499184cab3..5c225e6b552e 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -57,10 +57,12 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
@@ -78,6 +80,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
@@ -369,6 +372,7 @@ public boolean isDefinedAt(Expression attr) {
   }
 
   private static Iterator buildManifest(
+      Schema schema,
       int formatVersion,
       Long snapshotId,
       SerializableConfiguration conf,
@@ -392,7 +396,12 @@ private static Iterator buildManifest(
         ManifestFiles.write(formatVersion, spec, outputFile, snapshotId);
 
     try (ManifestWriter writerRef = writer) {
-      fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
+      fileTuples.forEachRemaining(
+          fileTuple -> {
+            DataFile dataFile = fileTuple._2;
+            verifyRequiredField(schema, dataFile);
+            writerRef.add(dataFile);
+          });
     } catch (IOException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to close the manifest writer: %s", outputPath);
@@ -721,7 +730,11 @@ private static void importUnpartitionedSparkTable(
       }
 
       AppendFiles append = targetTable.newAppend();
-      files.forEach(append::appendFile);
+      files.forEach(
+          dataFile -> {
+            verifyRequiredField(targetTable.schema(), dataFile);
+            append.appendFile(dataFile);
+          });
       append.commit();
     } catch (NoSuchDatabaseException e) {
       throw SparkExceptionUtil.toUncheckedException(
@@ -908,6 +921,7 @@ public static void importSparkPartitions(
                   (MapPartitionsFunction, ManifestFile>)
                       fileTuple ->
                           buildManifest(
+                              targetTable.schema(),
                               formatVersion,
                               snapshotId,
                               serializableConf,
@@ -968,6 +982,65 @@ public static List filterPartitions(
     }
   }
 
+  /**
+   * Rejects a data file whose metrics report nulls in a column that can never be null.
+   *
+   * <p>A column is checked only when it and every ancestor are required and all ancestors are
+   * structs. A required field under an optional struct, or inside a list or map, may be counted
+   * as null by file metrics when its parent is null or the collection is empty, so such counts
+   * are not treated as violations. Files without null value counts are accepted as is.
+   *
+   * @param schema schema of the table the file is imported into
+   * @param dataFile data file whose null value counts are validated
+   * @throws ValidationException if a column that can never be null has a positive null count
+   */
+  private static void verifyRequiredField(Schema schema, DataFile dataFile) {
+    Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
+    if (nullValueCounts == null || nullValueCounts.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<Integer, Long> entry : nullValueCounts.entrySet()) {
+      Long nullCount = entry.getValue();
+      if (nullCount == null || nullCount <= 0) {
+        continue;
+      }
+
+      int fieldId = entry.getKey();
+      if (isRequiredAtEveryLevel(schema, fieldId)) {
+        throw new ValidationException(
+            "Column '%s' is required but contains %d null value(s): %s",
+            schema.findColumnName(fieldId), nullCount, dataFile.location());
+      }
+    }
+  }
+
+  /**
+   * Returns whether the field and all of its ancestors are required, with only struct ancestors.
+   * Field IDs that are not in the schema are reported as not required.
+   */
+  private static boolean isRequiredAtEveryLevel(Schema schema, int fieldId) {
+    if (schema.findField(fieldId) == null || schema.findField(fieldId).isOptional()) {
+      return false;
+    }
+
+    // the parent index is built only for required columns that report nulls, which is rare
+    Map<Integer, Integer> parentIds = TypeUtil.indexParents(schema.asStruct());
+    Integer parentId = parentIds.get(fieldId);
+    while (parentId != null) {
+      boolean alwaysPresent =
+          schema.findField(parentId).isRequired()
+              && schema.findField(parentId).type().isStructType();
+      if (!alwaysPresent) {
+        return false;
+      }
+
+      parentId = parentIds.get(parentId);
+    }
+
+    return true;
+  }
+
   private static void deleteManifests(FileIO io, List manifests) {
     CatalogUtil.deleteFiles(io, Lists.transform(manifests, ManifestFile::path), "manifests");
   }