diff --git a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala index b25ba2237..5e2354d07 100644 --- a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala +++ b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala @@ -20,18 +20,18 @@ import org.apache.spark.sql.functions.{col, explode, lit, size, struct, typedLit import org.apache.spark.sql.types.DataTypes import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, Encoder, Encoders} import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor -import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams, SchemaRegistrySecurityParams} +import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams} import za.co.absa.enceladus.plugins.builtin.errorsender.DceError import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginImpl.SingleErrorStardardized import KafkaErrorSenderPluginImpl._ import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams -import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes import za.co.absa.enceladus.utils.modules._ import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements import za.co.absa.abris.avro.functions.to_avro -import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig} +import za.co.absa.abris.config.ToAvroConfig import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas} +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes import scala.util.{Failure, Success, Try} diff --git a/pom.xml b/pom.xml index f01ee01b7..cc57b5e6d 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,7 @@ 2.4 6.2.0 1.1.0 - 0.4.0 + 0.5.0 3.9.0 2.7.3 3.5.4 diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala index fad916543..c1bd033d0 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala @@ -17,7 +17,7 @@ package za.co.absa.enceladus.common import com.typesafe.config.Config import org.apache.spark.sql.DataFrame -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.enceladus.utils.implicits.EnceladusDataFrameImplicits.EnceladusDataframeEnhancements object ErrorColNormalization { diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala index 878d2f2be..82c013933 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala @@ -20,9 +20,9 @@ import org.apache.spark.sql.functions.{col, size, sum} import org.slf4j.{Logger, LoggerFactory} import za.co.absa.atum.core.Atum import za.co.absa.enceladus.utils.config.PathWithFs -import za.co.absa.enceladus.utils.error.ErrorMessage import za.co.absa.enceladus.utils.general.ProjectMetadata import za.co.absa.enceladus.utils.fs.HadoopFsUtils +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements object PerformanceMetricTools extends ProjectMetadata { diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala index cae1ea984..89754781c 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala @@ -28,19 +28,18 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfigParser import za.co.absa.enceladus.conformance.datasource.PartitioningUtils import za.co.absa.enceladus.conformance.interpreter.rules._ import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule -import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast, - MappingRuleInterpreterGroupExplode} +import za.co.absa.enceladus.conformance.interpreter.rules.mapping._ import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule._ import za.co.absa.enceladus.model.{Dataset => ConfDataset} import za.co.absa.enceladus.utils.config.PathWithFs -import za.co.absa.enceladus.utils.error.ErrorMessage import za.co.absa.enceladus.utils.fs.HadoopFsUtils import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary import za.co.absa.spark.commons.utils.explode.ExplosionContext import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements import za.co.absa.commons.lang.extensions.SeqExtension._ +import za.co.absa.spark.commons.errorhandling.ErrorMessage case class DynamicInterpreter()(implicit inputFs: FileSystem) { private val log = LoggerFactory.getLogger(this.getClass) diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala index e11acaa47..7cb3925a1 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala @@ -19,7 +19,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession} import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs} import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule.ConformanceRule -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.utils.ExplodeTools import za.co.absa.spark.commons.utils.explode.ExplosionContext diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala index c92d264bb..75d757d8c 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala @@ -27,8 +27,8 @@ import za.co.absa.enceladus.model.MappingTable import za.co.absa.enceladus.model.conformanceRule.MappingConformanceRule import za.co.absa.enceladus.model.dataFrameFilter.DataFrameFilter import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException -import za.co.absa.enceladus.utils.error.Mapping import za.co.absa.enceladus.utils.validation.ExpressionValidator +import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements import scala.util.Try diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala index 0fb2661a3..735b67ce9 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala @@ -24,9 +24,9 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule} import za.co.absa.enceladus.model.{Dataset => ConfDataset} -import za.co.absa.enceladus.utils.error._ import za.co.absa.enceladus.utils.transformations.ArrayTransformations import za.co.absa.enceladus.utils.udf.ConformanceUDFNames +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays import za.co.absa.spark.commons.sql.functions.col_of_path @@ -52,7 +52,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con val res = handleArrays(rule.outputColumn, withUniqueId) { dfIn => val joined = joinDatasetAndMappingTable(mapTable, dfIn) - val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq + val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.outputColumn), array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*), typedLit(mappings)) @@ -94,7 +94,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con .select($"conf.*", col(s"err.${ErrorMessage.errorColumnName}")).drop(idField) } - private def inclErrorNullArr(mappings: Seq[Mapping], schema: StructType): Column = { + private def inclErrorNullArr(mappings: Seq[ErrorMessage.Mapping], schema: StructType): Column = { val paths = mappings.flatMap { mapping => schema.getAllArraysInPath(mapping.mappedDatasetColumn) } diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala index 0a7b4b4df..ed026b84d 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala @@ -23,7 +23,7 @@ import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule} import za.co.absa.enceladus.model.{Dataset => ConfDataset} import za.co.absa.enceladus.utils.broadcast.{BroadcastUtils, LocalMappingTable} -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.hats.transformations.NestedArrayTransformations import za.co.absa.spark.hats.transformations.NestedArrayTransformations.GetFieldFunction diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala index 2b4d69758..ec9d7312c 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala @@ -23,8 +23,8 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule} import za.co.absa.enceladus.model.{Dataset => ConfDataset} -import za.co.absa.enceladus.utils.error._ import za.co.absa.enceladus.utils.udf.ConformanceUDFNames +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.sql.functions.col_of_path import za.co.absa.spark.commons.utils.explode.ExplosionContext import za.co.absa.spark.commons.utils.{ExplodeTools, SchemaUtils} @@ -46,7 +46,7 @@ case class MappingRuleInterpreterGroupExplode(rule: MappingConformanceRule, val (mapTable, defaultValues) = conformPreparation(df, enableCrossJoin = true) val (explodedDf, expCtx) = explodeIfNeeded(df, explosionState) - val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq + val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.allOutputColumns().keys.mkString(",")), array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*), typedLit(mappings)) diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala deleted file mode 100644 index a50869fa7..000000000 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.enceladus.common.error - -object ErrorMessageFactory { - def errColSchema(nullable: Boolean): String = s"\n |-- errCol: array (nullable = $nullable)\n"+ - " | |-- element: struct (containsNull = false)\n"+ - " | | |-- errType: string (nullable = true)\n"+ - " | | |-- errCode: string (nullable = true)\n"+ - " | | |-- errMsg: string (nullable = true)\n"+ - " | | |-- errCol: string (nullable = true)\n"+ - " | | |-- rawValues: array (nullable = true)\n"+ - " | | | |-- element: string (containsNull = true)\n"+ - " | | |-- mappings: array (nullable = true)\n"+ - " | | | |-- element: struct (containsNull = true)\n"+ - " | | | | |-- mappingTableColumn: string (nullable = true)\n"+ - " | | | | |-- mappedDatasetColumn: string (nullable = true)\n" - - def attachErrColToSchemaPrint(nullable: Boolean, schemaPrint: String): String = { - schemaPrint + errColSchema(nullable) - } -} diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala index 8693c2fb3..2565267fa 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala @@ -25,8 +25,8 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Explosi import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.conformanceRule.ConformanceRule import za.co.absa.enceladus.model.{conformanceRule, Dataset => ConfDataset} -import za.co.absa.enceladus.utils.error.ErrorMessage import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, TZNormalizedSparkTestBase} +import za.co.absa.spark.commons.errorhandling.ErrorMessage case class MyCustomRule( order: Int, diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala index a5540f7db..928eb09c7 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala @@ -18,8 +18,8 @@ package za.co.absa.enceladus.conformance.interpreter.rules.mapping import org.apache.spark.sql.functions.{array, typedLit} import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._ -import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory.{simpleMappingRule, simpleMappingRuleMultipleOutputs, simpleMappingRuleMultipleOutputsWithDefaults, simpleMappingRuleWithDefaultValue} -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._ +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.utils.JsonUtils import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala index 3b3b1d83d..b54fd2dd5 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala @@ -19,10 +19,9 @@ import org.apache.spark.sql.functions._ import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._ import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._ -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.utils.JsonUtils import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements -import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements class MappingRuleBroadcastSuite extends MappingInterpreterSuite { diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala index c3b76ab46..272b228aa 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala @@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples import za.co.absa.enceladus.model.conformanceRule._ import za.co.absa.enceladus.model.{MappingTable, Dataset => ConfDataset} -import za.co.absa.enceladus.utils.error._ +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage case class Outer(order: Int, a: Seq[Inner], myFlag: Boolean) case class OuterErr(order: Int, a: Seq[Inner], myFlag: Boolean, errCol: Seq[ErrorMessage]) @@ -142,7 +143,7 @@ object ArraySamples { ConformedInner2(2, "two", "twoDrop me :)", "Hello world", new ConformedInner3("Hello world"), "myConf", "HELLO WORLD") )), ConformedInner(Seq()), - ConformedInner(null)), Seq(ErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(Mapping("ind", "a.c.d"), Mapping("otherFlag", "myFlag"))))), + ConformedInner(null)), Seq(EnceladusErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(ErrorMessage.Mapping("ind", "a.c.d"), ErrorMessage.Mapping("otherFlag", "myFlag"))))), ConformedOuter(2, Seq(), Seq()), ConformedOuter(3, null, Seq())) } diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala index 5b8dda503..ff4e40545 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala @@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples import za.co.absa.enceladus.model.conformanceRule._ import za.co.absa.enceladus.model.{Dataset, DefaultValue, MappingTable} -import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping} +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage object EmployeeConformance { val countryMT = MappingTable(name = "country", version = 0, hdfsPath = "src/test/testData/country", schemaName = "country", schemaVersion = 0) @@ -65,8 +66,8 @@ object EmployeeConformance { ConformedRole(1), ConformedEmployeeId = "2"), ConformedEmployee(employee_id = 3, name = "John", surname = "Doe3", dept= 3, role = 2, country = "SWE", conformed_country = null, conformed_department = "Unknown dept", conformed_role = "External dev", errCol= List( - ErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(Mapping("country_code", "country"))), - ErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(Mapping("dept_id", "dept"))) + EnceladusErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(ErrorMessage.Mapping("country_code", "country"))), + EnceladusErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(ErrorMessage.Mapping("dept_id", "dept"))) ), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)", ConformedRole(2), ConformedEmployeeId = "3"), ConformedEmployee(employee_id = 4, name = "John", surname = "Doe4", dept= 1, role = 2, country = "IN", conformed_country = "India", conformed_department = "Ingestion Squad", conformed_role = "Ingestion Developer", errCol= List(), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)", diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala index 14e9db954..8d0ffacb8 100644 --- a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala +++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala @@ -26,8 +26,8 @@ import za.co.absa.enceladus.dao.EnceladusDAO import za.co.absa.enceladus.model.Dataset import za.co.absa.enceladus.standardization.config.StandardizationConfig import za.co.absa.enceladus.standardization.fixtures.TempFileFixture -import za.co.absa.enceladus.utils.error.ErrorMessage import za.co.absa.enceladus.utils.testUtils.TZNormalizedSparkTestBase +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.standardization.ValidationException import za.co.absa.standardization.{RecordIdGeneration, Standardization} import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig} @@ -47,6 +47,7 @@ class StandardizationRerunSuite extends FixtureAnyFunSuite with TZNormalizedSpar private val tmpFilePrefix = "test-input-" private val tmpFileSuffix = ".csv" + private val csvContent: String = """101|102|1|2019-05-04|2019-05-04 |201|202|2|2019-05-05|2019-05-05 diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala index 3398d2d85..2e8e2ef81 100644 --- a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala +++ b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala @@ -22,7 +22,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{expr, udf} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{Row, SparkSession} -import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping} +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage object BroadcastUtils { // scalastyle:off null @@ -108,7 +109,7 @@ object BroadcastUtils { */ def getErrorUdf(mappingTable: Broadcast[LocalMappingTable], outputColumns: Seq[String], - mappings: Seq[Mapping])(implicit spark: SparkSession): UserDefinedFunction = { + mappings: Seq[ErrorMessage.Mapping])(implicit spark: SparkSession): UserDefinedFunction = { val numberOfArguments = mappingTable.value.keyTypes.size @@ -117,7 +118,7 @@ object BroadcastUtils { null } else { val strings: Seq[String] = key.map(a => safeToString(a)) - ErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings) + EnceladusErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings) } } val errorMessageType = ScalaReflection.schemaFor[ErrorMessage].dataType diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala similarity index 88% rename from utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala rename to utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala index 3598f4c45..e606ffa94 100644 --- a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala +++ b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala @@ -18,6 +18,7 @@ package za.co.absa.enceladus.utils.error import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import za.co.absa.standardization.config.DefaultErrorCodesConfig +import za.co.absa.spark.commons.errorhandling.ErrorMessage /** * Case class to represent an error message @@ -29,13 +30,13 @@ import za.co.absa.standardization.config.DefaultErrorCodesConfig * @param rawValues - Sequence of raw values (which are the potential culprits of the error) * @param mappings - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column */ -case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq()) -case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String) +//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq()) +//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String) -object ErrorMessage { - val errorColumnName = "errCol" +object EnceladusErrorMessage { +// val errorColumnName = "errCol" - def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]): ErrorMessage = ErrorMessage( + def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage( errType = "confMapError", errCode = ErrorCodes.ConfMapError, errMsg = "Conformance Error - Null produced by mapping conformance rule", diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala index 2deb77912..17f8b5b77 100644 --- a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala +++ b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala @@ -18,8 +18,8 @@ package za.co.absa.enceladus.utils.schema import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Column, DataFrame, SparkSession} -import za.co.absa.enceladus.utils.error.ErrorMessage import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements import za.co.absa.spark.hats.transformations.NestedArrayTransformations diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala index b081770be..188821188 100644 --- a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala +++ b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala @@ -18,7 +18,8 @@ package za.co.absa.enceladus.utils.udf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.api.java.UDF2 import org.apache.spark.sql.types.ArrayType -import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping} +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage import za.co.absa.enceladus.utils.udf.ConformanceUDFNames._ import za.co.absa.spark.commons.OncePerSparkSession @@ -28,20 +29,20 @@ import scala.collection.mutable class ConformanceUDFLibrary()(implicit sparkToRegisterTo: SparkSession) extends OncePerSparkSession { override protected def register(implicit spark: SparkSession): Unit = { - spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]) => - ErrorMessage.confMappingErr(errCol, rawValues, mappings) + spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]) => + EnceladusErrorMessage.confMappingErr(errCol, rawValues, mappings) }) spark.udf.register(confCastErr, { (errCol: String, rawValue: String) => - ErrorMessage.confCastErr(errCol, rawValue) + EnceladusErrorMessage.confCastErr(errCol, rawValue) }) spark.udf.register(confNegErr, { (errCol: String, rawValue: String) => - ErrorMessage.confNegErr(errCol, rawValue) + EnceladusErrorMessage.confNegErr(errCol, rawValue) }) spark.udf.register(confLitErr, { (errCol: String, rawValue: String) => - ErrorMessage.confLitErr(errCol, rawValue) + EnceladusErrorMessage.confLitErr(errCol, rawValue) }) spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala index 2578e8b1b..25c444f81 100644 --- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala +++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala @@ -18,8 +18,9 @@ package za.co.absa.enceladus.utils.broadcast import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, Row} import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.enceladus.utils.error.Mapping +import za.co.absa.enceladus.utils.error.EnceladusErrorMessage import za.co.absa.enceladus.utils.testUtils.{LoggerTestBase, TZNormalizedSparkTestBase} +import za.co.absa.spark.commons.errorhandling.ErrorMessage import scala.collection.mutable @@ -522,7 +523,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit val localMt = LocalMappingTable(dfMt, Seq("id"), Map(""->"val")) val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt) - val mappings = Seq(Mapping("id", "key2")) + val mappings = Seq(ErrorMessage.Mapping("id", "key2")) val errorUdf1 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings) @@ -540,7 +541,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit val localMt = LocalMappingTable(dfMt, Seq("id", "id"), Map(""->"val")) val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt) - val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2")) + val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2")) val errorUdf2 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings) @@ -552,7 +553,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit "3 UDF parameter is used" in { val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id"), Map(""->"val")) val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt) - val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2")) + val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2")) val errorUdf3 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings) @@ -567,7 +568,7 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit "4 UDF parameter is used" in { val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id"), Map(""->"val")) val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt) - val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2")) + val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2")) val errorUdf4 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings) @@ -582,8 +583,8 @@ class BroadcastUtilsSuite extends AnyWordSpec with TZNormalizedSparkTestBase wit "5 UDF parameter is used" in { val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id", "id"), Map(""->"val")) val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt) - val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), - Mapping("id", "key2")) + val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), + ErrorMessage.Mapping("id", "key2")) val errorUdf5 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings) diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala index 7ee4caa65..bbb5c3952 100644 --- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala +++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala @@ -15,7 +15,8 @@ package za.co.absa.enceladus.utils.broadcast -import za.co.absa.enceladus.utils.error.ErrorMessage +import za.co.absa.spark.commons.errorhandling.ErrorMessage + /** This case class is used to extract error column from test dataframes. */ case class ErrorColumn(errCol: ErrorMessage)