Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- redis-server
- memcache
- mongodb
- postgresql
script:
- umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test
- umask 0022 && sbt ++$TRAVIS_SCALA_VERSION mimaReportBinaryIssues
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ lazy val storehaus = Project(
storehausAlgebra,
storehausMemcache,
storehausMySQL,
storehausPostgres,
storehausRedis,
storehausHBase,
storehausDynamoDB,
Expand Down Expand Up @@ -234,6 +235,13 @@ lazy val storehausMySQL = module("mysql").settings(
libraryDependencies += "com.twitter" %% "finagle-mysql" % finagleVersion
).dependsOn(storehausAlgebra % "test->test;compile->compile")

// PostgreSQL-backed store module; its only extra library dependency is the finagle-postgres driver.
lazy val storehausPostgres = module("postgresql").settings(
libraryDependencies ++= Seq(
"io.github.finagle" %% "finagle-postgres" % "0.2.0"
),
// Pass "-verbosity 3" to ScalaCheck when this module's tests run.
testOptions in Test += Tests.Argument(TestFrameworks.ScalaCheck, "-verbosity", "3")
// Depends on storehausAlgebra in both the compile and the test configuration.
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausRedis = module("redis").settings(
libraryDependencies ++= Seq (
"com.twitter" %% "bijection-core" % bijectionVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.twitter.storehaus.postgres

/**
 * Exception raised when a value cannot be converted to or from its
 * PostgreSQL representation.
 *
 * The message is forwarded to the `Exception` constructor so that
 * `getMessage`, stack traces and logs actually carry it.
 *
 * @param msg human-readable description of the failed conversion
 * @author Alexey Ponkin
 */
case class IllegalConversionException(msg: String) extends Exception(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.twitter.storehaus.postgres

import com.twitter.algebird.Semigroup
import com.twitter.bijection.Injection
import com.twitter.finagle.postgres.Client
import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.util.Future

import scala.util.{Failure, Success}

/**
* @author Alexey Ponkin
*/
class MergeablePostgresStore[K, V](underlying: PostgresStore[K, V])
(implicit override val semigroup: Semigroup[V])
extends MergeableStore[K, V] {

/**
* Plain reads and writes are delegated to the underlying store unchanged;
* only the merge operations are implemented by this class.
*/
override def get(key: K): Future[Option[V]] = underlying.get(key)
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = underlying.multiGet(ks)
override def put(kv: (K, Option[V])): Future[Unit] = underlying.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = underlying.multiPut(kvs)

/**
 * Merge a batch of key/value pairs with a single call to `doUpdate`.
 *
 * Every key of `kvs` is mapped to a view of the same shared future, which
 * resolves to whatever `doUpdate` reports for that key.
 */
override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
  val previous = doUpdate(kvs)(underlying.client)
  kvs.keys.map { k => k -> previous.map(before => before.get(k)) }.toMap
}

/**
 * Merge `kvs` into the store inside a single transaction.
 *
 * Rows for the requested keys are selected with `forUpdate = true`; keys that
 * were found are rewritten with the semigroup sum of the stored and supplied
 * values, and keys that were not found are inserted as given. The future
 * resolves to the rows as they were read, i.e. before the merge.
 *
 * NOTE: missing keys are written with a plain INSERT, not an UPSERT. If
 * another transaction inserts one of them in the meantime the request fails
 * and the caller must repeat it with the same arguments.
 */
protected[postgres] def doUpdate[K1 <: K](kvs: Map[K1, V]): DBTxRequest[Map[K, V]] = { txClient =>
  txClient.inTransaction { implicit cli =>
    // Selecting with forUpdate locks the rows that already exist.
    underlying.doGet(kvs.keySet, forUpdate = true).flatMap { stored =>
      val present = kvs.filterKeys(stored.contains) // merged with the semigroup
      val absent = kvs.filterKeys(k => !stored.contains(k)) // inserted as is
      for {
        _ <- underlying.doUpdate(mergeWithSemigroup(stored, present).toList)
        _ <- underlying.doInsert(absent.toList)
      } yield stored
    }
  }
}

/**
* Merge two maps(merging by the key).
* Method will merge all values from m1 into m2.
* All values with the same key will be merged with Semigroup[V2].plus method
*/
private def mergeWithSemigroup[K1 <: K](m1: Map[K, V], m2: Map[K1, V]): Map[K1, V] = {
m2.keySet.iterator.map{ k2 =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foldLeft is generally faster because you may get some sharing:

val smaller = if (m1.size < m2.size) m1 else m2
val bigger = if (m1.size < m2.size) m2 else m1
smaller.foldLeft(bigger) { case (m, (k, v)) =>
  val newV = Semigroup.maybePlus(v, m.get(k))
  m + ((k, newV))
}

may need to bump the algebird dependency to get maybePlus, but that is fine.

(k2, m1.get(k2) match {
case Some(v1) => semigroup.plus(v1, m2(k2))
case None => m2(k2)
})
}.toMap
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.twitter.storehaus.postgres

import com.twitter.finagle.postgres.Client
import com.twitter.finagle.postgres.values.Value
import com.twitter.storehaus.Store
import com.twitter.util.{Future, Time}

import scala.util.{Failure, Success}

/**
 * Key/value storage backed by PostgreSQL. Writes rely on the
 * `INSERT ... ON CONFLICT` upsert, which is available since PostgreSQL 9.5.
 *
 * @author Alexey Ponkin
 */
object PostgresStore {

  /** Build a store over `table`, keyed by column `kCol` with values in column `vCol`. */
  def apply[K, V](client: Client, table: String, kCol: String, vCol: String)(
      implicit kInj: PostgresValueConverter[K],
      vInj: PostgresValueConverter[V]): PostgresStore[K, V] =
    new PostgresStore[K, V](client, table, kCol, vCol)
}

class PostgresStore[K, V]
(protected[postgres] val client: Client, table: String, kCol: String, vCol: String)
(implicit kInj: PostgresValueConverter[K], vInj: PostgresValueConverter[V])
extends Store[K, V] {

private val toV: PostgresValue => V = vInj.invert(_) match {
case Success(v) => v
case Failure(e) => throw IllegalConversionException(e.getMessage)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to throw here? Why not just put this in the future?

private def toV(p: PostgresValue): Future[V] = vInj.invert(p) match {
  case Success(v) => Future.value(v)
  case Failure(e) => Future.exception(e)
}

then use toV(v).flatMap { ... rather than throwing?

}

// Decode a key column; a failed inversion is rethrown as IllegalConversionException.
private val toK: PostgresValue => K = { column =>
  kInj.invert(column) match {
    case Failure(cause) => throw IllegalConversionException(cause.getMessage)
    case Success(key) => key
  }
}

// Implicitly wraps a driver-level Value in the store's own Column wrapper.
implicit def toPostgresValue(e: Value[Any]): PostgresValue = Column(e)

// Returned by the SQL builders below when there is nothing to act on.
private val EMPTY_STRING = ""

// Statement prefixes built once from the configured table and column names.
// The SELECT and DELETE prefixes end at the key column so a caller can append
// either " = <key>" or " IN (<keys>)".
private val SELECT_SQL_PREFIX =
s"SELECT $kCol, $vCol FROM $table WHERE $kCol"

private val DELETE_SQL_PREFIX =
s"DELETE FROM $table WHERE $kCol"

// Ends at VALUES; callers append the "(key, value), ..." tuples.
private val INSERT_SQL_PREFIX =
s"INSERT INTO $table($kCol, $vCol) VALUES"

/** Look up a single key using the store's own client. */
override def get(key: K): Future[Option[V]] = doGet(key)(client)

/**
 * Fetch all requested keys with one query; every key is mapped to a view of
 * the same shared result future.
 */
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = {
  val rows = doGet(ks)(client)
  ks.foldLeft(Map.empty[K1, Future[Option[V]]]) { (acc, k) =>
    acc + (k -> rows.map(found => found.get(k)))
  }
}

/** Upsert the pair when a value is present, delete the key when it is `None`. */
override def put(kv: (K, Option[V])): Future[Unit] = {
  implicit val cli: Client = client
  val (key, maybeValue) = kv
  maybeValue match {
    case None => doDelete(List(key)).unit
    case Some(value) => doUpsert(List((key, value))).unit
  }
}

/**
 * Apply all puts and deletes through one `doMultiput` call; every key is
 * mapped to the same completion future. An empty input issues no request.
 */
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] =
  if (kvs.nonEmpty) {
    val written = doMultiput(kvs)(client)
    kvs.mapValues(_ => written)
  } else {
    Map.empty
  }

/**
 * Build the SELECT statement for the given keys.
 *
 * @param ks        keys to look up; one key produces `= key`, several produce `IN (...)`
 * @param forUpdate when true, `FOR UPDATE` is appended to the statement
 * @return the statement, or the empty string when `ks` is empty. The empty
 *         case never gets a `FOR UPDATE` suffix, which on its own would be a
 *         malformed statement.
 */
private def selectSQL[K1 <: K](ks: List[K1], forUpdate: Boolean): String =
  ks match {
    case Nil => EMPTY_STRING
    case keys =>
      val predicate = keys match {
        case single :: Nil => s" = ${toEscString(single)}"
        case many => " IN " + many.map(toEscString).mkString("(", ", ", ")")
      }
      val lockClause = if (forUpdate) " FOR UPDATE" else ""
      SELECT_SQL_PREFIX + predicate + lockClause
  }

/**
 * Render (key, value) pairs as a comma-separated list of `(key, value)`
 * tuples, ready to follow `VALUES` in an INSERT statement.
 */
private def valuesStmt(values: List[(K, V)]): String = {
  val tuples = for ((key, value) <- values)
    yield s"(${toEscString(key)}, ${toEscString(value)})"
  tuples.mkString(", ")
}

/**
 * Build an INSERT that overwrites the value when the key already exists.
 * Uses `ON CONFLICT`, available since PostgreSQL 9.5
 * (https://wiki.postgresql.org/wiki/UPSERT). Empty input gives the empty string.
 */
private def upsertSQL(values: List[(K, V)]): String =
  if (values.isEmpty) {
    EMPTY_STRING
  } else {
    s"""$INSERT_SQL_PREFIX ${valuesStmt(values)}
       |ON CONFLICT ($kCol) DO UPDATE SET $vCol = EXCLUDED.$vCol;
       |""".stripMargin
  }

/**
 * Build a plain INSERT, which fails if any of the keys already exists.
 * Empty input gives the empty string.
 */
private def insertSQL(values: List[(K, V)]): String =
  if (values.isEmpty) EMPTY_STRING
  else s"$INSERT_SQL_PREFIX ${valuesStmt(values)}"

/**
 * Build the DELETE statement for the given keys: `= key` for one key,
 * `IN (...)` for several, and the empty string for none.
 */
private def deleteSQL(keys: List[K]): String =
  keys.map(toEscString) match {
    case Nil => EMPTY_STRING
    case only :: Nil => s"$DELETE_SQL_PREFIX = $only;"
    case several =>
      val inList = several.mkString("(", ", ", ")")
      s"$DELETE_SQL_PREFIX IN $inList;"
  }

/** Build one UPDATE statement per (key, value) pair, joined by newlines. */
private def updateSQL(values: List[(K, V)]): String = {
  val statements = values.map { case (key, value) =>
    val newValue = toEscString(value)
    val target = toEscString(key)
    s"UPDATE $table SET $vCol = $newValue WHERE $kCol = $target;"
  }
  statements.mkString("\n")
}

/**
 * Fetch one key by delegating to the set-based lookup and taking the first
 * value it returns, if any.
 */
protected[postgres] def doGet[K1 <: K](k: K1)(implicit cli: Client): Future[Option[V]] =
  doGet(Set(k)).map(rows => rows.values.headOption)

protected[postgres] def doGet[K1 <: K](ks: Set[K1],
forUpdate: Boolean = false)
(implicit cli: Client): Future[Map[K, V]] =
if (ks.isEmpty) {
Future(Map.empty)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future.value(Map.empty) is better since it avoids the call-by-name parameter, and since that is a constant, you might save it as a private[this] val and avoid the reallocation.

} else {
val query = selectSQL(ks.toList, forUpdate)
cli.prepareAndQuery(query){ row =>
(toK(row.get(0)), toV(row.get(1)))
}.map( _.toMap )
}

/**
 * Insert the given rows with a plain INSERT.
 *
 * An empty list completes immediately without touching the client:
 * `insertSQL` yields an empty string for it, and an empty statement should
 * not be sent to the server.
 */
protected[postgres] def doInsert[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] =
  if (values.isEmpty) Future.Unit
  else cli.query(insertSQL(values)).unit

/**
 * Insert the given rows, overwriting the value of keys that already exist.
 *
 * An empty list completes immediately without touching the client:
 * `upsertSQL` yields an empty string for it, and an empty statement should
 * not be sent to the server.
 */
protected[postgres] def doUpsert[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] =
  if (values.isEmpty) Future.Unit
  else cli.query(upsertSQL(values)).unit

/**
 * Update the value of each given key with one UPDATE statement per pair.
 *
 * An empty list completes immediately without touching the client:
 * `updateSQL` yields an empty string for it, and an empty statement should
 * not be sent to the server.
 */
protected[postgres] def doUpdate[K1 <: K](values: List[(K1, V)])(implicit cli: Client): Future[Unit] =
  if (values.isEmpty) Future.Unit
  else cli.query(updateSQL(values)).unit

/**
 * Delete the rows stored under the given keys.
 *
 * An empty list completes immediately without touching the client:
 * `deleteSQL` yields an empty string for it, and an empty statement should
 * not be sent to the server.
 */
protected[postgres] def doDelete[K1 <: K](values: List[K1])(implicit cli: Client): Future[Unit] =
  if (values.isEmpty) Future.Unit
  else cli.query(deleteSQL(values)).unit

/**
 * Apply a batch of puts and deletes in one transaction.
 *
 * Entries carrying `Some(value)` are upserted and entries carrying `None`
 * are deleted. The two groups are split with total pattern matches, so no
 * `Option.get` is needed.
 *
 * @param kvs keys mapped to the value to store, or `None` to delete the key
 * @return a request that, given a client, runs the upserts followed by the
 *         deletes inside `inTransaction`
 */
protected[postgres] def doMultiput[K1 <: K](kvs: Map[K1, Option[V]]): DBTxRequest[Unit] = {
  val entries = kvs.toList
  val toUpsert = entries.collect { case (key, Some(value)) => (key, value) }
  val toDelete = entries.collect { case (key, None) => key }
  _.inTransaction(implicit cli =>
    for {
      _ <- doUpsert(toUpsert)
      _ <- doDelete(toDelete)
    } yield ()
  )
}

// Closes the underlying client; the deadline `t` is not forwarded to it.
override def close(t: Time): Future[Unit] = client.close()

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.twitter.storehaus.postgres

import com.twitter.finagle.postgres.values.Value


/**
* Driver-independent representation of a single PostgreSQL column value.
* The trait is sealed, so every implementation lives in this file.
* NOTE: the actual implementation depends on the PostgreSQL driver in use.
*
* @author Alexey Ponkin
*/
sealed trait PostgresValue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the extra wrapper on Element?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to change the Postgres driver (e.g. to finagle-roc) in the future, if we want to.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. That's fine if we are going to automatically handle the conversions I think.

/** A column value as delivered by the finagle-postgres driver, wrapped as a PostgresValue. */
case class Column(v: Value[Any]) extends PostgresValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.twitter.storehaus

import com.twitter.bijection.Injection
import com.twitter.finagle.postgres.Client
import com.twitter.util.Future


import scala.util.Try

/**
* @author Alexey Ponkin
*/
package object postgres {

// Injection between a Scala type A and the store's PostgresValue wrapper.
type PostgresValueConverter[A] = Injection[A, PostgresValue]
// A database request expressed as a function of the client it should run on.
type DBTxRequest[A] = Client => Future[A] // request in transaction

trait OneWayInjection[T] extends Injection[T, PostgresValue] {
override def apply(t: T): PostgresValue =
throw IllegalConversionException(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid the throwing? I know we don't use it, but I really don't want to add code that throws unless we have no other way.

Here I can see two ways:

  1. just add the code to wrap the primitives below in PostgresValue, which does not seem hard.
  2. make a type-class that only goes in one direction.

s"Can`t convert ${t.getClass.getCanonicalName} to PostgresValue")
}

implicit val boolInjection= new OneWayInjection[Boolean] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I'm really sorry. I overlooked that your Read and Show were not inverses of each other. One is going to String the other is coming from a PostgresValue. I wish I had noticed that.

I'd actually prefer the way you had it to throwing exceptions. I mistakenly thought we could combine Read and Show into Injection but not the way you have the code.

What about a direct trait like:

trait PostgresCodec[A] {
  def encode(a: A): String
  def decode(p: PostgresValue): Try[A]
}

I don't see a need to decouple these since they are so special purpose. I thought Injection would do, but the types are not the same on encode and decode.

override def invert(b: PostgresValue): Try[Boolean] = b match {
case Column(element) => Try(element.value.asInstanceOf[Boolean])
}
}

/** Reads a `Double` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val doubleInjection: OneWayInjection[Double] = new OneWayInjection[Double] {
  override def invert(b: PostgresValue): Try[Double] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Double])
  }
}

/** Reads a `Float` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val floatInjection: OneWayInjection[Float] = new OneWayInjection[Float] {
  override def invert(b: PostgresValue): Try[Float] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Float])
  }
}

/** Reads a `Short` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val shortInjection: OneWayInjection[Short] = new OneWayInjection[Short] {
  override def invert(b: PostgresValue): Try[Short] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Short])
  }
}

/** Reads an `Int` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val intInjection: OneWayInjection[Int] = new OneWayInjection[Int] {
  override def invert(b: PostgresValue): Try[Int] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Int])
  }
}

/** Reads a `Long` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val longInjection: OneWayInjection[Long] = new OneWayInjection[Long] {
  override def invert(b: PostgresValue): Try[Long] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Long])
  }
}

/** Reads a `String` from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val stringInjection: OneWayInjection[String] = new OneWayInjection[String] {
  override def invert(b: PostgresValue): Try[String] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[String])
  }
}

/** Reads a byte array from a column; the cast runs inside `Try`, so a failing cast is a `Failure`. */
implicit val byteaInjection: OneWayInjection[Array[Byte]] = new OneWayInjection[Array[Byte]] {
  override def invert(b: PostgresValue): Try[Array[Byte]] = {
    val Column(cell) = b
    Try(cell.value.asInstanceOf[Array[Byte]])
  }
}

def toEscString(a: Any): String = a match {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this approach since we have no guarantee that toString is going to work. Why can't we use a full injection to PostgresValue and then use some safe mechanism to go from Value[Any] to a String that can be interpolated into a SQL statement? I assume the Value wrapper has some support for that, no?

case string: String => s"'$string'"
case array: Array[Byte] => s"E'\\\\x${array.map("%02X" format _).mkString}'"
case rest => rest.toString
}

}
Loading