Skip to content

Commit edfac38

Browse files
yaooqinnCopilot
andcommitted
[SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL preservation by skipping asNullable in resolveRelation
Instead of calling `dataSchema.asNullable` in `resolveRelation()` and then restoring nullability from the user schema in `CreateDataSourceTableCommand`, add a `forceNullable` parameter to `resolveRelation()` and pass `false` from the create-table path. This eliminates the `restoreNullability` and `restoreDataTypeNullability` helper methods, simplifying the code significantly (-37 lines net). The read path is unaffected: `resolveRelation()` defaults to `forceNullable = true` (preserving SPARK-13738 safety). Only the create-table path skips `asNullable` so the catalog stores the user's original nullability constraints. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 172d68e commit edfac38

2 files changed

Lines changed: 11 additions & 46 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import org.apache.spark.sql.classic.ClassicConversions.castToImpl
2828
import org.apache.spark.sql.errors.QueryCompilationErrors
2929
import org.apache.spark.sql.execution.CommandExecutionMode
3030
import org.apache.spark.sql.execution.datasources._
31+
import org.apache.spark.sql.internal.SQLConf
3132
import org.apache.spark.sql.sources.BaseRelation
32-
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
33+
import org.apache.spark.sql.types.StructType
3334
import org.apache.spark.util.ArrayImplicits._
3435

3536
/**
@@ -79,7 +80,9 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
7980
bucketSpec = table.bucketSpec,
8081
options = table.storage.properties ++ pathOption,
8182
// As discussed in SPARK-19583, we don't check if the location is existed
82-
catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
83+
catalogTable = Some(tableWithDefaultOptions))
84+
.resolveRelation(checkFilesExist = false,
85+
forceNullable = !sessionState.conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL))
8386

8487
val partitionColumnNames = if (table.schema.nonEmpty) {
8588
table.partitionColumnNames
@@ -107,17 +110,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
107110
table.copy(schema = new StructType(), partitionColumnNames = Nil)
108111

109112
case _ =>
110-
// Merge nullability from the user-specified schema into the resolved schema.
111-
// DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
112-
// constraints. We restore nullability from the original user schema while keeping
113-
// the resolved data types (which may include CharVarchar normalization, metadata, etc.)
114-
val resolvedSchema = if (table.schema.nonEmpty) {
115-
restoreNullability(dataSource.schema, table.schema)
116-
} else {
117-
dataSource.schema
118-
}
119113
table.copy(
120-
schema = resolvedSchema,
114+
schema = dataSource.schema,
121115
partitionColumnNames = partitionColumnNames,
122116
// If metastore partition management for file source tables is enabled, we start off with
123117
// partition provider hive, but no partitions in the metastore. The user has to call
@@ -132,38 +126,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
132126
Seq.empty[Row]
133127
}
134128

135-
/**
136-
* Recursively restores nullability from the original user-specified schema into
137-
* the resolved schema. The resolved schema's data types are preserved (they may
138-
* contain CharVarchar normalization, metadata, etc.), but nullability flags
139-
* (top-level and nested) are taken from the original schema.
140-
*/
141-
private def restoreNullability(resolved: StructType, original: StructType): StructType = {
142-
val originalFields = original.fields.map(f => f.name -> f).toMap
143-
StructType(resolved.fields.map { resolvedField =>
144-
originalFields.get(resolvedField.name) match {
145-
case Some(origField) =>
146-
resolvedField.copy(
147-
nullable = origField.nullable,
148-
dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
149-
case None => resolvedField
150-
}
151-
})
152-
}
153-
154-
private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
155-
(resolved, original) match {
156-
case (r: StructType, o: StructType) => restoreNullability(r, o)
157-
case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
158-
ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
159-
case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
160-
MapType(
161-
restoreDataTypeNullability(rKey, oKey),
162-
restoreDataTypeNullability(rVal, oVal),
163-
oValNull)
164-
case _ => resolved
165-
}
166-
}
167129
}
168130

169131
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,10 @@ case class DataSource(
362362
* is considered as a non-streaming file based data source. Since we know
363363
* that files already exist, we don't need to check them again.
364364
*/
365-
def resolveRelation(checkFilesExist: Boolean = true, readOnly: Boolean = false): BaseRelation = {
365+
def resolveRelation(
366+
checkFilesExist: Boolean = true,
367+
readOnly: Boolean = false,
368+
forceNullable: Boolean = true): BaseRelation = {
366369
val relation = (providingInstance(), userSpecifiedSchema) match {
367370
// TODO: Throw when too much is given.
368371
case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -436,7 +439,7 @@ case class DataSource(
436439
HadoopFsRelation(
437440
fileCatalog,
438441
partitionSchema = partitionSchema,
439-
dataSchema = dataSchema.asNullable,
442+
dataSchema = if (forceNullable) dataSchema.asNullable else dataSchema,
440443
bucketSpec = bucketSpec,
441444
format,
442445
caseInsensitiveOptions)(sparkSession)

0 commit comments

Comments
 (0)