-
Notifications
You must be signed in to change notification settings - Fork 81
Implementing storehaus with Postgres backend #331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
feffb43
aa80d41
c26eaf8
d772edc
316c640
e6d6dee
e6122e7
2983f92
ee9df7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| package com.twitter.storehaus.postgres | ||
|
|
||
| /** | ||
| * Exception for ibvalid conversions | ||
| * | ||
| * @author Alexey Ponkin | ||
| */ | ||
| case class IllegalConversionException(msg: String) extends Exception |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package com.twitter.storehaus.postgres | ||
|
|
||
| import com.twitter.algebird.Semigroup | ||
| import com.twitter.bijection.Injection | ||
| import com.twitter.finagle.postgres.Client | ||
| import com.twitter.storehaus.ConvertedStore | ||
| import com.twitter.storehaus.algebra.MergeableStore | ||
| import com.twitter.util.Future | ||
|
|
||
| import scala.util.{Failure, Success} | ||
|
|
||
| /** | ||
| * @author Alexey Ponkin | ||
| */ | ||
| class MergeablePostgresStore[K, V](underlying: PostgresStore[K, V]) | ||
| (implicit override val semigroup: Semigroup[V]) | ||
| extends MergeableStore[K, V] { | ||
|
|
||
| /** | ||
| * Delegate other ooperations to underlying store | ||
| */ | ||
| override def get(key: K): Future[Option[V]] = underlying.get(key) | ||
| override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = underlying.multiGet(ks) | ||
| override def put(kv: (K, Option[V])): Future[Unit] = underlying.put(kv) | ||
| override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = underlying.multiPut(kvs) | ||
|
|
||
| /** merge a set of keys. */ | ||
| override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = { | ||
| val result = doUpdate(kvs)(underlying.client) | ||
| kvs.keySet.iterator.map { key => | ||
| (key, result.map(_.get(key))) | ||
| }.toMap | ||
| } | ||
|
|
||
| /** | ||
| * Select keys from store, merging existing values with Semigroup | ||
| * non-existing keys are inserted as is. | ||
| * NOTE: Here we are using simple INSERT not UPSERT. During this | ||
| * transaction non existent keys can be inserted in another transaction, | ||
| * if so, method will fail, and client must repeat request with the same arguments. | ||
| */ | ||
| protected[postgres] def doUpdate[K1 <: K](kvs: Map[K1, V]): DBTxRequest[Map[K, V]] = { | ||
| _.inTransaction(implicit cli => | ||
| for{ existingKeys <- underlying.doGet(kvs.keySet, forUpdate = true) // lock selected rows | ||
| toUpdate = kvs.filterKeys( existingKeys.contains(_) ) // must be merged withSemigroup | ||
| toInsert = kvs.filterKeys( !existingKeys.contains(_) ) // must be inserted as is | ||
| _ <- underlying.doUpdate( | ||
| mergeWithSemigroup( | ||
| existingKeys, | ||
| toUpdate).toList) | ||
| _ <- underlying.doInsert(toInsert.toList) | ||
| } yield existingKeys | ||
| ) | ||
| } | ||
|
|
||
| /** | ||
| * Merge two maps(merging by the key). | ||
| * Method will merge all values from m1 into m2. | ||
| * All values with the same key will be merged with Semigroup[V2].plus method | ||
| */ | ||
| private def mergeWithSemigroup[K1 <: K](m1: Map[K, V], m2: Map[K1, V]): Map[K1, V] = { | ||
| m2.keySet.iterator.map{ k2 => | ||
| (k2, m1.get(k2) match { | ||
| case Some(v1) => semigroup.plus(v1, m2(k2)) | ||
| case None => m2(k2) | ||
| }) | ||
| }.toMap | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| package com.twitter.storehaus.postgres | ||
|
|
||
| import com.twitter.finagle.postgres.Client | ||
| import com.twitter.finagle.postgres.values.Value | ||
| import com.twitter.storehaus.Store | ||
| import com.twitter.util.{Future, Time} | ||
|
|
||
| import scala.util.{Failure, Success} | ||
|
|
||
| /** | ||
| * Key value storage with PostgreSQL (>9.5) as backend | ||
| * | ||
| * @author Alexey Ponkin | ||
| */ | ||
| object PostgresStore { | ||
|
|
||
| def apply[K, V] | ||
| (client: Client, table: String, kCol: String, vCol: String) | ||
| (implicit kInj: PostgresValueConverter[K], vInj: PostgresValueConverter[V]) = | ||
| new PostgresStore[K, V](client, table, kCol, vCol) | ||
| } | ||
|
|
||
| class PostgresStore[K, V] | ||
| (protected[postgres] val client: Client, table: String, kCol: String, vCol: String) | ||
| (implicit kInj: PostgresValueConverter[K], vInj: PostgresValueConverter[V]) | ||
| extends Store[K, V] { | ||
|
|
||
| private val toV: PostgresValue => V = vInj.invert(_) match { | ||
| case Success(v) => v | ||
| case Failure(e) => throw IllegalConversionException(e.getMessage) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to throw here? Why not just put this in the future? private def toV(p: PostgresValue): Future[V] = vInj.invert(p) match {
case Success(v) => Future.value(v)
case Failure(e) => Future.exception(e)
}then use |
||
| } | ||
|
|
||
| private val toK: PostgresValue => K = kInj.invert(_) match { | ||
| case Success(k) => k | ||
| case Failure(e) => throw IllegalConversionException(e.getMessage) | ||
| } | ||
|
|
||
| implicit def toPostgresValue(e: Value[Any]): PostgresValue = Column(e) | ||
|
|
||
| private val EMPTY_STRING = "" | ||
|
|
||
| private val SELECT_SQL_PREFIX = | ||
| s"SELECT $kCol, $vCol FROM $table WHERE $kCol" | ||
|
|
||
| private val DELETE_SQL_PREFIX = | ||
| s"DELETE FROM $table WHERE $kCol" | ||
|
|
||
| private val INSERT_SQL_PREFIX = | ||
| s"INSERT INTO $table($kCol, $vCol) VALUES" | ||
|
|
||
| override def get(key: K): Future[Option[V]] = { | ||
| doGet(key)(client) | ||
| } | ||
|
|
||
| override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { | ||
| val result = doGet(ks)(client) | ||
| ks.iterator.map { key => | ||
| (key, result.map(_.get(key))) | ||
| }.toMap | ||
| } | ||
|
|
||
| override def put(kv: (K, Option[V])): Future[Unit] = { | ||
| implicit val cli = client | ||
| kv match { | ||
| case (key, Some(value)) => doUpsert(List((key, value))).unit | ||
| case (key, None) => doDelete(List(key)).unit | ||
| } | ||
| } | ||
|
|
||
| override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { | ||
| if (kvs.isEmpty) { | ||
| Map.empty | ||
| } else { | ||
| val result = doMultiput(kvs)(client) | ||
| kvs.mapValues(_ => result) | ||
| } | ||
| } | ||
|
|
||
| private def selectSQL[K1 <: K](ks: List[K1], forUpdate: Boolean): String = { | ||
| val sql = ks match { | ||
| case Nil => EMPTY_STRING | ||
| case x :: Nil => SELECT_SQL_PREFIX + s" = ${toEscString(x)}" | ||
| case res => SELECT_SQL_PREFIX + " IN " + res.map(toEscString).mkString("(", ", ", ")") | ||
| } | ||
| if(forUpdate) { | ||
| sql + " FOR UPDATE" | ||
| } else { | ||
| sql | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Convert list of tuples (key, value) to String | ||
| * to insert in SQL query | ||
| */ | ||
| private def valuesStmt(values: List[(K, V)]): String = | ||
| values.map { | ||
| case (key, value) => s"(${toEscString(key)}, ${toEscString(value)})" | ||
| }.mkString(", ") | ||
|
|
||
| private def upsertSQL(values: List[(K, V)]): String = { | ||
| // Insert keys and values in the table | ||
| // overwrite if key is already exists in table | ||
| // 'upsert' is available since PostgreSQL 9.5 | ||
| // https://wiki.postgresql.org/wiki/UPSERT | ||
| if (values.nonEmpty) { | ||
| s"""$INSERT_SQL_PREFIX ${valuesStmt(values)} | ||
| |ON CONFLICT ($kCol) DO UPDATE SET $vCol = EXCLUDED.$vCol; | ||
| |""".stripMargin | ||
| } else { | ||
| EMPTY_STRING | ||
| } | ||
| } | ||
|
|
||
| private def insertSQL(values: List[(K, V)]): String = { | ||
| // simple insert, will fail if some keys exists | ||
| if (values.nonEmpty) { | ||
| s"$INSERT_SQL_PREFIX ${valuesStmt(values)}" | ||
| } else { | ||
| EMPTY_STRING | ||
| } | ||
| } | ||
|
|
||
| private def deleteSQL(keys: List[K]): String = { | ||
| keys match { | ||
| case Nil => EMPTY_STRING | ||
| case x :: Nil => DELETE_SQL_PREFIX + s" = ${toEscString(x)};" | ||
| case _ => DELETE_SQL_PREFIX + s" IN ${keys.map(toEscString).mkString("(", ", ", ")")};" | ||
| } | ||
| } | ||
|
|
||
| private def updateSQL(values: List[(K, V)]): String = values.map{ | ||
| case (key, value) => s"UPDATE $table SET $vCol = ${toEscString(value)} WHERE $kCol = ${toEscString(key)};" | ||
| }.mkString("\n") | ||
|
|
||
| protected[postgres] def doGet[K1 <: K](k: K1) | ||
| (implicit cli: Client): Future[Option[V]] = | ||
| doGet(Set(k)).map(_.values.headOption) | ||
|
|
||
| protected[postgres] def doGet[K1 <: K](ks: Set[K1], | ||
| forUpdate: Boolean = false) | ||
| (implicit cli: Client): Future[Map[K, V]] = | ||
| if (ks.isEmpty) { | ||
| Future(Map.empty) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } else { | ||
| val query = selectSQL(ks.toList, forUpdate) | ||
| cli.prepareAndQuery(query){ row => | ||
| (toK(row.get(0)), toV(row.get(1))) | ||
| }.map( _.toMap ) | ||
| } | ||
|
|
||
| protected[postgres] def doInsert[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] = | ||
| cli.query(insertSQL(values)).unit | ||
|
|
||
| protected[postgres] def doUpsert[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] = | ||
| cli.query(upsertSQL(values)).unit | ||
|
|
||
| protected[postgres] def doUpdate[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] = | ||
| cli.query(updateSQL(values)).unit | ||
|
|
||
| protected[postgres] def doDelete[K1 <: K](values: List[K1])(implicit cli: Client): Future[Unit] = | ||
| cli.query(deleteSQL(values)).unit | ||
|
|
||
| protected[postgres] def doMultiput[K1 <: K](kvs: Map[K1, Option[V]]): DBTxRequest[Unit] = { | ||
| val (keysToUpsert, toDelete) = kvs.keySet.partition(k => kvs.get(k).exists(_.isDefined)) | ||
| val toUpsert = kvs.filterKeys(keysToUpsert.contains).mapValues(_.get).toList | ||
| _.inTransaction(implicit cli => | ||
| for{ | ||
| _ <- doUpsert(toUpsert) | ||
| _ <- doDelete(toDelete.toList) | ||
| } yield () | ||
| ) | ||
| } | ||
|
|
||
| override def close(t: Time): Future[Unit] = client.close() | ||
|
|
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| package com.twitter.storehaus.postgres | ||
|
|
||
| import com.twitter.finagle.postgres.values.Value | ||
|
|
||
|
|
||
| /** | ||
| * General trait to represent | ||
| * Postgres column value. | ||
| * NOTE: Actual implementation depends on driver for PostgreSQL | ||
| * | ||
| * @author Alexey Ponkin | ||
| */ | ||
| sealed trait PostgresValue | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the extra wrapper on
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to change Postgres driver(finagle-roc) in future(if we we want to).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. That's fine if we are going to automatically handle the conversions I think. |
||
| case class Column(v: Value[Any]) extends PostgresValue | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| package com.twitter.storehaus | ||
|
|
||
| import com.twitter.bijection.Injection | ||
| import com.twitter.finagle.postgres.Client | ||
| import com.twitter.util.Future | ||
|
|
||
|
|
||
| import scala.util.Try | ||
|
|
||
| /** | ||
| * @author Alexey Ponkin | ||
| */ | ||
| package object postgres { | ||
|
|
||
| type PostgresValueConverter[A] = Injection[A, PostgresValue] | ||
| type DBTxRequest[A] = Client => Future[A] // request in transaction | ||
|
|
||
| trait OneWayInjection[T] extends Injection[T, PostgresValue] { | ||
| override def apply(t: T): PostgresValue = | ||
| throw IllegalConversionException( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we avoid the throwing? I know we don't use it, but I really don't want to add code that throws unless we have no other way. Here I can see two ways:
|
||
| s"Can`t convert ${t.getClass.getCanonicalName} to PostgresValue") | ||
| } | ||
|
|
||
| implicit val boolInjection= new OneWayInjection[Boolean] { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay, I'm really sorry. I overlooked that your I'd actually prefer the way you had it to throwing exceptions. I mistakenly thought we could combine What about a direct trait like: trait PostgresCodec[A] {
def encode(a: A): String
def decode(p: PostgresValue): Try[A]
}I don't see a need to decouple these since they are so special purpose. I thought Injection would do, but the types are not the same on encode and decode. |
||
| override def invert(b: PostgresValue): Try[Boolean] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Boolean]) | ||
| } | ||
| } | ||
|
|
||
| implicit val doubleInjection = new OneWayInjection[Double] { | ||
| override def invert(b: PostgresValue): Try[Double] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Double]) | ||
| } | ||
| } | ||
|
|
||
| implicit val floatInjection = new OneWayInjection[Float] { | ||
| override def invert(b: PostgresValue): Try[Float] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Float]) | ||
| } | ||
| } | ||
|
|
||
| implicit val shortInjection = new OneWayInjection[Short] { | ||
| override def invert(b: PostgresValue): Try[Short] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Short]) | ||
| } | ||
| } | ||
|
|
||
| implicit val intInjection = new OneWayInjection[Int] { | ||
| override def invert(b: PostgresValue): Try[Int] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Int]) | ||
| } | ||
| } | ||
|
|
||
| implicit val longInjection = new OneWayInjection[Long] { | ||
| override def invert(b: PostgresValue): Try[Long] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Long]) | ||
| } | ||
| } | ||
|
|
||
| implicit val stringInjection = new OneWayInjection[String] { | ||
| override def invert(b: PostgresValue): Try[String] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[String]) | ||
| } | ||
| } | ||
|
|
||
| implicit val byteaInjection = new OneWayInjection[Array[Byte]] { | ||
|
|
||
| override def invert(b: PostgresValue): Try[Array[Byte]] = b match { | ||
| case Column(element) => Try(element.value.asInstanceOf[Array[Byte]]) | ||
| } | ||
| } | ||
|
|
||
| def toEscString(a: Any): String = a match { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this approach since we have no guarantee that |
||
| case string: String => s"'$string'" | ||
| case array: Array[Byte] => s"E'\\\\x${array.map("%02X" format _).mkString}'" | ||
| case rest => rest.toString | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
foldLeft is generally faster because you may get some sharing:
may need to bump the algebird dependency to get
maybePlus, but that is fine.