Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,7 @@ class Analyzer(
// These columns will be added by ResolveSchemaEvolution later.
sourceTable.output.map { sourceAttr =>
val key = findAttrInTarget(sourceAttr.name).getOrElse(
UnresolvedAttribute(sourceAttr.name))
UnresolvedAttribute.quoted(sourceAttr.name))
Assignment(key, sourceAttr)
}
} else {
Expand Down Expand Up @@ -1622,7 +1622,7 @@ class Analyzer(
// These columns will be added by ResolveSchemaEvolution later.
sourceTable.output.map { sourceAttr =>
val key = findAttrInTarget(sourceAttr.name).getOrElse(
UnresolvedAttribute(sourceAttr.name))
UnresolvedAttribute.quoted(sourceAttr.name))
Assignment(key, sourceAttr)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,4 +1245,41 @@ trait MergeIntoSchemaEvolutionBasicTests extends MergeIntoSchemaEvolutionSuiteBa
expectErrorWithoutEvolutionContains = "A column, variable, or function parameter with name " +
"`bonus` cannot be resolved"
)

// SPARK-56462: MERGE with UPDATE * / INSERT * and schema evolution must work when the source
// has a column whose name itself contains a dot (e.g. `job.title`).
// Regression background: the assignment key used to be built with
// UnresolvedAttribute(sourceAttr.name), i.e. apply(), which parsed the dot as a name-part
// separator and produced nameParts = Seq("job", "title") instead of Seq("job.title").
// That made isSameColumnAssignment return false, which blocked schema evolution and surfaced
// an UNRESOLVED_COLUMN analysis error instead of adding the new column.
// NOTE(review): the merge condition is supplied by testEvolution, which is not visible here;
// the expected rows below are consistent with matching on `pk` - confirm against the helper.
testEvolution("source has extra column with dot in name")(
// Target has three columns and no `job.title`; pk 2 is the only key shared with the source.
targetData = Seq(
(1, 100, "hr"),
(2, 200, "software"),
(3, 300, "hr")
).toDF("pk", "salary", "dep"),
// "job.title" is a single column name containing a dot (not a qualified reference).
// The source carries one row with an existing key (pk 2) and one with a new key (pk 4).
sourceData = Seq(
(2, 150, "finance", "engineer"),
(4, 400, "finance", "manager")
).toDF("pk", "salary", "dep", "job.title"),
// Star clauses only: these are the forms whose assignment keys are derived from source
// column names.
clauses = Seq(
updateAll(),
insertAll()
),
// With schema evolution the dot-named column is added to the target and populated:
// pk 2 is updated, pk 4 is inserted, and the untouched rows (pk 1, 3) get null for the
// new column. The element type is spelled out, presumably so the null entries are typed
// as String - TODO confirm.
expected = Seq[(Int, Int, String, String)](
(1, 100, "hr", null),
(2, 150, "finance", "engineer"),
(3, 300, "hr", null),
(4, 400, "finance", "manager")
).toDF("pk", "salary", "dep", "job.title"),
// Without schema evolution the dot-named extra column is simply ignored: the target keeps
// its original three columns while pk 2 is still updated and pk 4 still inserted.
expectedWithoutEvolution = Seq(
(1, 100, "hr"),
(2, 150, "finance"),
(3, 300, "hr"),
(4, 400, "finance")
).toDF("pk", "salary", "dep")
)
}