From aaedd6a0d0567ba208483986b4a6229a46739355 Mon Sep 17 00:00:00 2001 From: blaval Date: Fri, 21 May 2021 13:16:22 +0100 Subject: [PATCH 1/4] create a DfApiQcResultsRepository that can contact the API --- build.sbt | 4 +- project/Dependencies.scala | 2 + .../repository/DfApiQcResultsRepository.scala | 53 ++++++++++++ .../dataflare/generators/Generators.scala | 39 ++++++++- .../DfApiQcResultsRepositoryTest.scala | 86 +++++++++++++++++++ 5 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala create mode 100644 src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala diff --git a/build.sbt b/build.sbt index 75c316b..4e17469 100644 --- a/build.sbt +++ b/build.sbt @@ -38,7 +38,9 @@ lazy val root = (project in file(".")) scalacheck, scalacheckToolboxDatetime, scalacheckToolboxMagic, - scalacheckToolboxCombinators + scalacheckToolboxCombinators, + sttp, + asyncSttp ), fork in Test := true, parallelExecution in Test := false, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 594719c..6d8fabd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,4 +22,6 @@ object Dependencies { lazy val scalacheckToolboxMagic = "com.47deg" %% "scalacheck-toolbox-magic" % "0.3.5" % Test lazy val scalacheckToolboxCombinators = "com.47deg" %% "scalacheck-toolbox-combinators" % "0.3.5" % Test lazy val spire = "org.typelevel" %% "spire" % "0.14.1" + lazy val sttp = "com.softwaremill.sttp.client3" %% "core" % "3.3.4" + lazy val asyncSttp = "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.3.4" } diff --git a/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala new file mode 100644 index 0000000..aabc9a7 --- /dev/null +++ 
b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala @@ -0,0 +1,53 @@ +package com.github.timgent.dataflare.repository + +import cats.implicits._ +import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult +import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder} +import io.circe.parser._ +import io.circe.syntax._ +import sttp.client3._ +import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend +import sttp.model.Uri + +import scala.concurrent.{ExecutionContext, Future} +class DfApiQcResultsRepository(host: Uri)(implicit + ec: ExecutionContext +) extends QcResultsRepository { + + private lazy val backend = AsyncHttpClientFutureBackend() + + /** + * Save Quality Check results to some repository + * + * @param qcResults A list of results + * @return A Future of Unit + */ + override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = { + qcResults.traverse(qcResult => + basicRequest + .contentType("application/json") + .body(qcResult.asJson.noSpaces) + .post(host.addPath("qcresults")) + .send(backend) + .map(_ => ()) + ) + }.map(_ => ()) + + /** + * Load all check results in the repository + * + * @return + */ + override def loadAll: Future[List[ChecksSuiteResult]] = + basicRequest + .contentType("application/json") + .get(host.addPath("qcresults")) + .send(backend) + .map { r => + val o: Either[String, String] = r.body + val p = parse(o.right.get).right + val pp = p.get.as[List[ChecksSuiteResult]] + val ppp = pp.right.get + ppp + } +} diff --git a/src/test/scala/com/github/timgent/dataflare/generators/Generators.scala b/src/test/scala/com/github/timgent/dataflare/generators/Generators.scala index 6d1604d..e922c8a 100644 --- a/src/test/scala/com/github/timgent/dataflare/generators/Generators.scala +++ b/src/test/scala/com/github/timgent/dataflare/generators/Generators.scala @@ -1,8 +1,15 @@ package com.github.timgent.dataflare.generators +import 
com.fortysevendeg.scalacheck.datetime.jdk8.ArbitraryJdk8.arbInstantJdk8 +import com.github.timgent.dataflare.checks.CheckDescription.{DualMetricCheckDescription, SimpleCheckDescription, SingleMetricCheckDescription} +import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, OtherDsDescription, SingleDsDescription} +import com.github.timgent.dataflare.checks._ +import com.github.timgent.dataflare.checkssuite.{CheckSuiteStatus, ChecksSuiteResult} import com.github.timgent.dataflare.metrics.SimpleMetricDescriptor -import org.scalacheck.Arbitrary import org.scalacheck.Arbitrary.arbitrary +import org.scalacheck.{Arbitrary, Gen} + +import java.time.Instant object Generators { implicit val arbSimpleMetricDescriptor: Arbitrary[SimpleMetricDescriptor] = Arbitrary(for { @@ -12,4 +19,34 @@ object Generators { onColumns <- arbitrary[Option[List[String]]] onColumn <- arbitrary[Option[String]] } yield SimpleMetricDescriptor(metricName, filterDescription, complianceDescription, onColumns, onColumn)) + implicit val checkSuiteStatusArb = Arbitrary(Gen.oneOf(CheckSuiteStatus.values)) + implicit val checkStatusArb = Arbitrary(Gen.oneOf(CheckStatus.values)) + implicit val simpleCheckDescriptionArb = Arbitrary(Gen.resultOf(SimpleCheckDescription)) + implicit val dualMetricCheckDescriptionArb = Arbitrary(Gen.resultOf(DualMetricCheckDescription)) + implicit val singleMetricCheckDescriptionArb = Arbitrary(Gen.resultOf(SingleMetricCheckDescription)) + implicit val checkDescriptionArb: Arbitrary[CheckDescription] = Arbitrary( + Gen.oneOf(arbitrary[SimpleCheckDescription], arbitrary[DualMetricCheckDescription], arbitrary[SingleMetricCheckDescription]) + ) + implicit val qcTypeArb = Arbitrary(Gen.oneOf(QcType.values)) + implicit val singleDsDescriptionArb = Arbitrary(Gen.resultOf(SingleDsDescription)) + implicit val dualDsDescriptionArb = Arbitrary(Gen.resultOf(DualDsDescription)) + implicit val otherDsDescriptionArb = Arbitrary(Gen.resultOf(OtherDsDescription)) + 
implicit val datasourceDescriptionArb: Arbitrary[Option[DatasourceDescription]] = + Arbitrary(Gen.option(Gen.oneOf(arbitrary[SingleDsDescription], arbitrary[DualDsDescription], arbitrary[OtherDsDescription]))) + implicit val checkResultArb = Arbitrary(for { + qcType <- arbitrary[QcType] + status <- arbitrary[CheckStatus] + resultDescription <- arbitrary[String] + checkDescription <- arbitrary[CheckDescription] + datasourceDescription <- arbitrary[Option[DatasourceDescription]] + } yield CheckResult(qcType, status, resultDescription, checkDescription, datasourceDescription)) + val checksSuiteResultGen: Gen[ChecksSuiteResult] = for { + overallStatus <- arbitrary[CheckSuiteStatus] + checkSuiteDescription <- arbitrary[String] + checkResults <- arbitrary[Seq[CheckResult]] + timestamp <- arbitrary[Instant] + checkTags <- arbitrary[Map[String, String]] + } yield ChecksSuiteResult(overallStatus, checkSuiteDescription, checkResults, timestamp, checkTags) + implicit val checksSuiteResultArb: Arbitrary[ChecksSuiteResult] = Arbitrary(checksSuiteResultGen) + } diff --git a/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala b/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala new file mode 100644 index 0000000..3542826 --- /dev/null +++ b/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala @@ -0,0 +1,86 @@ +package com.github.timgent.dataflare.repository + +import com.github.timgent.dataflare.checks.CheckDescription.SimpleCheckDescription +import com.github.timgent.dataflare.checks.QcType.{ArbDualDsCheck, ArbSingleDsCheck} +import com.github.timgent.dataflare.checks._ +import com.github.timgent.dataflare.checkssuite.CheckSuiteStatus.{Error, Success} +import com.github.timgent.dataflare.checkssuite.{CheckSuiteStatus, ChecksSuiteResult} +import com.github.timgent.dataflare.utils.CommonFixtures._ +import org.scalatest.concurrent.Eventually +import 
org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks +import sttp.client3._ + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class DfApiQcResultsRepositoryTest + extends AsyncWordSpec + with Matchers + with Eventually + with EsTestUtils + with ScalaCheckDrivenPropertyChecks { + + "DfApiResultsRepository.save" should { + def generateRawCheckResult(qcType: QcType, suffix: String, status: CheckStatus) = + CheckResult(qcType, status, s"checkResult$suffix", SimpleCheckDescription(s"checkDescription$suffix")) + + val uri = uri"http://127.0.0.1:8080" + implicit val patienceConfig: PatienceConfig = PatienceConfig(5 seconds, 1 second) + + "post check suite results to the API" in { + val repo: DfApiQcResultsRepository = + new DfApiQcResultsRepository(uri) + + val checkResultA1 = + generateRawCheckResult(ArbSingleDsCheck, "A1", CheckStatus.Success) + val checkResultA2 = + generateRawCheckResult(ArbSingleDsCheck, "A2", CheckStatus.Success) + val checkResultB1 = + generateRawCheckResult(ArbDualDsCheck, "B1", CheckStatus.Error) + val checkResultB2 = + generateRawCheckResult(ArbDualDsCheck, "B2", CheckStatus.Error) + val checkResultB1Success = + generateRawCheckResult(ArbDualDsCheck, "B1", CheckStatus.Error) + val checkResultB2Success = + generateRawCheckResult(ArbDualDsCheck, "B2", CheckStatus.Error) + val initialResultsToInsert: List[ChecksSuiteResult] = List( + ChecksSuiteResult( + Success, + "checkSuiteA", + Seq(checkResultA1, checkResultA2), + now, + someTags + ), + ChecksSuiteResult( + Error, + "checkSuiteB", + Seq(checkResultB1, checkResultB2), + now, + someTags + ) + ) + val moreResultsToInsert: List[ChecksSuiteResult] = List( + ChecksSuiteResult( + CheckSuiteStatus.Success, + "checkSuiteB", + Seq(checkResultB1Success, checkResultB2Success), + now.plusSeconds(10), + someTags + ) + ) + + def storedResultsFut(): Future[List[ChecksSuiteResult]] = 
repo.loadAll + + for { + _ <- repo.save(initialResultsToInsert) + _ <- checkStoredResultsAre(storedResultsFut, initialResultsToInsert) + _ <- repo.save(moreResultsToInsert) + finalAssertion <- checkStoredResultsAre(storedResultsFut, initialResultsToInsert ++ moreResultsToInsert) + } yield { + finalAssertion + } + } + } +} From 75cc6ce6853c5c94262c248007920305fd3c65e7 Mon Sep 17 00:00:00 2001 From: pbroda Date: Fri, 21 May 2021 14:53:01 +0200 Subject: [PATCH 2/4] updated example --- .../timgent/dataflare/examples/Example.scala | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/examples/Example.scala b/src/main/scala/com/github/timgent/dataflare/examples/Example.scala index 38bb556..59a8593 100644 --- a/src/main/scala/com/github/timgent/dataflare/examples/Example.scala +++ b/src/main/scala/com/github/timgent/dataflare/examples/Example.scala @@ -1,25 +1,25 @@ package com.github.timgent.dataflare.examples -import java.time.{LocalDateTime, ZoneOffset} - import cats.implicits._ import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} import com.github.timgent.dataflare.checks.{ArbSingleDsCheck, CheckStatus, RawCheckResult} import com.github.timgent.dataflare.checkssuite._ -import com.github.timgent.dataflare.examples.Day1Checks.qcResults import com.github.timgent.dataflare.examples.ExampleHelpers.{Customer, Order, _} import com.github.timgent.dataflare.metrics.MetricDescriptor.{CountDistinctValuesMetric, SizeMetric} -import com.github.timgent.dataflare.metrics.{ComplianceFn, MetricComparator, MetricFilter} -import com.github.timgent.dataflare.repository.{ElasticSearchMetricsPersister, ElasticSearchQcResultsRepository} +import com.github.timgent.dataflare.metrics.{ComplianceFn, MetricComparator} +import com.github.timgent.dataflare.repository.{DfApiQcResultsRepository, ElasticSearchMetricsPersister, ElasticSearchQcResultsRepository} import 
com.github.timgent.dataflare.thresholds.AbsoluteThreshold import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ +import sttp.client3.UriContext +import java.time.{LocalDateTime, ZoneOffset} import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ +import scala.sys.exit object ExampleHelpers { val sparkConf = new SparkConf().setAppName("SparkDataQualityExample").setMaster("local") @@ -131,6 +131,7 @@ object Helpers { ElasticSearchQcResultsRepository(List("http://127.0.0.1:9200"), "orders_qc_results") val esMetricsPersister = ElasticSearchMetricsPersister(List("http://127.0.0.1:9200"), "order_metrics") + val apiQcResultsRepository = new DfApiQcResultsRepository(uri"http://127.0.0.1:8080") def getCheckSuite( orderDs: DescribedDs, @@ -185,7 +186,7 @@ object Helpers { singleDsChecks = singleDsChecks |+| Map(customerDs -> List(expectedCustomerColumnsCheck)), dualDsChecks = dualDsMetricChecks, metricsPersister = esMetricsPersister, - qcResultsRepository = qcResultsRepository + qcResultsRepository = apiQcResultsRepository ) checksSuite @@ -199,12 +200,14 @@ object Day1Checks extends App { val checksSuite = Helpers.getCheckSuite(orderDs, customerDs, customersWithOrdersDs) val allQcResultsFuture = checksSuite.run(monday) - val qcResults = Await.result(allQcResultsFuture, 10 seconds) + val allQcResults = Await.result(allQcResultsFuture, 10 seconds) - if (qcResults.overallStatus == CheckSuiteStatus.Success) + if (allQcResults.overallStatus == CheckSuiteStatus.Success) println("All checks completed successfully!!") else println("Checks failed :(") + + exit(0) } object Day2Checks extends App { @@ -216,10 +219,12 @@ object Day2Checks extends App { val allQcResultsFuture = checksSuite.run(tuesday) val allQcResults = 
Await.result(allQcResultsFuture, 10 seconds) - if (qcResults.overallStatus == CheckSuiteStatus.Success) + if (allQcResults.overallStatus == CheckSuiteStatus.Success) println("All checks completed successfully!!") else println("Checks failed :(") + + exit(0) } object Day3Checks extends App { @@ -231,8 +236,10 @@ object Day3Checks extends App { val allQcResultsFuture = checksSuite.run(wednesday) val allQcResults = Await.result(allQcResultsFuture, 10 seconds) - if (qcResults.overallStatus == CheckSuiteStatus.Success) + if (allQcResults.overallStatus == CheckSuiteStatus.Success) println("All checks completed successfully!!") else println("Checks failed :(") + + exit(0) } From 4e670ac1f93d46e3ceae51ce7bbdac05b6504552 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Wed, 7 Jul 2021 11:52:54 +0100 Subject: [PATCH 3/4] Surface any errors when saving QC Results --- .../dataflare/checkssuite/ChecksSuite.scala | 39 ++++++++++++++-- .../repository/DfApiQcResultsRepository.scala | 17 ++++--- .../ElasticSearchQcResultsRepository.scala | 14 +++--- .../repository/QcResultsRepository.scala | 45 ++++++++++++++++--- 4 files changed, 94 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index 2ca1ae3..fa2b58d 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -37,6 +37,10 @@ case class DescribedDsPair(ds: DescribedDs, dsToCompare: DescribedDs) { private[dataflare] def rawDatasetPair = DatasetPair(ds.ds, dsToCompare.ds) } +trait ChecksSuiteErr { + def throwErr: Nothing +} + /** * Main entry point which contains the suite of checks you want to perform * @param checkSuiteDescription - description of the check suite @@ -91,9 +95,22 @@ case class ChecksSuite( * @param ec - execution context * @return */ - def runBlocking(timestamp: 
Instant, timeout: Duration = 1 minute)(implicit ec: ExecutionContext) = + @deprecated("Will be replaced by runBlockingV2 which surfaces errors", "July 2021") + def runBlocking(timestamp: Instant, timeout: Duration = 1 minute)(implicit ec: ExecutionContext): ChecksSuiteResult = Await.result(run(timestamp), timeout) + /** + * Run all checks in the ChecksSuite and waits for computations to finish before returning (blocking the thread) + * + * @param timestamp - time the checks are being run + * @param ec - execution context + * @return either an error or the ChecksSuiteResult + */ + def runBlockingV2(timestamp: Instant, timeout: Duration = 1 minute)(implicit + ec: ExecutionContext + ): Either[ChecksSuiteErr, ChecksSuiteResult] = + Await.result(runV2(timestamp), timeout) + /** * Run all checks in the ChecksSuite asynchronously, returning a Future * @@ -101,7 +118,23 @@ case class ChecksSuite( * @param ec - execution context * @return */ + @deprecated("Will be replaced by runV2 which surfaces errors in the return type", "July 2021") def run(timestamp: Instant)(implicit ec: ExecutionContext): Future[ChecksSuiteResult] = { + runV2(timestamp).map { + case Left(err) => err.throwErr + case Right(checkSuiteResult) => checkSuiteResult + } + } + + /** + * Run all checks in the ChecksSuite asynchronously, returning a Future with either an + * error or the ChecksSuiteResult + * + * @param timestamp - time the checks are being run + * @param ec - execution context + * @return + */ + def runV2(timestamp: Instant)(implicit ec: ExecutionContext): Future[Either[ChecksSuiteErr, ChecksSuiteResult]] = { val metricBasedCheckResultsFut: Future[Seq[CheckResult]] = runMetricBasedChecks(timestamp) val singleDatasetCheckResults: Seq[CheckResult] = for { (dds, checks) <- arbSingleDsChecks.toSeq @@ -127,9 +160,9 @@ case class ChecksSuite( timestamp = timestamp, tags ) - _ <- qcResultsRepository.save(checkSuiteResult) + maybeSavedCheckSuiteResult <- qcResultsRepository.saveV2(checkSuiteResult) }
yield { - checkSuiteResult + maybeSavedCheckSuiteResult } } diff --git a/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala index aabc9a7..79eea39 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala @@ -3,6 +3,7 @@ package com.github.timgent.dataflare.repository import cats.implicits._ import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder} +import com.github.timgent.dataflare.repository.QcResultsRepoErr.SaveQcResultErr import io.circe.parser._ import io.circe.syntax._ import sttp.client3._ @@ -10,9 +11,7 @@ import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend import sttp.model.Uri import scala.concurrent.{ExecutionContext, Future} -class DfApiQcResultsRepository(host: Uri)(implicit - ec: ExecutionContext -) extends QcResultsRepository { +class DfApiQcResultsRepository(host: Uri)(implicit val ec: ExecutionContext) extends QcResultsRepository { private lazy val backend = AsyncHttpClientFutureBackend() @@ -22,16 +21,22 @@ class DfApiQcResultsRepository(host: Uri)(implicit * @param qcResults A list of results * @return A Future of Unit */ - override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = { + override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = { qcResults.traverse(qcResult => basicRequest .contentType("application/json") .body(qcResult.asJson.noSpaces) .post(host.addPath("qcresults")) .send(backend) - .map(_ => ()) + .map { res => + val mapped: Either[SaveQcResultErr, ChecksSuiteResult] = res.body match { + case Left(err) => Left(SaveQcResultErr(err)) + case Right(_) => Right(qcResult) + } 
+ mapped + } ) - }.map(_ => ()) + } /** * Load all check results in the repository diff --git a/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala index cc9014c..74e0fec 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala @@ -2,10 +2,11 @@ package com.github.timgent.dataflare.repository import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder} +import com.github.timgent.dataflare.repository.QcResultsRepoErr.SaveQcResultErr import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.http.JavaClient -import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index} +import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index, RequestFailure, RequestSuccess} import scala.concurrent.{ExecutionContext, Future} @@ -15,17 +16,18 @@ import scala.concurrent.{ExecutionContext, Future} * @param index - the name of the index to save QC results to * @param ec - the execution context */ -class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(implicit - ec: ExecutionContext -) extends QcResultsRepository { - override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = { +class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(implicit val ec: ExecutionContext) extends QcResultsRepository { + override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = { client .execute { bulk( qcResults.map(indexInto(index).doc(_)) ) } - .map(_ => {}) + .map { + case RequestSuccess(status, body, headers, result) => 
qcResults.map(Right(_)) + case RequestFailure(status, body, headers, error) => List(Left(SaveQcResultErr(error.reason))) + } } override def loadAll: Future[List[ChecksSuiteResult]] = { diff --git a/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala index c60ad54..b8344d1 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala @@ -1,19 +1,32 @@ package com.github.timgent.dataflare.repository -import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult +import com.github.timgent.dataflare.checkssuite.{ChecksSuiteErr, ChecksSuiteResult} import scala.collection.mutable.ListBuffer -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} trait QcResultsRepository { + implicit def ec: ExecutionContext + + /** + * Save Quality Check results to some repository. 
Will replace save method over time + * @param qcResults A list of results + * @return A Future of a list of either an error or the ChecksSuiteResult that was saved + */ + def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] /** * Save Quality Check results to some repository * @param qcResults A list of results * @return A Future of Unit */ - def save(qcResults: List[ChecksSuiteResult]): Future[Unit] + @deprecated("will be replaced by saveV2 method which has better error handling", "Jul 2021") + def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = saveV2(qcResults).map(_ => ()) + def saveV2(qcResult: ChecksSuiteResult): Future[Either[QcResultsRepoErr, ChecksSuiteResult]] = + saveV2(List(qcResult)).map(_.head) + + @deprecated("will be replaced by saveV2 method which has better error handling", "Jul 2021") def save(qcResult: ChecksSuiteResult): Future[Unit] = save(List(qcResult)) /** @@ -23,15 +36,32 @@ trait QcResultsRepository { def loadAll: Future[List[ChecksSuiteResult]] } +sealed trait QcResultsRepoErr extends ChecksSuiteErr + +object QcResultsRepoErr { + + class QcResultsRepoException(msg: String) extends Exception(msg) + + /** + * Represents an error that occurred when saving QC Results + * @param err describes the error that was encountered + */ + case class SaveQcResultErr(err: String) extends QcResultsRepoErr { + override def throwErr: Nothing = throw new QcResultsRepoException(err) + } +} + /** * In memory storage of QC Results.
Not recommended for production use */ class InMemoryQcResultsRepository extends QcResultsRepository { + override implicit def ec: ExecutionContext = scala.concurrent.ExecutionContext.global + val savedResults: ListBuffer[ChecksSuiteResult] = ListBuffer.empty - override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = { + override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = { savedResults ++= qcResults - Future.successful({}) + Future.successful(qcResults.map(Right(_))) } override def loadAll: Future[List[ChecksSuiteResult]] = Future.successful(savedResults.toList) @@ -41,7 +71,10 @@ class InMemoryQcResultsRepository extends QcResultsRepository { * Use the NullQcResultsRepository if you don't need to store QC Results */ class NullQcResultsRepository extends QcResultsRepository { - override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = Future.successful({}) + override implicit def ec: ExecutionContext = scala.concurrent.ExecutionContext.global + + override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = + Future.successful(qcResults.map(Right(_))) override def loadAll: Future[List[ChecksSuiteResult]] = Future.successful(List.empty) } From e245371999925de2a2c9001625d21579e8fe55ef Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Wed, 7 Jul 2021 12:21:16 +0100 Subject: [PATCH 4/4] Handle errors in loadAll method of repos --- .../dataflare/checkssuite/ChecksSuite.scala | 2 +- .../repository/DfApiQcResultsRepository.scala | 20 ++++++++------ .../ElasticSearchQcResultsRepository.scala | 6 ++--- .../repository/QcResultsRepository.scala | 27 ++++++++++++++----- .../checkssuite/ChecksSuiteTest.scala | 10 +++---- .../DfApiQcResultsRepositoryTest.scala | 2 +- ...ElasticSearchQcResultsRepositoryTest.scala | 2 +- 7 files changed, 44 insertions(+), 25 deletions(-) diff --git 
a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index fa2b58d..0d7289c 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -1,7 +1,6 @@ package com.github.timgent.dataflare.checkssuite import java.time.Instant - import cats.implicits._ import com.github.timgent.dataflare.FlareError.MetricCalculationError import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair @@ -10,6 +9,7 @@ import com.github.timgent.dataflare.checks.QCCheck.{DualDsQCCheck, SingleDsCheck import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator} +import com.github.timgent.dataflare.repository.QcResultsRepoErr.QcResultsRepoException import com.github.timgent.dataflare.repository.{MetricsPersister, NullMetricsPersister, NullQcResultsRepository, QcResultsRepository} import org.apache.spark.sql.Dataset diff --git a/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala index 79eea39..cabb4a7 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepository.scala @@ -3,7 +3,7 @@ package com.github.timgent.dataflare.repository import cats.implicits._ import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder} -import com.github.timgent.dataflare.repository.QcResultsRepoErr.SaveQcResultErr +import 
com.github.timgent.dataflare.repository.QcResultsRepoErr.{LoadQcResultErr, SaveQcResultErr} import io.circe.parser._ import io.circe.syntax._ import sttp.client3._ @@ -43,16 +43,20 @@ class DfApiQcResultsRepository(host: Uri)(implicit val ec: ExecutionContext) ext * * @return */ - override def loadAll: Future[List[ChecksSuiteResult]] = + override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] = basicRequest .contentType("application/json") .get(host.addPath("qcresults")) .send(backend) - .map { r => - val o: Either[String, String] = r.body - val p = parse(o.right.get).right - val pp = p.get.as[List[ChecksSuiteResult]] - val ppp = pp.right.get - ppp + .map { response => + for { + bodyStr <- response.body.leftMap(err => LoadQcResultErr("Received an unsuccessful response from the API: " + err, None)) + bodyJson <- + parse(bodyStr).leftMap(err => LoadQcResultErr("Response json was not valid JSON: " + err.message, Some(err.underlying))) + deserializedBody <- + bodyJson + .as[List[ChecksSuiteResult]] + .leftMap(err => LoadQcResultErr("Response JSON could not be deserialized: " + err.message, None)) + } yield deserializedBody } } diff --git a/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala index 74e0fec..b397b44 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepository.scala @@ -2,7 +2,7 @@ package com.github.timgent.dataflare.repository import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder} -import com.github.timgent.dataflare.repository.QcResultsRepoErr.SaveQcResultErr +import 
com.github.timgent.dataflare.repository.QcResultsRepoErr.{LoadQcResultErr, SaveQcResultErr} import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.http.JavaClient @@ -30,11 +30,11 @@ class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(impl } } - override def loadAll: Future[List[ChecksSuiteResult]] = { + override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] = { val resp = client.execute { search(index) query matchAllQuery } - resp.map(_.result.hits.hits.map(_.to[ChecksSuiteResult]).toList) + resp.map(response => Right(response.result.hits.hits.map(_.to[ChecksSuiteResult]).toList)) } } diff --git a/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala b/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala index b8344d1..20ad787 100644 --- a/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala +++ b/src/main/scala/com/github/timgent/dataflare/repository/QcResultsRepository.scala @@ -1,6 +1,7 @@ package com.github.timgent.dataflare.repository import com.github.timgent.dataflare.checkssuite.{ChecksSuiteErr, ChecksSuiteResult} +import com.github.timgent.dataflare.repository.QcResultsRepoErr.{LoadQcResultErr, QcResultsRepoException} import scala.collection.mutable.ListBuffer import scala.concurrent.{ExecutionContext, Future} @@ -33,22 +34,36 @@ trait QcResultsRepository { * Load all check results in the repository * @return */ - def loadAll: Future[List[ChecksSuiteResult]] + def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] } -sealed trait QcResultsRepoErr extends ChecksSuiteErr +sealed trait QcResultsRepoErr extends ChecksSuiteErr { + def throwErr: Nothing = + e match { + case Some(e) => throw new QcResultsRepoException(err, e) + case None => throw new QcResultsRepoException(err) + } + def err: String + def e: Option[Throwable] +} object QcResultsRepoErr { - class 
QcResultsRepoException(msg: String) extends Exception(msg) + class QcResultsRepoException(msg: String) extends Exception(msg) { + def this(message: String, cause: Throwable) { + this(message) + initCause(cause) + } + } /** * Represents an error that occurred when saving QC Results * @param err describes the error that was encountered */ case class SaveQcResultErr(err: String) extends QcResultsRepoErr { - override def throwErr: Nothing = throw new QcResultsRepoException(err) + override def e: Option[Throwable] = None } + case class LoadQcResultErr(err: String, e: Option[Throwable]) extends QcResultsRepoErr } /** @@ -64,7 +79,7 @@ class InMemoryQcResultsRepository extends QcResultsRepository { Future.successful(qcResults.map(Right(_))) } - override def loadAll: Future[List[ChecksSuiteResult]] = Future.successful(savedResults.toList) + override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] = Future.successful(Right(savedResults.toList)) } /** @@ -76,5 +91,5 @@ class NullQcResultsRepository extends QcResultsRepository { override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = Future.successful(qcResults.map(Right(_))) - override def loadAll: Future[List[ChecksSuiteResult]] = Future.successful(List.empty) + override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] = Future.successful(Right(List.empty)) } diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index 665ba90..9f422f2 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -420,9 +420,9 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers for { qcResults: ChecksSuiteResult <- qualityChecks.run(now) - persistedQcResults: 
Seq[ChecksSuiteResult] <- qcResultsRepository.loadAll + persistedQcResults <- qcResultsRepository.loadAll } yield { - checkResultAndPersistedResult(qcResults, persistedQcResults.head)( + checkResultAndPersistedResult(qcResults, persistedQcResults.right.get.head)( timestamp = now, checkSuiteDescription = "DB: X, table: Y", checkStatus = CheckSuiteStatus.Error, @@ -485,9 +485,9 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers for { qcResults: ChecksSuiteResult <- qualityChecks.run(now) - persistedQcResults: Seq[ChecksSuiteResult] <- qcResultsRepository.loadAll + persistedQcResults <- qcResultsRepository.loadAll } yield { - checkResultAndPersistedResult(qcResults, persistedQcResults.head)( + checkResultAndPersistedResult(qcResults, persistedQcResults.right.get.head)( timestamp = now, checkSuiteDescription = "table A vs table B comparison", checkStatus = CheckSuiteStatus.Error, @@ -541,7 +541,7 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers for { qcResults: ChecksSuiteResult <- qualityChecks.run(now) - persistedQcResults: Seq[ChecksSuiteResult] <- qcResultsRepository.loadAll + persistedQcResults <- qcResultsRepository.loadAll.map(_.right.get) } yield { qcResults.timestamp shouldBe now qcResults.checkSuiteDescription shouldBe "table A, table B, and table C comparison" diff --git a/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala b/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala index 3542826..13a5bd8 100644 --- a/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/repository/DfApiQcResultsRepositoryTest.scala @@ -71,7 +71,7 @@ class DfApiQcResultsRepositoryTest ) ) - def storedResultsFut(): Future[List[ChecksSuiteResult]] = repo.loadAll + def storedResultsFut(): Future[List[ChecksSuiteResult]] = repo.loadAll.map(_.right.get) for { _ 
<- repo.save(initialResultsToInsert) diff --git a/src/test/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepositoryTest.scala b/src/test/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepositoryTest.scala index 9f32e82..83e5d85 100644 --- a/src/test/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepositoryTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/repository/ElasticSearchQcResultsRepositoryTest.scala @@ -116,7 +116,7 @@ class ElasticSearchQcResultsRepositoryTest ) ) - def storedResultsFut(): Future[List[ChecksSuiteResult]] = repo.loadAll + def storedResultsFut(): Future[List[ChecksSuiteResult]] = repo.loadAll.map(_.right.get) for { _ <- repo.save(initialResultsToInsert)