/**
 * Common base for classes that call a database function through doobie.
 *
 * It resolves the (optionally schema-qualified) function name, renders the
 * `SELECT ... FROM function(args) alias;` statement and offers a helper that
 * builds the statement inside `F` before handing it to a runner.
 *
 * @param functionNameOverride explicit function name; when `None` the name is obtained from
 *                             `schema.objectNameFromClassName(getClass)`
 * @param schema               schema the function belongs to
 * @tparam I input type of the function call
 * @tparam R result type (not referenced by this class itself; used by subclasses)
 * @tparam F effect type
 */
abstract class BaseF[I, R, F[_]: Async](functionNameOverride: Option[String])(implicit val schema: DBSchema) {

  /** Converts the input into the SQL fragments passed as the function's arguments, in order. */
  def toFragmentsSeq: I => Seq[Fragment]

  /** Function name, prefixed with `schemaName.` when the schema name is non-empty. */
  val functionName: String = {
    val fn = functionNameOverride.getOrElse(schema.objectNameFromClassName(getClass))
    if (schema.schemaName.isEmpty) {
      fn
    } else {
      s"${schema.schemaName}.$fn"
    }
  }

  /** Alias given to the function call in the generated statement. */
  protected val alias = "FNC"

  /** Fields to select from the function result; an empty sequence selects `*`. */
  def fieldsToSelect: Seq[String] = Seq.empty

  /** Renders the select list: `*`, or the requested fields prefixed with the alias (if any). */
  protected def selectEntry: String = {
    // `fieldsToSelect` is an overridable `def`; evaluate it exactly once so the
    // emptiness check and the rendered list are guaranteed to see the same value.
    val fieldsSeq = fieldsToSelect
    if (fieldsSeq.isEmpty) {
      "*"
    } else {
      val aliasToUse = if (alias.isEmpty) "" else s"$alias."
      fieldsSeq.map(aliasToUse + _).mkString(",")
    }
  }

  /**
   * Builds the complete statement for the given argument fragments.
   *
   * @param fragments argument fragments, joined with commas in the given order
   * @return the `SELECT` statement calling [[functionName]]
   */
  protected final def composeFragments(fragments: Seq[Fragment]): Fragment = {
    val args = fragments.toList match {
      case head :: tail => tail.foldLeft(head)((acc, frag) => acc ++ fr"," ++ frag)
      case Nil          => fr""
    }
    sql"SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}($args) ${Fragment.const(alias)};"
  }

  /**
   * Builds the statement for `input` and passes it to `query`.
   *
   * Non-fatal exceptions thrown while building the statement (for example by
   * `toFragmentsSeq`) are raised inside `F` instead of being thrown at the caller.
   *
   * @param input function input
   * @param query runner that turns the statement into an `F[T]`
   */
  protected def executeQuery[T](input: I, query: Fragment => F[T]): F[T] = {
    MonadError[F, Throwable]
      .catchNonFatal(composeFragments(toFragmentsSeq(input)))
      .flatMap(query)
  }
}
/** Contract for interpreting the status returned alongside a function's data. */
trait StatusSupport {

  /**
   * Decides whether the given status represents success.
   *
   * @param statusWithData status and data as returned by the function
   * @return `Right` with the unchanged input, or `Left` with a [[StatusException]]
   */
  def checkStatus[A](
    statusWithData: FunctionStatusWithData[A]
  ): Either[StatusException, FunctionStatusWithData[A]]
}
/**
 * Status interpretation keyed on `statusCode / 10` (integer division):
 * 1 is success, 2 server misconfiguration, 3 data conflict, 4 data not found,
 * 5 to 8 error in data, 9 other; every remaining value is reported as out of range.
 */
trait StandardStatusSupport extends StatusSupport {

  override def checkStatus[A](
    statusWithData: FunctionStatusWithData[A]
  ): Either[StatusException, FunctionStatusWithData[A]] = {
    val status = statusWithData.functionStatus

    // `None` means success; otherwise the exception describing the failure class.
    val failure: Option[StatusException] = status.statusCode / 10 match {
      case 1             => None
      case 2             => Some(ServerMisconfigurationException(status))
      case 3             => Some(DataConflictException(status))
      case 4             => Some(DataNotFoundException(status))
      case 5 | 6 | 7 | 8 => Some(ErrorInDataException(status))
      case 9             => Some(OtherStatusException(status))
      case _             => Some(StatusOutOfRangeException(status))
    }

    failure.toLeft(statusWithData)
  }
}
/**
 * Function call read with doobie's `unique` (which expects exactly one row)
 * and run through the supplied transactor.
 */
class SingleF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and runs it via `transactor`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[R] = {
    val runUnique: Fragment => F[R] = { statement =>
      val uniqueRow = statement.query[R](read).unique
      uniqueRow.transact(transactor)
    }
    executeQuery(input, runUnique)
  }
}
/**
 * Variant of the single-row call that only describes the query: it yields the
 * `ConnectionIO` (read with doobie's `unique`) without transacting it, leaving
 * execution to the caller.
 */
class SingleFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and returns the not-yet-run query. */
  def apply(input: I): F[ConnectionIO[R]] = {
    val describeUnique: Fragment => F[ConnectionIO[R]] =
      statement => Async[F].delay(statement.query[R](read).unique)
    executeQuery(input, describeUnique)
  }
}
/**
 * Single-row call whose row carries a status next to the data. The row is
 * converted to a `FunctionStatusWithData` and handed to `checkStatus`, which
 * the concrete class supplies through a [[StatusSupport]] implementation.
 */
abstract class SingleFWithStatusSupport[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[StatusWithData[R]])
    extends BaseF[I, R, F](functionNameOverride)
    with StatusSupport {

  /** Runs the call via `transactor` and evaluates the returned status. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Either[StatusException, FunctionStatusWithData[R]]] = {
    val fetchRow: Fragment => F[StatusWithData[R]] =
      statement => statement.query[StatusWithData[R]](read).unique.transact(transactor)

    executeQuery(input, fetchRow).map { row =>
      val status = FunctionStatus(row.status, row.statusText)
      checkStatus(FunctionStatusWithData(status, row.data))
    }
  }
}
/** Function call that collects all returned rows into a `Seq` and runs via the supplied transactor. */
class MultiF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and runs it via `transactor`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Seq[R]] = {
    val runAll: Fragment => F[Seq[R]] = { statement =>
      val allRows = statement.query[R](read).to[Seq]
      allRows.transact(transactor)
    }
    executeQuery(input, runAll)
  }
}
/**
 * Variant of the multi-row call that only describes the query: it yields the
 * `ConnectionIO` collecting all rows into a `Seq`, without transacting it.
 */
class MultiFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and returns the not-yet-run query. */
  def apply(input: I): F[ConnectionIO[Seq[R]]] = {
    val describeAll: Fragment => F[ConnectionIO[Seq[R]]] =
      statement => Async[F].delay(statement.query[R](read).to[Seq])
    executeQuery(input, describeAll)
  }
}
/** Function call whose rows are exposed as an fs2 stream instead of a collected sequence. */
class MultiFStreaming[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /**
   * Builds the statement for `input` and streams its rows via `transactor`.
   *
   * @param chunkSize chunk size passed to doobie's `streamWithChunkSize`
   */
  def apply(input: I, chunkSize: Int = 512)(implicit transactor: Transactor[F]): fs2.Stream[F, R] = {
    // Statement construction is lifted into F so a non-fatal failure surfaces in the stream.
    val statementF = MonadError[F, Throwable].catchNonFatal(composeFragments(toFragmentsSeq(input)))
    val streamRows: Fragment => fs2.Stream[F, R] =
      statement => statement.query[R](read).streamWithChunkSize(chunkSize).transact(transactor)

    fs2.Stream.eval(statementF).flatMap(streamRows)
  }
}
/**
 * Function call returning the first row, if any. All rows are collected into
 * a `Seq` and its `headOption` is taken, so additional rows are ignored.
 */
class OptionF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and runs it via `transactor`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Option[R]] = {
    val runFirst: Fragment => F[Option[R]] = { statement =>
      val allRows = statement.query[R](read).to[Seq]
      allRows.map(rows => rows.headOption).transact(transactor)
    }
    executeQuery(input, runFirst)
  }
}
/**
 * Variant of the optional-row call that only describes the query: it yields
 * the `ConnectionIO` producing the first row (if any), without transacting it.
 */
class OptionFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the statement for `input` and returns the not-yet-run query. */
  def apply(input: I): F[ConnectionIO[Option[R]]] = {
    val describeFirst: Fragment => F[ConnectionIO[Option[R]]] = { statement =>
      Async[F].delay {
        val allRows = statement.query[R](read).to[Seq]
        allRows.map(rows => rows.headOption)
      }
    }
    executeQuery(input, describeFirst)
  }
}
// Usage
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._
// Implicit schema picked up by the function classes declared below.
implicit object Runs extends DBSchema

// Row type read from the example functions.
case class Actor(name: String)

// Single-row call that is run directly through a transactor.
class GetActor
    extends SingleF[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))

// Single-row call whose status is interpreted by StandardStatusSupport.
class GetActorWithStatus
    extends SingleFWithStatusSupport[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))
    with StandardStatusSupport

// Single-row call that only returns the ConnectionIO; the caller transacts it.
class GetActorConnectionIO
    extends SingleFConnectionIO[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))
/** Example repository showing the three ways of calling the actor function. */
class Repository(
  getActorF: GetActor,
  getActorFWithStatus: GetActorWithStatus,
  getActorC: GetActorConnectionIO
)(implicit transactor: Transactor[IO]) {

  /** Runs the function and returns the actor. */
  def getActor(input: Int): IO[Actor] =
    getActorF(input)

  /** Runs the function and returns the actor together with the evaluated status. */
  def getActorWithStatus(input: Int): IO[Either[StatusException, FunctionStatusWithData[Actor]]] =
    getActorFWithStatus(input)

  /** Obtains the un-run query first and transacts it here. */
  def groupedExecution(input: Int): IO[Actor] = {
    val describedQuery = for {
      actorQuery <- getActorC(input)
      // here could be another call
    } yield actorQuery

    describedQuery.flatMap(_.transact(transactor))
  }
}
// If you need to return Future you can always call unsafeToFuture()
/** Example repository exposing the actor lookup as a `Future` rather than an `IO`. */
class Repository(getActorF: GetActor)(implicit transactor: Transactor[IO]) {

  /**
   * Runs the function and converts the resulting `IO` to a `Future`.
   *
   * `unsafeToFuture()` starts the effect, so it is called with its empty
   * parameter list rather than relying on auto-application.
   * NOTE(review): on cats-effect 3 this also needs an implicit `IORuntime` in scope; confirm at the call site.
   */
  def getActor(input: Int): Future[Actor] = getActorF(input).unsafeToFuture()
}
Background