Implementing storehaus with Postgres backend#331
Implementing storehaus with Postgres backend#331ponkin wants to merge 9 commits intotwitter:developfrom
Conversation
johnynek
left a comment
There was a problem hiding this comment.
Thanks for the contribution. I made some suggestions. Let me know if you have any questions.
| * | ||
| * Created by ponkin on 10/20/16. | ||
| */ | ||
| trait Read[A] { |
There was a problem hiding this comment.
can we instead follow the style of the rest of the project and use Injection[A, PostgresValue] here instead?
There was a problem hiding this comment.
I was using Injections at first but it went me to many ugly implicit arguments like
PostgresStoreK, V(implicit inj1: Injection[K, PostgresValue], inj2: Injection[V, PostgresValue], inj3: Injection[K, Element], inj1: Injection[V, Element]) - and also expose details of implementation( Element is object from finagle-roc postgres driver). May be you can point me some example, how to do it right?
There was a problem hiding this comment.
you could do in your package:
type PostgresValueConverter[A] = Injection[A, PostgresValue]then in the object PostgresValue { ... define the implicits and it should be pretty clean and pick them up automatically.
| case (key, Some(value)) => upsertSQL(List((key, value))) | ||
| case (key, None) => deleteSQL(List(key)) | ||
| } | ||
| println(sql) |
There was a problem hiding this comment.
I don't think we should have the println around.
| } | ||
| ks.foldLeft(Map.empty[K1, Future[Option[V]]]) { (acc, key) => | ||
| acc + (key -> reqMap.map(_.get(key))) | ||
| } |
There was a problem hiding this comment.
ks.iterator.map { key =>
(key, reqMap.map(_.get(key)))
}.toMapwould be more direct than building the map yourself (and maybe faster since it can use a MapBuilder).
| s"DELETE FROM $table WHERE $kCol" | ||
|
|
||
| override def get(key: K): Future[Option[V]] = { | ||
| val result = run(selectSQL(List(key))) |
There was a problem hiding this comment.
can we pass a limit here? I'm also worried that we may have more than one item. I would rather limit to 2, then do:
result.flatMap {
case Nil => Future.None
case h :: Nil => Future.value(Read[V].read(h.get(Symbol(vCol))))
case other => Future.exception(new Exception(s"for key: $key expected 0 or 1 items, found: $other")| * | ||
| * Created by ponkin on 10/18/16. | ||
| */ | ||
| sealed trait PostgresValue |
There was a problem hiding this comment.
why the extra wrapper on Element?
There was a problem hiding this comment.
to change Postgres driver(finagle-roc) in future(if we we want to).
There was a problem hiding this comment.
I see. That's fine if we are going to automatically handle the conversions I think.
| Map.empty | ||
| } else { | ||
| val sql = doMultiPut(kvs) | ||
| println(sql) |
| multiPutSQL(toUpsert, toDelete.toList) | ||
| } | ||
|
|
||
| override def close(t: Time): Future[Unit] = { |
There was a problem hiding this comment.
don't need { } for single expression methods.
| new PostgresStore[K, V](client, table, kCol, vCol) | ||
| } | ||
|
|
||
| class PostgresStore[K: Show : Read, V: Show : Read] |
There was a problem hiding this comment.
would it be too much work to add a PostgresMergeableStore that does merge in a transaction? That would be nice to have for several use cases.
There was a problem hiding this comment.
Well, that part turns out to be kind of tricky). How can we implement merge since we use async driver( separate calls client.query("BEGIN") and client.query("COMMIT;") can lead to unknown result, since some requests can be made simultaneously and can be added in transaction). I do not know ho to tie Futures to particular connection. Any ideas? We can use optimistic locking but we need additional column to keep lock, like timestamp or version.
build.sbt
Outdated
| "com.github.finagle" %% "roc-core" % "0.0.5", | ||
| "com.github.finagle" %% "roc-types" % "0.0.5" | ||
| "com.github.finagle" %% "roc-types" % "0.0.5", | ||
| "com.chuusai" %% "shapeless" % "2.3.2" |
There was a problem hiding this comment.
do we need this? I don't see where.
| /** | ||
| * Created by ponkin on 10/26/16. | ||
| */ | ||
| class MergeablePostgresStore[V] { |
| implicit val boolRead = new Read[Boolean] { | ||
| override def read(v: PostgresValue): Boolean = v match { | ||
| case RocPostgresValue(e) => e.as[Boolean] | ||
| implicit val boolInjection= new OneWayInjection[Boolean] { |
There was a problem hiding this comment.
okay, I'm really sorry. I overlooked that your Read and Show were not inverses of each other. One is going to String the other is coming from a PostgresValue. I wish I had noticed that.
I'd actually prefer the way you had it to throwing exceptions. I mistakenly thought we could combine Read and Show into Injection but not the way you have the code.
What about a direct trait like:
trait PostgresCodec[A] {
def encode(a: A): String
def decode(p: PostgresValue): Try[A]
}I don't see a need to decouple these since they are so special purpose. I thought Injection would do, but the types are not the same on encode and decode.
|
@johnynek Sorry for long delay. I added MergeablePostgresStore and tests for it. Also I rewrote all with Injections. I was trying to keep overall style. please review) |
| * All values with the same key will be merged with Semigroup[V2].plus method | ||
| */ | ||
| private def mergeWithSemigroup[K1 <: K](m1: Map[K, V], m2: Map[K1, V]): Map[K1, V] = { | ||
| m2.keySet.iterator.map{ k2 => |
There was a problem hiding this comment.
foldLeft is generally faster because you may get some sharing:
val smaller = if (m1.size < m2.size) m1 else m2
val bigger = if (m1.size < m2.size) m2 else m1
smaller.foldLeft(bigger) { case (m, (k, v)) =>
val newV = Semigroup.maybePlus(v, m.get(k))
m + ((k, newV))
}may need to bump the algebird dependency to get maybePlus, but that is fine.
| forUpdate: Boolean = false) | ||
| (implicit cli: Client): Future[Map[K, V]] = | ||
| if (ks.isEmpty) { | ||
| Future(Map.empty) |
There was a problem hiding this comment.
Future.value(Map.empty) is better since it avoids the call-by-name parameter, and since that is a constant, you might save it as a private[this] val and avoid the reallocation.
|
|
||
| trait OneWayInjection[T] extends Injection[T, PostgresValue] { | ||
| override def apply(t: T): PostgresValue = | ||
| throw IllegalConversionException( |
There was a problem hiding this comment.
can we avoid the throwing? I know we don't use it, but I really don't want to add code that throws unless we have no other way.
Here I can see two ways:
- just add the code to wrap the primitives below in
PostgresValue, which does not seem hard. - make a type-class that only goes in one direction.
|
|
||
| private val toV: PostgresValue => V = vInj.invert(_) match { | ||
| case Success(v) => v | ||
| case Failure(e) => throw IllegalConversionException(e.getMessage) |
There was a problem hiding this comment.
why do we need to throw here? Why not just put this in the future?
private def toV(p: PostgresValue): Future[V] = vInj.invert(p) match {
case Success(v) => Future.value(v)
case Failure(e) => Future.exception(e)
}then use toV(v).flatMap { ... rather than throwing?
| } | ||
| } | ||
|
|
||
| def toEscString(a: Any): String = a match { |
There was a problem hiding this comment.
I don't like this approach since we have no guarantee that toString is going to work. Why can't use a full injection to PostgresValue then use some safe mechanism to go from Value[Any] to String that can be interpolated in a SQL statement. I assume the Value wrapper has some support for that, no?
|
Alexey Ponkin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
Hi everybody. Here is storehouse implementation for postgres.
Need your review and comments.