From eff8ef0eb7cba084665004ca2b4fb19beabe64eb Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Fri, 24 Feb 2023 11:34:35 +0200
Subject: [PATCH 1/7] Fixes #2170 - Replaced ErrorMessages and Mapping with
 spark-common ErrorMessages and Mapping

---
 pom.xml                                             |  2 +-
 .../enceladus/utils/broadcast/BroadcastUtils.scala  |  7 ++++---
 ...rorMessage.scala => EnceladusErrorMessage.scala} | 11 ++++++-----
 .../co/absa/enceladus/utils/schema/SparkUtils.scala |  2 +-
 .../enceladus/utils/udf/ConformanceUDFLibrary.scala | 13 +++++++------
 5 files changed, 19 insertions(+), 16 deletions(-)
 rename utils/src/main/scala/za/co/absa/enceladus/utils/error/{ErrorMessage.scala => EnceladusErrorMessage.scala} (88%)

diff --git a/pom.xml b/pom.xml
index b6752234b..b14e453ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
 2.4
 6.2.0
 1.1.0
-0.4.0
+0.5.0
 3.9.0
 2.7.3
 3.5.4

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
index 3398d2d85..2e8e2ef81 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtils.scala
@@ -22,7 +22,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{expr, udf}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SparkSession}
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 object BroadcastUtils {
   // scalastyle:off null
@@ -108,7 +109,7 @@
    */
   def getErrorUdf(mappingTable: Broadcast[LocalMappingTable],
                   outputColumns: Seq[String],
-                  mappings: Seq[Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
+                  mappings: Seq[ErrorMessage.Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
 
     val numberOfArguments = mappingTable.value.keyTypes.size
@@ -117,7 +118,7 @@
         null
       } else {
         val strings: Seq[String] = key.map(a => safeToString(a))
-        ErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
+        EnceladusErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
       }
     }
     val errorMessageType = ScalaReflection.schemaFor[ErrorMessage].dataType

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
similarity index 88%
rename from utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala
rename to utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
index 3598f4c45..e606ffa94 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/error/ErrorMessage.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
@@ -18,6 +18,7 @@ package za.co.absa.enceladus.utils.error
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 import za.co.absa.standardization.config.DefaultErrorCodesConfig
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 /**
  * Case class to represent an error message
  *
  * @param errType   - Type or source of the error
  * @param errCode   - Internal error code
  * @param errMsg    - Textual description of the error
  * @param errCol    - The name of the column where the error occurred
  * @param rawValues - Sequence of raw values (which are the potential culprits of the error)
  * @param mappings  - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
  */
-case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
-case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
+//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
+//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
 
-object ErrorMessage {
-  val errorColumnName = "errCol"
+object EnceladusErrorMessage {
+//  val errorColumnName = "errCol"
 
-  def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]): ErrorMessage = ErrorMessage(
+  def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
     errType = "confMapError",
     errCode = ErrorCodes.ConfMapError,
     errMsg = "Conformance Error - Null produced by mapping conformance rule",

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
index 2deb77912..17f8b5b77 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/schema/SparkUtils.scala
@@ -18,8 +18,8 @@ package za.co.absa.enceladus.utils.schema
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 import za.co.absa.spark.hats.transformations.NestedArrayTransformations

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
index b081770be..188821188 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/udf/ConformanceUDFLibrary.scala
@@ -18,7 +18,8 @@ package za.co.absa.enceladus.utils.udf
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.api.java.UDF2
 import org.apache.spark.sql.types.ArrayType
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.enceladus.utils.udf.ConformanceUDFNames._
 import za.co.absa.spark.commons.OncePerSparkSession
 
@@ -28,20 +29,20 @@ class ConformanceUDFLibrary()(implicit sparkToRegisterTo: SparkSession) extends OncePerSparkSession {
 
   override protected def register(implicit spark: SparkSession): Unit = {
-    spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]) =>
-      ErrorMessage.confMappingErr(errCol, rawValues, mappings)
+    spark.udf.register(confMappingErr, { (errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]) =>
+      EnceladusErrorMessage.confMappingErr(errCol, rawValues, mappings)
     })
     spark.udf.register(confCastErr, { (errCol: String, rawValue: String) =>
-      ErrorMessage.confCastErr(errCol, rawValue)
+      EnceladusErrorMessage.confCastErr(errCol, rawValue)
     })
     spark.udf.register(confNegErr, { (errCol: String, rawValue: String) =>
-      ErrorMessage.confNegErr(errCol, rawValue)
+      EnceladusErrorMessage.confNegErr(errCol, rawValue)
     })
     spark.udf.register(confLitErr, { (errCol: String, rawValue: String) =>
-      ErrorMessage.confLitErr(errCol, rawValue)
+      EnceladusErrorMessage.confLitErr(errCol, rawValue)
     })
     spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake
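Taken together, the hunks above change what a conformance error looks like at a call site: the error case classes come from spark-commons, while the Enceladus-specific factory methods keep living on the renamed EnceladusErrorMessage object. A minimal sketch of the new usage, assuming spark-commons 0.5.0 (the version bumped in pom.xml above) on the classpath; the column names and values are illustrative, borrowed from the test samples later in this series:

    import za.co.absa.spark.commons.errorhandling.ErrorMessage
    import za.co.absa.enceladus.utils.error.EnceladusErrorMessage

    // Mapping is now the nested case class on the spark-commons ErrorMessage
    // companion: (mappingTableColumn, mappedDatasetColumn).
    val mapping = ErrorMessage.Mapping("country_code", "country")

    // Same helper name as before the rename, now producing the spark-commons type.
    val err: ErrorMessage =
      EnceladusErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(mapping))
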
From 133bf0abc36b2cedbba6f463068670ad2bd155b0 Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Fri, 24 Feb 2023 14:01:36 +0200
Subject: [PATCH 2/7] Fixes #2170 - Refactored some errorMessages imports

---
 .../builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala | 2 +-
 .../za/co/absa/enceladus/common/ErrorColNormalization.scala | 2 +-
 .../common/performance/PerformanceMetricTools.scala         | 2 +-
 .../conformance/interpreter/DynamicInterpreter.scala        | 5 ++---
 .../interpreter/rules/ArrayCollapseInterpreter.scala        | 2 +-
 .../rules/mapping/CommonMappingRuleInterpreter.scala        | 2 +-
 .../interpreter/rules/mapping/MappingRuleInterpreter.scala  | 6 +++---
 .../rules/mapping/MappingRuleInterpreterBroadcast.scala     | 2 +-
 .../rules/mapping/MappingRuleInterpreterGroupExplode.scala  | 3 ++-
 9 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
index b25ba2237..9c886217e 100644
--- a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
+++ b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
@@ -26,12 +26,12 @@ import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPlugi
 import KafkaErrorSenderPluginImpl._
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
 import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
-import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes
 import za.co.absa.enceladus.utils.modules._
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.abris.avro.functions.to_avro
 import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes
 
 import scala.util.{Failure, Success, Try}

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
index fad916543..c1bd033d0 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/ErrorColNormalization.scala
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.common
 import com.typesafe.config.Config
 import org.apache.spark.sql.DataFrame
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.enceladus.utils.implicits.EnceladusDataFrameImplicits.EnceladusDataframeEnhancements
 
 object ErrorColNormalization {

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
index 878d2f2be..82c013933 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/performance/PerformanceMetricTools.scala
@@ -20,9 +20,9 @@ import org.apache.spark.sql.functions.{col, size, sum}
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.atum.core.Atum
 import za.co.absa.enceladus.utils.config.PathWithFs
-import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.general.ProjectMetadata
 import za.co.absa.enceladus.utils.fs.HadoopFsUtils
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 
 object PerformanceMetricTools extends ProjectMetadata {

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
index cae1ea984..cb3dffb67 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
@@ -28,19 +28,18 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfigParser
 import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
 import za.co.absa.enceladus.conformance.interpreter.rules._
 import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
-import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast,
-  MappingRuleInterpreterGroupExplode}
+import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast, MappingRuleInterpreterGroupExplode}
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule._
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}
 import za.co.absa.enceladus.utils.config.PathWithFs
-import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.fs.HadoopFsUtils
 import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
 import za.co.absa.spark.commons.utils.explode.ExplosionContext
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 import za.co.absa.commons.lang.extensions.SeqExtension._
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 case class DynamicInterpreter()(implicit inputFs: FileSystem) {
   private val log = LoggerFactory.getLogger(this.getClass)

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
index e11acaa47..7cb3925a1 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/ArrayCollapseInterpreter.scala
@@ -19,7 +19,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.utils.ExplodeTools
 import za.co.absa.spark.commons.utils.explode.ExplosionContext

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
index c92d264bb..75d757d8c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/CommonMappingRuleInterpreter.scala
@@ -27,8 +27,8 @@ import za.co.absa.enceladus.model.MappingTable
 import za.co.absa.enceladus.model.conformanceRule.MappingConformanceRule
 import za.co.absa.enceladus.model.dataFrameFilter.DataFrameFilter
 import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException
-import za.co.absa.enceladus.utils.error.Mapping
 import za.co.absa.enceladus.utils.validation.ExpressionValidator
+import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 
 import scala.util.Try

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
index 0fb2661a3..735b67ce9 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreter.scala
@@ -24,9 +24,9 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
 import za.co.absa.enceladus.utils.transformations.ArrayTransformations
 import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
 import za.co.absa.spark.commons.sql.functions.col_of_path
 
@@ -52,7 +52,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con
     val res = handleArrays(rule.outputColumn, withUniqueId) { dfIn =>
       val joined = joinDatasetAndMappingTable(mapTable, dfIn)
-      val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
+      val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
       val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.outputColumn),
         array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
         typedLit(mappings))
@@ -94,7 +94,7 @@
       .select($"conf.*", col(s"err.${ErrorMessage.errorColumnName}")).drop(idField)
   }
 
-  private def inclErrorNullArr(mappings: Seq[Mapping], schema: StructType): Column = {
+  private def inclErrorNullArr(mappings: Seq[ErrorMessage.Mapping], schema: StructType): Column = {
     val paths = mappings.flatMap { mapping =>
       schema.getAllArraysInPath(mapping.mappedDatasetColumn)
     }

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
index 0a7b4b4df..ed026b84d 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterBroadcast.scala
@@ -23,7 +23,7 @@ import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}
 import za.co.absa.enceladus.utils.broadcast.{BroadcastUtils, LocalMappingTable}
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.hats.transformations.NestedArrayTransformations
 import za.co.absa.spark.hats.transformations.NestedArrayTransformations.GetFieldFunction

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
index 2b4d69758..8613b4600 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
@@ -25,6 +25,7 @@ import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConfo
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}
 import za.co.absa.enceladus.utils.error._
 import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.sql.functions.col_of_path
 import za.co.absa.spark.commons.utils.explode.ExplosionContext
 import za.co.absa.spark.commons.utils.{ExplodeTools, SchemaUtils}
 
@@ -46,7 +47,7 @@ case class MappingRuleInterpreterGroupExplode(rule: MappingConformanceRule,
     val (mapTable, defaultValues) = conformPreparation(df, enableCrossJoin = true)
     val (explodedDf, expCtx) = explodeIfNeeded(df, explosionState)
 
-    val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
+    val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
     val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.allOutputColumns().keys.mkString(",")),
       array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
       typedLit(mappings))
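The interpreters in this patch all follow the same pattern: build the Seq of ErrorMessage.Mapping from the rule's attribute mappings and hand it to the registered UDF as a typed literal. A sketch of that pattern in isolation, with illustrative column names; plain col stands in for the spark-commons col_of_path helper used above, and the UDF name string is assumed to match what ConformanceUDFNames.confMappingErr resolves to:

    import org.apache.spark.sql.functions.{array, call_udf, col, lit, typedLit}
    import org.apache.spark.sql.types.StringType
    import za.co.absa.spark.commons.errorhandling.ErrorMessage

    // Illustrative rule mapping: mapping-table column -> dataset column.
    val attributeMappings = Map("dept_id" -> "dept")
    val mappings = attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq

    // typedLit embeds the Seq of case classes as a typed literal column, so the
    // UDF registered by ConformanceUDFLibrary receives Seq[ErrorMessage.Mapping] intact.
    val mappingErrUdfCall = call_udf("confMappingErr", lit("conformed_department"),
      array(attributeMappings.values.toSeq.map(col(_).cast(StringType)): _*),
      typedLit(mappings))
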
From 8d4b3c6a2dcb43c6aa6b7fc8508dfe899f7d362f Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Fri, 24 Feb 2023 14:31:30 +0200
Subject: [PATCH 3/7] Fixes #2170 - Removed irrelevant doc

---
 .../utils/error/EnceladusErrorMessage.scala | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
index e606ffa94..a4e3fcaed 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
@@ -20,21 +20,8 @@ import org.apache.spark.sql.types.StructType
 import za.co.absa.standardization.config.DefaultErrorCodesConfig
 import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
-/**
- * Case class to represent an error message
- *
- * @param errType   - Type or source of the error
- * @param errCode   - Internal error code
- * @param errMsg    - Textual description of the error
- * @param errCol    - The name of the column where the error occurred
- * @param rawValues - Sequence of raw values (which are the potential culprits of the error)
- * @param mappings  - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
- */
-//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
-//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
 
 object EnceladusErrorMessage {
-//  val errorColumnName = "errCol"
 
   def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
     errType = "confMapError",

From af390cb2723c075856bd661f3b7b084e86ed24b1 Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Thu, 2 Mar 2023 11:27:00 +0200
Subject: [PATCH 4/7] Revert "Fixes #2170 - Removed irrelevant doc"

This reverts commit 8d4b3c6a2dcb43c6aa6b7fc8508dfe899f7d362f.
---
 .../utils/error/EnceladusErrorMessage.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
index a4e3fcaed..e606ffa94 100644
--- a/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
+++ b/utils/src/main/scala/za/co/absa/enceladus/utils/error/EnceladusErrorMessage.scala
@@ -20,8 +20,21 @@ import org.apache.spark.sql.types.StructType
 import za.co.absa.standardization.config.DefaultErrorCodesConfig
 import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
+/**
+ * Case class to represent an error message
+ *
+ * @param errType   - Type or source of the error
+ * @param errCode   - Internal error code
+ * @param errMsg    - Textual description of the error
+ * @param errCol    - The name of the column where the error occurred
+ * @param rawValues - Sequence of raw values (which are the potential culprits of the error)
+ * @param mappings  - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
+ */
+//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
+//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
 
 object EnceladusErrorMessage {
+//  val errorColumnName = "errCol"
 
   def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
     errType = "confMapError",
From cfd669bf6f294ec73505d6140397f2b8cf7d092e Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Thu, 2 Mar 2023 14:43:50 +0200
Subject: [PATCH 5/7] Removed ErrorMessageFactory

---
 .../common/error/ErrorMessageFactory.scala    | 35 -------------------
 .../rules/custom/CustomRuleSuite.scala        |  2 +-
 .../mapping/MappingGroupExplodeSuite.scala    |  4 +--
 .../mapping/MappingRuleBroadcastSuite.scala   |  3 +-
 .../conformance/samples/ArraySamples.scala    |  5 +--
 .../samples/EmployeeConformance.scala         |  7 ++--
 .../StandardizationRerunSuite.scala           |  2 +-
 .../utils/broadcast/BroadcastUtilsSuite.scala | 15 ++++----
 .../utils/broadcast/ErrorColumn.scala         |  3 +-
 9 files changed, 22 insertions(+), 54 deletions(-)
 delete mode 100644 spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala
deleted file mode 100644
index a50869fa7..000000000
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/common/error/ErrorMessageFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2018 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.enceladus.common.error
-
-object ErrorMessageFactory {
-  def errColSchema(nullable: Boolean): String = s"\n |-- errCol: array (nullable = $nullable)\n"+
-    " | |-- element: struct (containsNull = false)\n"+
-    " | | |-- errType: string (nullable = true)\n"+
-    " | | |-- errCode: string (nullable = true)\n"+
-    " | | |-- errMsg: string (nullable = true)\n"+
-    " | | |-- errCol: string (nullable = true)\n"+
-    " | | |-- rawValues: array (nullable = true)\n"+
-    " | | | |-- element: string (containsNull = true)\n"+
-    " | | |-- mappings: array (nullable = true)\n"+
-    " | | | |-- element: struct (containsNull = true)\n"+
-    " | | | | |-- mappingTableColumn: string (nullable = true)\n"+
-    " | | | | |-- mappedDatasetColumn: string (nullable = true)\n"
-
-  def attachErrColToSchemaPrint(nullable: Boolean, schemaPrint: String): String = {
-    schemaPrint + errColSchema(nullable)
-  }
-}

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
index b6995a4d3..797f24ed6 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/custom/CustomRuleSuite.scala
@@ -25,8 +25,8 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Explosi
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
 import za.co.absa.enceladus.model.{conformanceRule, Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, TZNormalizedSparkTestBase}
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 case class MyCustomRule(
   order: Int,

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
index a5540f7db..928eb09c7 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingGroupExplodeSuite.scala
@@ -18,8 +18,8 @@ package za.co.absa.enceladus.conformance.interpreter.rules.mapping
 import org.apache.spark.sql.functions.{array, typedLit}
 import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
 import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
-import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory.{simpleMappingRule, simpleMappingRuleMultipleOutputs, simpleMappingRuleMultipleOutputsWithDefaults, simpleMappingRuleWithDefaultValue}
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.utils.JsonUtils
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
index 3b3b1d83d..b54fd2dd5 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleBroadcastSuite.scala
@@ -19,10 +19,9 @@ import org.apache.spark.sql.functions._
 import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
 import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
 import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.utils.JsonUtils
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
-import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 
 class MappingRuleBroadcastSuite extends MappingInterpreterSuite {

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
index c3b76ab46..272b228aa 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/ArraySamples.scala
@@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples
 import za.co.absa.enceladus.model.conformanceRule._
 import za.co.absa.enceladus.model.{MappingTable, Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 case class Outer(order: Int, a: Seq[Inner], myFlag: Boolean)
 case class OuterErr(order: Int, a: Seq[Inner], myFlag: Boolean, errCol: Seq[ErrorMessage])
 
@@ -142,7 +143,7 @@
         ConformedInner2(2, "two", "twoDrop me :)", "Hello world", new ConformedInner3("Hello world"), "myConf", "HELLO WORLD")
       )),
       ConformedInner(Seq()),
-      ConformedInner(null)), Seq(ErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(Mapping("ind", "a.c.d"), Mapping("otherFlag", "myFlag"))))),
+      ConformedInner(null)), Seq(EnceladusErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(ErrorMessage.Mapping("ind", "a.c.d"), ErrorMessage.Mapping("otherFlag", "myFlag"))))),
     ConformedOuter(2, Seq(), Seq()),
     ConformedOuter(3, null, Seq()))
 }

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
index 5b8dda503..ff4e40545 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/conformance/samples/EmployeeConformance.scala
@@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples
 import za.co.absa.enceladus.model.conformanceRule._
 import za.co.absa.enceladus.model.{Dataset, DefaultValue, MappingTable}
-import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 object EmployeeConformance {
   val countryMT = MappingTable(name = "country", version = 0, hdfsPath = "src/test/testData/country", schemaName = "country", schemaVersion = 0)
 
@@ -65,8 +66,8 @@
       ConformedRole(1), ConformedEmployeeId = "2"),
     ConformedEmployee(employee_id = 3, name = "John", surname = "Doe3", dept= 3, role = 2, country = "SWE", conformed_country = null, conformed_department = "Unknown dept", conformed_role = "External dev", errCol= List(
-      ErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(Mapping("country_code", "country"))),
-      ErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(Mapping("dept_id", "dept")))
+      EnceladusErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(ErrorMessage.Mapping("country_code", "country"))),
+      EnceladusErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(ErrorMessage.Mapping("dept_id", "dept")))
     ), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)",
       ConformedRole(2), ConformedEmployeeId = "3"),
     ConformedEmployee(employee_id = 4, name = "John", surname = "Doe4", dept= 1, role = 2, country = "IN", conformed_country = "India", conformed_department = "Ingestion Squad", conformed_role = "Ingestion Developer", errCol= List(), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)",

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
index 14e9db954..bcba5c00e 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
@@ -26,8 +26,8 @@ import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.Dataset
 import za.co.absa.enceladus.standardization.config.StandardizationConfig
 import za.co.absa.enceladus.standardization.fixtures.TempFileFixture
-import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.testUtils.TZNormalizedSparkTestBase
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.standardization.ValidationException
 import za.co.absa.standardization.{RecordIdGeneration, Standardization}
 import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig}

diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
index 2578e8b1b..25c444f81 100644
--- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
+++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/BroadcastUtilsSuite.scala
@@ -18,8 +18,9 @@ package za.co.absa.enceladus.utils.broadcast
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, Row}
 import org.scalatest.wordspec.AnyWordSpec
-import za.co.absa.enceladus.utils.error.Mapping
+import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
 import za.co.absa.enceladus.utils.testUtils.{LoggerTestBase, TZNormalizedSparkTestBase}
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
 
 import scala.collection.mutable
 
@@ -522,7 +523,7 @@
       val localMt = LocalMappingTable(dfMt, Seq("id"), Map(""->"val"))
       val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
-      val mappings = Seq(Mapping("id", "key2"))
+      val mappings = Seq(ErrorMessage.Mapping("id", "key2"))
 
       val errorUdf1 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
 
@@ -540,7 +541,7 @@
       val localMt = LocalMappingTable(dfMt, Seq("id", "id"), Map(""->"val"))
       val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
-      val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"))
+      val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
 
       val errorUdf2 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
 
@@ -552,7 +553,7 @@
     "3 UDF parameter is used" in {
       val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id"), Map(""->"val"))
       val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
-      val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"))
+      val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
 
       val errorUdf3 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
 
@@ -567,7 +568,7 @@
     "4 UDF parameter is used" in {
       val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id"), Map(""->"val"))
       val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
-      val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"))
+      val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"))
 
       val errorUdf4 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)
 
@@ -582,8 +583,8 @@
     "5 UDF parameter is used" in {
       val localMt = LocalMappingTable(dfMt, Seq("id", "id", "id", "id", "id"), Map(""->"val"))
       val broadcastedMt = BroadcastUtils.broadcastMappingTable(localMt)
-      val mappings = Seq(Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"), Mapping("id", "key2"),
-        Mapping("id", "key2"))
+      val mappings = Seq(ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"), ErrorMessage.Mapping("id", "key2"),
+        ErrorMessage.Mapping("id", "key2"))
 
       val errorUdf5 = BroadcastUtils.getErrorUdf(broadcastedMt, Seq("val"), mappings)

diff --git a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
index 7ee4caa65..bbb5c3952 100644
--- a/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
+++ b/utils/src/test/scala/za/co/absa/enceladus/utils/broadcast/ErrorColumn.scala
@@ -15,7 +15,8 @@
 
 package za.co.absa.enceladus.utils.broadcast
 
-import za.co.absa.enceladus.utils.error.ErrorMessage
+import za.co.absa.spark.commons.errorhandling.ErrorMessage
+
 
 /** This case class is used to extract error column from test dataframes. */
 case class ErrorColumn(errCol: ErrorMessage)
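With the hard-coded errCol schema string gone, a test can derive the expected error-column type from the case class itself, the same way BroadcastUtils does via ScalaReflection in patch 1. A hedged sketch of that alternative, not code from this series; containsNull = false mirrors the removed factory's schema string:

    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.types.{ArrayType, StructType}
    import za.co.absa.spark.commons.errorhandling.ErrorMessage

    // Derive the struct type of a single error entry from the case class,
    // instead of maintaining a printed-schema string by hand in test code.
    val errorEntryType = ScalaReflection.schemaFor[ErrorMessage].dataType.asInstanceOf[StructType]
    val errColType = ArrayType(errorEntryType, containsNull = false)
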
From f70cf45522e8e1472dbb5e3f03bb0acabb928df3 Mon Sep 17 00:00:00 2001
From: AB019TC
Date: Thu, 2 Mar 2023 15:49:38 +0200
Subject: [PATCH 6/7] Fixes #2170 - Removed unused imports and refactored
 complex imports

---
 .../builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala   | 4 ++--
 .../conformance/interpreter/DynamicInterpreter.scala          | 2 +-
 .../rules/mapping/MappingRuleInterpreterGroupExplode.scala    | 1 -
 .../enceladus/standardization/StandardizationRerunSuite.scala | 1 +
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
index 9c886217e..dec2ef929 100644
--- a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
+++ b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.functions.{col, explode, lit, size, struct, typedLit
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, Encoder, Encoders}
 import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor
-import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams, SchemaRegistrySecurityParams}
+import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams}
 import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginImpl.SingleErrorStardardized
 import KafkaErrorSenderPluginImpl._
@@ -29,7 +29,7 @@ import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPlugin
 import za.co.absa.enceladus.utils.modules._
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.abris.avro.functions.to_avro
-import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
+import za.co.absa.abris.config.{ToAvroConfig}
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas}
 import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
index cb3dffb67..89754781c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/DynamicInterpreter.scala
@@ -28,7 +28,7 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfigParser
 import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
 import za.co.absa.enceladus.conformance.interpreter.rules._
 import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
-import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast, MappingRuleInterpreterGroupExplode}
+import za.co.absa.enceladus.conformance.interpreter.rules.mapping._
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule._
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
index 8613b4600..ec9d7312c 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/interpreter/rules/mapping/MappingRuleInterpreterGroupExplode.scala
@@ -23,7 +23,6 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
 import za.co.absa.enceladus.dao.EnceladusDAO
 import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
 import za.co.absa.enceladus.model.{Dataset => ConfDataset}
-import za.co.absa.enceladus.utils.error._
 import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
 import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.sql.functions.col_of_path

diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
index bcba5c00e..8d0ffacb8 100644
--- a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationRerunSuite.scala
@@ -47,6 +47,7 @@ class StandardizationRerunSuite extends FixtureAnyFunSuite with TZNormalizedSpar
 
   private val tmpFilePrefix = "test-input-"
   private val tmpFileSuffix = ".csv"
+
   private val csvContent: String =
     """101|102|1|2019-05-04|2019-05-04
       |201|202|2|2019-05-05|2019-05-05
From e2e7ddbab2083cd62c4f187665628174542c8419 Mon Sep 17 00:00:00 2001
From: TebaleloS <107194332+TebaleloS@users.noreply.github.com>
Date: Fri, 3 Mar 2023 12:42:28 +0200
Subject: [PATCH 7/7] Update
 plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala

Co-authored-by: Ladislav Sulak
---
 .../builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
index dec2ef929..5e2354d07 100644
--- a/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
+++ b/plugins-builtin/src/main/scala/za/co/absa/enceladus/plugins/builtin/errorsender/mq/KafkaErrorSenderPluginImpl.scala
@@ -29,7 +29,7 @@ import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPlugin
 import za.co.absa.enceladus.utils.modules._
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.abris.avro.functions.to_avro
-import za.co.absa.abris.config.{ToAvroConfig}
+import za.co.absa.abris.config.ToAvroConfig
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas}
 import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes