Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ Graphite backends which will recieve messages in the normal Graphite Pickle
format. This should be a line-delimited list of `host:port` pairs. For example:

relay.backends: \
localhost:1234 \
localhost:1235 \
localhost:1236 \
localhost:1237
127.0.0.1:1234:a \
127.0.0.1:1235:b \
127.0.0.1:1236:c \
127.0.0.1:1237:d

(You can omit instance part(:a,:b...), but for ConsistentHash routing its mandatory
Also please use same hostname as for CARBONLINK_HOSTS in graphite-web, i.e. not mixed localhost
and 127.0.0.1, fqdn with hostname etc. Instance names and hostname are using in hashing algorithm
and any inconsistency will break caching, which is mandatory if you have significant load)

### `relay.hostbuffer`
Number of updates to buffer for each host in the event that it goes down
Expand All @@ -69,7 +74,8 @@ values are:
- `graphite.relay.backend.strategy.RoundRobin`
You may also set to the FQCN of any other `BackendStrategy` in the `CLASSPATH`.
If using the `ConsistentHash` strategy, you will also have to set
`hash.replicas` in the config. A reasonable default value is `10`.
`hash.replicas` in the config. A reasonable default value is `100`.
(NB: Please do not change it, unless Graphite-web use 100 as hardcoded default)

### `relay.overflowhandler`
Handler which will recieve updates that no backend is available to handle. This
Expand All @@ -95,10 +101,10 @@ A complete config file might be as follows:
overflow.directory: /mnt/overflow/

relay.backends: \
localhost:1234 \
localhost:1235 \
localhost:1236 \
localhost:1237
localhost:1234:a \
localhost:1235:b \
localhost:1236:c \
localhost:1237:d


Pickle Format
Expand Down
6 changes: 3 additions & 3 deletions project/build/GraphiteRelayProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ class GraphiteRelayProject(info: ProjectInfo) extends DefaultProject(info)
"http://repository.jboss.org/nexus/content/groups/public/"

def nexusSnapshots = "Nexus Snapshots" at
"http://nexus.scala-tools.org/content/repositories/snapshots/"
"https://oss.sonatype.org/content/repositories/snapshots/"
}

object Dependencies {
def guice = "com.google.inject" % "guice" % "3.0"
def jodaTime = "joda-time" % "joda-time" % "1.6.2"
def log4j = "log4j" % "log4j" % "1.2.16"
def netty = "org.jboss.netty" % "netty" % "3.2.4.Final"
def netty = "org.jboss.netty" % "netty" % "3.7.0.Final"
def scalatest = "org.scalatest" % "scalatest" % "1.3"
def scopt = "com.github.scopt" %% "scopt" % "1.0.0-SNAPSHOT"
def scopt = "com.github.scopt" %% "scopt" % "1.1.1"
}
}
2 changes: 1 addition & 1 deletion project/plugins/project/build.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#Project properties
#Mon Apr 18 11:27:21 EDT 2011
#Thu Nov 07 17:48:43 CET 2013
plugin.uptodate=true
2 changes: 1 addition & 1 deletion src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object Main {
.filter(_ != "")
val backends = backendStrs.map { backendStr ⇒
val parts = backendStr.split(":")
Backend(parts(0), parts(1).toInt)
Backend(parts(0), parts(1).toInt, parts(2))
}
Backends(backends:_*)
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/backend/Backend.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package graphite.relay.backend

case class Backend(host: String, port: Int) {
case class Backend(host: String, port: Int, instance: String = "") {
override def toString = "%s:%s".format(host, port)
def toInstanceString = "('%s', '%s')".format(host, instance)
}
13 changes: 5 additions & 8 deletions src/main/scala/backend/strategy/ConsistentHash.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package graphite.relay.backend.strategy

import java.util.zip.CRC32
import java.security.MessageDigest
import javax.inject.Inject
import javax.inject.Named
import scala.collection.SortedMap
Expand Down Expand Up @@ -33,21 +33,18 @@ class ConsistentHash @Inject() (backends: Backends,
}
}

/** Hash a given string. Note: The object instantation might hurt a bit, but
* at least it will make this method thread safe. Since it's in the tight
* look, it may need a set of eyes later on. */
/** Hash a given string in proper Graphite compatible format */
private def hash(string: String): Long = {
val hasher = new CRC32()
hasher.update(string.getBytes)
return hasher.getValue
val hash = MessageDigest.getInstance("MD5").digest(string.getBytes)
return Integer.parseInt(hash.map("%02X".format(_)).mkString.slice(0,4), 16)
}

/** Initialize the hashing circle basted on the backends this class was
* constructed with. */
private def getCircle = {
val backendKeys = backends.map { backend ⇒
(0 to replicas).map { i ⇒
hash("%s-%s".format(backend.toString, i)) → backend
hash("%s:%s".format(backend.toInstanceString, i)) → backend
}
}
SortedMap[Long, Backend](backendKeys.flatten:_*)
Expand Down
14 changes: 11 additions & 3 deletions src/main/scala/server/RelayUpdateHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package graphite.relay.server
import javax.inject.Singleton
import javax.inject.Inject

import java.lang.NumberFormatException

import org.apache.log4j.Logger

import org.jboss.netty.buffer.ChannelBuffers
Expand All @@ -28,10 +30,16 @@ class RelayUpdateHandler @Inject() (backendManager: BackendManager)
val message = e.getMessage().toString()
val parts = message.split(" ")
if(parts.length == 3) {
val update = Update(parts(0), parts(1).toDouble, parts(2).trim().toLong)
backendManager(update)
try {
val update = Update(parts(0), parts(1).toDouble, parts(2).trim().toLong)
backendManager(update)
} catch {
case ex: NumberFormatException => {}
}
} else {
log.error("Invalid Message %s".format(e))
// sorry, we don't care about this sh*t in logs
// TODO: make proper switch for loggig invalid msgs
//log.error("Invalid Message %s".format(e))
closeOnFlush(e.getChannel())
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/test/scala/backend/strategy/ConsistentHashSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,26 @@ class ConsistentHashSpec extends FlatSpec with ShouldMatchers {
}

it should "work when asking for a direct hash hit" in {
val backend = Backend("localhost", 123)
val backend = Backend("localhost", 123, "a")
val backends = Backends(backend)
val hash = new ConsistentHash(backends, 1)

val key = "%s-0".format(backend.toString)
val key = "%s:0".format(backend.toInstanceString)
hash(key) should equal (List(backend))
}

it should "comparing hashing results with Graphite web hashing" in {
val metric = "test.metric"
val backendA = Backend("127.0.0.1", 2101, "a")
val backendB = Backend("127.0.0.1", 2201, "b")
val backendC = Backend("127.0.0.1", 2301, "c")
val backendD = Backend("127.0.0.1", 2401, "d")
val backends = Backends(backendA,backendB,backendC,backendD)
// 100 - is built in value for Graphite web
val hash = new ConsistentHash(backends, 100)
hash(metric) should equal (List(backendB))
}

private def getBackends(count: Int) = {
(0 to count).map(i ⇒ Backend("localhost", i))
}
Expand Down