diff --git a/README.md b/README.md index 0ca062436..be830e938 100644 --- a/README.md +++ b/README.md @@ -46,10 +46,13 @@ Spark Compatibility | Crossdata Version | Spark Version | |-------------------|:--------------| +| 1.6.X | 1.6.X | +| 1.5.X | 1.6.X | +| 1.4.X | 1.6.X | | 1.3.X | 1.6.X | -| 1.2.X | 1.5.X | -| 1.1.X | 1.5.X | -| 1.0.X | 1.5.X | +| 1.2.X | 1.5.X | +| 1.1.X | 1.5.X | +| 1.0.X | 1.5.X | ============= diff --git a/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala b/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala index a4e6b6d00..a3c316c87 100644 --- a/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala +++ b/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala @@ -45,7 +45,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Try} -class CrossdataServer(progrConfig: Option[Config] = None) extends ServerConfig { +class CrossdataServer(progrConfig: Option[Config] = None, hzMembers: Option[Set[String]] = None) extends ServerConfig { override lazy val logger = Logger.getLogger(classOf[CrossdataServer]) @@ -55,7 +55,20 @@ class CrossdataServer(progrConfig: Option[Config] = None) extends ServerConfig { private val serverConfig = progrConfig map (_.withFallback(config)) getOrElse (config) - private val hzConfig: HzConfig = new XmlConfigBuilder().build() + private val hzConfig: HzConfig = { + val baseConfig = new XmlConfigBuilder().build() + hzMembers map { members => + insertHzMembers(baseConfig, members) + } getOrElse { + baseConfig + } + } + + private def insertHzMembers(hConfig: HzConfig, members: Set[String]) = + hConfig.setNetworkConfig( + hConfig.getNetworkConfig.setJoin( + hConfig.getNetworkConfig.getJoin.setTcpIpConfig( + hConfig.getNetworkConfig.getJoin.getTcpIpConfig.setMembers(members.toList)))) private def startDiscoveryClient(sdConfig: SDCH): CuratorFramework = { diff --git a/server/src/test/scala/com/stratio/crossdata/server/ServiceDiscoveryIT.scala b/server/src/test/scala/com/stratio/crossdata/server/ServiceDiscoveryIT.scala new file mode 100644 index 000000000..83cfa8ff5 --- /dev/null +++ b/server/src/test/scala/com/stratio/crossdata/server/ServiceDiscoveryIT.scala @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.crossdata.server + +import java.util.concurrent.Executors + +import com.stratio.crossdata.server.discovery.ServiceDiscoveryConfigHelper +import com.stratio.crossdata.test.BaseXDTest +import com.typesafe.config.{ConfigFactory, ConfigValueFactory} +import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.retry.ExponentialBackoffRetry +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.util.Try + +@RunWith(classOf[JUnitRunner]) +class ServiceDiscoveryIT extends BaseXDTest with BeforeAndAfterAll { + + import ServiceDiscoveryConstants._ + + val ZookeeperStreamingConnectionKey = "streaming.catalog.zookeeper.connectionString" + val ZookeeperConnection: Option[String] = + Try(ConfigFactory.load().getString(ZookeeperStreamingConnectionKey)).toOption + + var testServer: CrossdataServer = _ + + implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)) + + override def beforeAll(): Unit = { + + val testConfig = ConfigFactory.empty + .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(TestHost)) + .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(AkkaPort)) + .withValue("service-discovery.activated", ConfigValueFactory.fromAnyRef(true)) + .withValue( + "config.spark.jars", + ConfigValueFactory.fromAnyRef(s"server/target/2.11/crossdata-server-jar-with-dependencies.jar")) + + testServer = Await.result(Future(new CrossdataServer(Some(testConfig), Some(Set(s"$TestHost:$HzPort")))), 2 minutes) + + Await.result(Future(testServer.start), 2 minutes) + } + + override def afterAll(): Unit = { + Await.result(Future(testServer.stop), 2 minutes) + } + + "A Crossdata Server" should "write its hostname:port in ZK when service discovery is activated" in { + + ZookeeperConnection.isDefined should be (true) + + val curatorClient = CuratorFrameworkFactory.newClient( + ZookeeperConnection.get, + new ExponentialBackoffRetry(1000, 3)) + curatorClient.blockUntilConnected + val currentSeeds = new String(curatorClient.getData.forPath(ServiceDiscoveryConfigHelper.DefaultSeedsPath)) + + currentSeeds should be (s"$TestHost:$AkkaPort") + + val currentMembers = new String(curatorClient.getData.forPath(ServiceDiscoveryConfigHelper.DefaultProviderPath)) + + currentMembers should be (s"$TestHost:$HzPort") + } + +} + +object ServiceDiscoveryConstants { + val TestHost = "127.0.0.1" + val AkkaPort = 13456 + val HzPort = 5789 +}