diff --git a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
index b25ba2237..5e2354d07 100644
--- a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
+++ b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
@@ -20,18 +20,18 @@ import org.apache.spark.sql.functions.{col, explode, lit, size, struct, typedLit
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, Encoder, Encoders}
import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor
-import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams, SchemaRegistrySecurityParams}
+import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams}
import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginImpl.SingleErrorStardardized
import KafkaErrorSenderPluginImpl._
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
-import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes
import za.co.absa.enceladus.utils.modules._
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.abris.avro.functions.to_avro
-import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
+import za.co.absa.abris.config.ToAvroConfig
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes
import scala.util.{Failure, Success, Try}
diff --git a/pom.xml b/pom.xml
index f01ee01b7..cc57b5e6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
2.4
6.2.0
1.1.0
- 0.4.0
+ 0.5.0
3.9.0
2.7.3
3.5.4
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
index fad916543..c1bd033d0 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.common
import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.enceladus.utils.implicits.EnceladusDataFrameImplicits.EnceladusDataframeEnhancements

object ErrorColNormalization {
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
index 878d2f2be..82c013933 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
@@ -20,9 +20,9 @@ import org.apache.spark.sql.functions.{col, size, sum}
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.atum.core.Atum
import za.co.absa.enceladus.utils.config.PathWithFs
-import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.general.ProjectMetadata
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

object PerformanceMetricTools extends ProjectMetadata {
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
index cae1ea984..89754781c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
@@ -28,19 +28,18 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfigParser
import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
import za.co.absa.enceladus.conformance.interpreter.rules._
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
-import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast,
- MappingRuleInterpreterGroupExplode}
+import za.co.absa.enceladus.conformance.interpreter.rules.mapping._
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.config.PathWithFs
-import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
import za.co.absa.spark.commons.utils.explode.ExplosionContext
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.commons.lang.extensions.SeqExtension._
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class DynamicInterpreter()(implicit inputFs: FileSystem) {
private val log = LoggerFactory.getLogger(this.getClass)
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
index e11acaa47..7cb3925a1 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
@@ -19,7 +19,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.ExplodeTools
import za.co.absa.spark.commons.utils.explode.ExplosionContext
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
index c92d264bb..75d757d8c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
@@ -27,8 +27,8 @@ import za.co.absa.enceladus.model.MappingTable
import za.co.absa.enceladus.model.conformanceRule.MappingConformanceRule
import za.co.absa.enceladus.model.dataFrameFilter.DataFrameFilter
import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException
-import za.co.absa.enceladus.utils.error.Mapping
import za.co.absa.enceladus.utils.validation.ExpressionValidator
+import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import scala.util.Try
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
index 0fb2661a3..735b67ce9 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
@@ -24,9 +24,9 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
import za.co.absa.enceladus.utils.transformations.ArrayTransformations
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
import za.co.absa.spark.commons.sql.functions.col_of_path
@@ -52,7 +52,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con
val res = handleArrays(rule.outputColumn, withUniqueId) { dfIn =>
val joined = joinDatasetAndMappingTable(mapTable, dfIn)
- val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
+ val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.outputColumn),
array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
typedLit(mappings))
@@ -94,7 +94,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con
.select($"conf.*", col(s"err.${ErrorMessage.errorColumnName}")).drop(idField)
}
- private def inclErrorNullArr(mappings: Seq[Mapping], schema: StructType): Column = {
+ private def inclErrorNullArr(mappings: Seq[ErrorMessage.Mapping], schema: StructType): Column = {
val paths = mappings.flatMap { mapping =>
schema.getAllArraysInPath(mapping.mappedDatasetColumn)
}
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
index 0a7b4b4df..ed026b84d 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
@@ -23,7 +23,7 @@ import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.broadcast.{BroadcastUtils, LocalMappingTable}
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.hats.transformations.NestedArrayTransformations
import za.co.absa.spark.hats.transformations.NestedArrayTransformations.GetFieldFunction
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
index 2b4d69758..ec9d7312c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
@@ -23,8 +23,8 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.sql.functions.col_of_path
import za.co.absa.spark.commons.utils.explode.ExplosionContext
import za.co.absa.spark.commons.utils.{ExplodeTools, SchemaUtils}
@@ -46,7 +46,7 @@ case class MappingRuleInterpreterGroupExplode(rule: MappingConformanceRule,
val (mapTable, defaultValues) = conformPreparation(df, enableCrossJoin = true)
val (explodedDf, expCtx) = explodeIfNeeded(df, explosionState)
- val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
+ val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.allOutputColumns().keys.mkString(",")),
array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
typedLit(mappings))
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala
deleted file mode 100644
index a50869fa7..000000000
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2018 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.enceladus.common.error
-
-object ErrorMessageFactory {
- def errColSchema(nullable: Boolean): String = s"\n |-- errCol: array (nullable = $nullable)\n"+
- " | |-- element: struct (containsNull = false)\n"+
- " | | |-- errType: string (nullable = true)\n"+
- " | | |-- errCode: string (nullable = true)\n"+
- " | | |-- errMsg: string (nullable = true)\n"+
- " | | |-- errCol: string (nullable = true)\n"+
- " | | |-- rawValues: array (nullable = true)\n"+
- " | | | |-- element: string (containsNull = true)\n"+
- " | | |-- mappings: array (nullable = true)\n"+
- " | | | |-- element: struct (containsNull = true)\n"+
- " | | | | |-- mappingTableColumn: string (nullable = true)\n"+
- " | | | | |-- mappedDatasetColumn: string (nullable = true)\n"
-
- def attachErrColToSchemaPrint(nullable: Boolean, schemaPrint: String): String = {
- schemaPrint + errColSchema(nullable)
- }
-}
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
index 8693c2fb3..2565267fa 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
@@ -25,8 +25,8 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Explosi
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.model.{conformanceRule, Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, TZNormalizedSparkTestBase}
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class MyCustomRule(
order: Int,
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
index a5540f7db..928eb09c7 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
@@ -18,8 +18,8 @@ package za.co.absa.enceladus.conformance.interpreter.rules.mapping
import org.apache.spark.sql.functions.{array, typedLit}
import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
-import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory.{simpleMappingRule, simpleMappingRuleMultipleOutputs, simpleMappingRuleMultipleOutputsWithDefaults, simpleMappingRuleWithDefaultValue}
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.JsonUtils
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
index 3b3b1d83d..b54fd2dd5 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
@@ -19,10 +19,9 @@ import org.apache.spark.sql.functions._
import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.JsonUtils
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
-import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

class MappingRuleBroadcastSuite extends MappingInterpreterSuite {
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
index c3b76ab46..272b228aa 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
@@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples
import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{MappingTable, Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class Outer(order: Int, a: Seq[Inner], myFlag: Boolean)
case class OuterErr(order: Int, a: Seq[Inner], myFlag: Boolean, errCol: Seq[ErrorMessage])
@@ -142,7 +143,7 @@ object ArraySamples {
ConformedInner2(2, "two", "twoDrop me :)", "Hello world", new ConformedInner3("Hello world"), "myConf", "HELLO WORLD")
)),
ConformedInner(Seq()),
- ConformedInner(null)), Seq(ErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(Mapping("ind", "a.c.d"), Mapping("otherFlag", "myFlag"))))),
+ ConformedInner(null)), Seq(EnceladusErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(ErrorMessage.Mapping("ind", "a.c.d"), ErrorMessage.Mapping("otherFlag", "myFlag"))))),
ConformedOuter(2, Seq(), Seq()),
ConformedOuter(3, null, Seq()))
}
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
index 5b8dda503..ff4e40545 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
@@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples
import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{Dataset, DefaultValue, MappingTable}
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

object EmployeeConformance {
val countryMT = MappingTable(name = "country", version = 0, hdfsPath = "src/test/testData/country", schemaName = "country", schemaVersion = 0)
@@ -65,8 +66,8 @@ object EmployeeConformance {
ConformedRole(1), ConformedEmployeeId = "2"),
ConformedEmployee(employee_id = 3, name = "John", surname = "Doe3", dept= 3, role = 2, country = "SWE", conformed_country = null, conformed_department = "Unknown dept",
conformed_role = "External dev", errCol= List(
- ErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(Mapping("country_code", "country"))),
- ErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(Mapping("dept_id", "dept")))
+ EnceladusErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(ErrorMessage.Mapping("country_code", "country"))),
+ EnceladusErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(ErrorMessage.Mapping("dept_id", "dept")))
), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)", ConformedRole(2), ConformedEmployeeId = "3"),
ConformedEmployee(employee_id = 4, name = "John", surname = "Doe4", dept= 1, role = 2, country = "IN", conformed_country = "India", conformed_department = "Ingestion Squad",
conformed_role = "Ingestion Developer", errCol= List(), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)",
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
index 14e9db954..8d0ffacb8 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
@@ -26,8 +26,8 @@ import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.config.StandardizationConfig
import za.co.absa.enceladus.standardization.fixtures.TempFileFixture
-import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.testUtils.TZNormalizedSparkTestBase
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.standardization.ValidationException
import za.co.absa.standardization.{RecordIdGeneration, Standardization}
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig}
@@ -47,6 +47,7 @@ class StandardizationRerunSuite extends FixtureAnyFunSuite with TZNormalizedSpar
private val tmpFilePrefix = "test-input-"
private val tmpFileSuffix = ".csv"
+
private val csvContent: String =
"""101|102|1|2019-05-04|2019-05-04
|201|202|2|2019-05-05|2019-05-05
diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
index 3398d2d85..2e8e2ef81 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
@@ -22,7 +22,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{expr, udf}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SparkSession}
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

object BroadcastUtils {
// scalastyle:off null
@@ -108,7 +109,7 @@ object BroadcastUtils {
*/
def getErrorUdf(mappingTable: Broadcast[LocalMappingTable],
outputColumns: Seq[String],
- mappings: Seq[Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
+ mappings: Seq[ErrorMessage.Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
val numberOfArguments = mappingTable.value.keyTypes.size
@@ -117,7 +118,7 @@ object BroadcastUtils {
null
} else {
val strings: Seq[String] = key.map(a => safeToString(a))
- ErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
+ EnceladusErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
}
}
val errorMessageType = ScalaReflection.schemaFor[ErrorMessage].dataType
diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
similarity index 88%
rename from utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala
rename to utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
index 3598f4c45..e606ffa94 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
@@ -18,6 +18,7 @@ package za.co.absa.enceladus.utils.error
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.standardization.config.DefaultErrorCodesConfig
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

/**
* Case class to represent an error message
@@ -29,13 +30,13 @@ import za.co.absa.standardization.config.DefaultErrorCodesConfig
* @param rawValues - Sequence of raw values (which are the potential culprits of the error)
* @param mappings - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
*/
-case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
-case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
+//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
+//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
-object ErrorMessage {
- val errorColumnName = "errCol"
+object EnceladusErrorMessage {
+// val errorColumnName = "errCol"
- def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]): ErrorMessage = ErrorMessage(
+ def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
errType = "confMapError",
errCode = ErrorCodes.ConfMapError,
errMsg = "Conformance Error - Null produced by mapping conformance rule",
diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
index 2deb77912..17f8b5b77 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
@@ -18,8 +18,8 @@ package za.co.absa.enceladus.utils.schema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.spark.hats.transformations.NestedArrayTransformations
diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
index b081770be..188821188 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
@@ -18,7 +18,8 @@ package za.co.absa.enceladus.utils.udf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types.ArrayType
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames._
import za.co.absa.spark.commons.OncePerSparkSession
@@ -28,20 +29,20 @@ import scala.collection.mutable
class ConformanceUDFLibrary()(implicit sparkToRegisterTo: SparkSession) extends OncePerSparkSession {
override protected def register(implicit spark: SparkSession): Unit = {
- spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]) =>
- ErrorMessage.confMappingErr(errCol, rawValues, mappings)
+ spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]) =>
+ EnceladusErrorMessage.confMappingErr(errCol, rawValues, mappings)
})
spark.udf.register(confCastErr, { (errCol: String, rawValue: String) =>
- ErrorMessage.confCastErr(errCol, rawValue)
+ EnceladusErrorMessage.confCastErr(errCol, rawValue)
})
spark.udf.register(confNegErr, { (errCol: String, rawValue: String) =>
- ErrorMessage.confNegErr(errCol, rawValue)
+ EnceladusErrorMessage.confNegErr(errCol, rawValue)
})
spark.udf.register(confLitErr, { (errCol: String, rawValue: String) =>
- ErrorMessage.confLitErr(errCol, rawValue)
+ EnceladusErrorMessage.confLitErr(errCol, rawValue)
})
spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake
diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
index 2578e8b1b..25c444f81 100644
--- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
+++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
@@ -18,8 +18,9 @@ package za.co.absa.enceladus.utils.broadcast
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row}
import org.scalatest.wordspec.AnyWordSpec
-import za.co.absa.enceladus.utils.error.Mapping
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.enceladus.utils.testUtils.{LoggerTestBase, TZNormalizedSparkTestBase}
+import za.co.absa.spark.commons.errorhandling.ErrorMessage

import scala.collection.mutable
@@ -522,7 +523,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit
val localMt = LocalMappingTable(dfMt, Seq("id"), Map(""->"val"))
val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
- val mappings = Seq(Mapping("id", "key2"))
+ val mappings = Seq(ErrorMessage.Mapping("id", "key2"))
val errorUdf1 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
@@ -540,7 +541,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit
val localMt = LocalMappingTable(dfMt, Seq("id", "id"), Map(""->"val"))
val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
- val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"))
+ val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
val errorUdf2 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
@@ -552,7 +553,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit
"3 UDF parameter is used" in {
val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id"), Map(""->"val"))
val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
- val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"))
+ val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
val errorUdf3 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
@@ -567,7 +568,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit
"4 UDF parameter is used" in {
val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id"), Map(""->"val"))
val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
- val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"))
+ val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
val errorUdf4 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
@@ -582,8 +583,8 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit
"5 UDF parameter is used" in {
val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id", "id"), Map(""->"val"))
val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
- val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"),
- Mapping("id", "key2"))
+ val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"),
+ ErrorMessage.Mapping("id", "key2"))
val errorUdf5 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
index 7ee4caa65..bbb5c3952 100644
--- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
+++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
@@ -15,7 +15,8 @@
package za.co.absa.enceladus.utils.broadcast
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
+
/** This case class is used to extract error column from test dataframes. */
case class ErrorColumn(errCol: ErrorMessage)