Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Scala Steward: Reformat with scalafmt 3.8.3
51df81a69373a587359f3677db5f5dea83bcd200

# Scala Steward: Reformat with scalafmt 3.8.5
54157d5b94fa5a6ff0ca6e18b1e72329cc514ebb
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.8.3"
version = "3.8.5"
runner.dialect = scala213
fileOverride {
"glob:**/src/main/scala-3/**" {
Expand Down
13 changes: 6 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ lazy val akkaProvided = List(
"com.typesafe.akka" %% "akka-remote" % akkaVersion % Provided
)
lazy val akkaReal = List(
"com.typesafe.akka" %% "akka-actor" % akkaVersion ,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion ,
"com.typesafe.akka" %% "akka-remote" % akkaVersion
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-remote" % akkaVersion
)
lazy val core = project
.in(file("core"))
Expand Down Expand Up @@ -203,14 +203,13 @@ lazy val kubernetes = project
name := "tasks-kubernetes",
libraryDependencies ++= Seq(
"com.goyeau" %% "kubernetes-client" % "0.11.0",
"io.github.pityka" %% "selfpackage-jib" % "2.1.3",

"io.github.pityka" %% "selfpackage-jib" % "2.1.3"
) ++ akkaProvided
)
.dependsOn(core % "compile->compile;test->test")

lazy val kubernetesTest = project
.in(file("kubernetes-test"))
lazy val kubernetesTest = project
.in(file("kubernetes-test"))
.settings(commonSettings: _*)
.settings(
name := "tasks-kubernetes-test",
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/tasks/TaskSystemComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ object TaskSystemComponents {

val packageServerPort = hostConfig.myAddressBind.getPort + 1

val packageServerHostname = hostConfig.myAddressExternal.getOrElse(hostConfig.myAddressBind).getHostName
val packageServerHostname = hostConfig.myAddressExternal
.getOrElse(hostConfig.myAddressBind)
.getHostName

val rootHistory = NoHistory

Expand Down Expand Up @@ -141,7 +143,9 @@ object TaskSystemComponents {

val emitLog = Resource.eval(IO {
scribe.info("Listening on: " + hostConfig.myAddressBind.toString)
scribe.info("External address: " + hostConfig.myAddressExternal.toString)
scribe.info(
"External address: " + hostConfig.myAddressExternal.toString
)
scribe.info("CPU: " + hostConfig.availableCPU.toString)
scribe.info("RAM: " + hostConfig.availableMemory.toString)
scribe.info("SCRATCH: " + hostConfig.availableScratch.toString)
Expand Down Expand Up @@ -726,7 +730,8 @@ object TaskSystemComponents {
"""
}

val externalAddress = hostConfig.myAddressExternal.getOrElse(hostConfig.myAddressBind)
val externalAddress =
hostConfig.myAddressExternal.getOrElse(hostConfig.myAddressBind)
val internalAddress = hostConfig.myAddressBind

val akkaProgrammaticalConfiguration =
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/tasks/deploy/HostConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ trait HostConfigurationFromConfig extends HostConfiguration {
def myAddressBind: SimpleSocketAddress = myAddress

def myAddressExternal: Option[SimpleSocketAddress] =
config.hostNameExternal.map{v =>
config.hostNameExternal.map { v =>
val spl = v.split(':')
SimpleSocketAddress(spl.head, if (spl.size > 1) spl(1).toInt else myPort)
}
Expand All @@ -100,7 +100,8 @@ trait HostConfigurationFromConfig extends HostConfiguration {

private def startApp = config.startApp

private val isMaster = myAddress == master || myAddressExternal.exists(_ == master)
private val isMaster =
myAddress == master || myAddressExternal.exists(_ == master)

lazy val myRoles: Set[Role] =
if (config.masterAddress.isDefined && !startApp) Set(Worker)
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/tasks/elastic/ElasticSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ trait ElasticSupport {

trait ElasticSupportFromConfig {

def apply(implicit config: TasksConfig): cats.effect.Resource[cats.effect.IO,ElasticSupport]
def apply(implicit
config: TasksConfig
): cats.effect.Resource[cats.effect.IO, ElasticSupport]

}

Expand Down
11 changes: 7 additions & 4 deletions core/src/main/scala/tasks/elastic/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package tasks
import tasks.util.config.TasksConfig
import cats.effect._
package object elastic {
def makeElasticSupport(implicit config: TasksConfig): Resource[IO,Option[ElasticSupport]] =
def makeElasticSupport(implicit
config: TasksConfig
): Resource[IO, Option[ElasticSupport]] =
config.elasticSupport match {
case "" => Resource.pure(None)
case "NOENGINE" => Resource.pure(None)
case reflective =>
tasks.util
.reflectivelyInstantiateObject[ElasticSupportFromConfig](reflective)
.apply(config).map(Some(_))
tasks.util
.reflectivelyInstantiateObject[ElasticSupportFromConfig](reflective)
.apply(config)
.map(Some(_))

}
}
36 changes: 26 additions & 10 deletions core/src/main/scala/tasks/fileservice/FolderFileStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import fs2.Pipe

object FolderFileStorage {

private[tasks] def getContentHashOfFile(file: File,skip:Boolean): Int = if (skip) 0 else {
private[tasks] def getContentHashOfFile(file: File, skip: Boolean): Int = if (
skip
) 0
else {
openFileInputStream(file) { is =>
FileStorage.getContentHash(is)
}
Expand All @@ -64,7 +67,7 @@ class FolderFileStorage(val basePath: File)(implicit

private val tempFolder = new File(basePath, "___TMP___")
tempFolder.mkdirs()
private val tmpFolderCanonicalPath = tempFolder.getCanonicalPath()
private val tmpFolderCanonicalPath = tempFolder.getCanonicalPath()

private def createLocalTempFile() = {
def try1(i: Int): File = if (i == 0)
Expand All @@ -91,7 +94,10 @@ class FolderFileStorage(val basePath: File)(implicit
val sizeMatch = sizeOnDiskNow == expectedSize
val canRead = file.canRead
def contentMatch =
canRead && FolderFileStorage.getContentHashOfFile(file, config.skipContentHashCreationUponImport) === expectedHash
canRead && FolderFileStorage.getContentHashOfFile(
file,
config.skipContentHashCreationUponImport
) === expectedHash
val canDelete =
canRead && (expectedSize < 0 || (sizeMatch && contentMatch))
if (canDelete) {
Expand Down Expand Up @@ -127,7 +133,10 @@ class FolderFileStorage(val basePath: File)(implicit
val sizeMatch = sizeOnDiskNow === size
def contentMatch =
(config.skipContentHashVerificationAfterCache || (canRead && FolderFileStorage
.getContentHashOfFile(f,config.skipContentHashCreationUponImport) === hash))
.getContentHashOfFile(
f,
config.skipContentHashCreationUponImport
) === hash))
val pass = canRead && (size < 0 || (sizeMatch && contentMatch))

if (!pass) {
Expand All @@ -154,7 +163,10 @@ class FolderFileStorage(val basePath: File)(implicit
} else {
if (retrieveSizeAndHash) {
val size = f.length
val hash = FolderFileStorage.getContentHashOfFile(f,config.skipContentHashCreationUponImport)
val hash = FolderFileStorage.getContentHashOfFile(
f,
config.skipContentHashCreationUponImport
)
Some(SharedFileHelper.create(size, hash, path))
} else Some(SharedFileHelper.create(size = -1L, hash = 0, path))
}
Expand Down Expand Up @@ -265,7 +277,7 @@ class FolderFileStorage(val basePath: File)(implicit
.drain
.flatMap { _ =>
importFile(tmp, path, canMove = true)
.guarantee(IO.interruptible { if (tmp.exists) {tmp.delete} })
.guarantee(IO.interruptible { if (tmp.exists) { tmp.delete } })
.map(x => (x._1, x._2, x._3))
}
)
Expand All @@ -276,9 +288,10 @@ class FolderFileStorage(val basePath: File)(implicit
com.google.common.io.Files.equal(file1, file2)
else
file1.length == file2.length && FolderFileStorage.getContentHashOfFile(
file1,config.skipContentHashCreationUponImport
file1,
config.skipContentHashCreationUponImport
) == FolderFileStorage
.getContentHashOfFile(file2,config.skipContentHashCreationUponImport)
.getContentHashOfFile(file2, config.skipContentHashCreationUponImport)

override def importFile(
file: File,
Expand All @@ -291,15 +304,18 @@ class FolderFileStorage(val basePath: File)(implicit
IO.blocking({
scribe.debug(s"Importing file $file under name $proposed")
val size = file.length
val hash = FolderFileStorage.getContentHashOfFile(file,config.skipContentHashCreationUponImport)
val hash = FolderFileStorage.getContentHashOfFile(
file,
config.skipContentHashCreationUponImport
)
val managed = proposed.toManaged

if (assemblePath(managed).canRead) {
val finalFile = assemblePath(managed)
scribe.debug(
s"Found a file already in storage with the same name ($finalFile). Check for equality."
)
println((file,proposed))
println((file, proposed))
if (finalFile == file || checkContentEquality(finalFile, file))
(size, hash, managed)
else {
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/scala/tasks/fileservice/SharedFileHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private[tasks] object SharedFileHelper {
): IO[SharedFile] = {
val sharedFile = {
val proposedPath = prefix.propose(name)
service.storage.importFile(file, proposedPath,canMove=true).map { f =>
service.storage.importFile(file, proposedPath, canMove = true).map { f =>
if (deleteFile && file.exists()) {
file.delete
}
Expand Down Expand Up @@ -275,15 +275,18 @@ private[tasks] object SharedFileHelper {
val directoryPath = directory.getAbsolutePath
IO.parSequenceN(parallelism)(files.map { file =>
assert(file.getAbsolutePath.startsWith(directoryPath))
val pathElements :Seq[String] = file.getAbsolutePath.drop(directoryPath.size).split('/').filter(_.nonEmpty)
val pathElements: Seq[String] = file.getAbsolutePath
.drop(directoryPath.size)
.split('/')
.filter(_.nonEmpty)
val folders = pathElements.dropRight(1)
val name = pathElements.last
val prefix1 = prefix.append(folders)
val name = pathElements.last
val prefix1 = prefix.append(folders)
createFromFile(
file,
name,
deleteFile = false
)(prefix1,service,config,historyContext)
)(prefix1, service, config, historyContext)
})
}
}
Expand Down
26 changes: 14 additions & 12 deletions core/src/main/scala/tasks/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,41 +140,42 @@ package object tasks extends MacroCalls {
import cats.effect.unsafe.implicits.global

val resource = defaultTaskSystem(c)
val ((tsc,hostConfig), shutdown) = resource.allocated.unsafeRunSync()
val ((tsc, hostConfig), shutdown) = resource.allocated.unsafeRunSync()
if (hostConfig.myRoles.contains(App)) {
try {
Some(f(tsc))
} finally {
shutdown.unsafeRunSync()
}
} else {
scribe.info("Leaving withTaskSystem lambda without closing taskystem. This is only meaningful for forever running worker node.")
scribe.info(
"Leaving withTaskSystem lambda without closing taskystem. This is only meaningful for forever running worker node."
)
// Queue and Worker roles are never stopped
None
}

}

def defaultTaskSystem
: Resource[IO, (TaskSystemComponents,HostConfiguration)] =
: Resource[IO, (TaskSystemComponents, HostConfiguration)] =
defaultTaskSystem(None)

def defaultTaskSystem(
string: String
): Resource[IO, (TaskSystemComponents,HostConfiguration)] =
): Resource[IO, (TaskSystemComponents, HostConfiguration)] =
defaultTaskSystem(Some(ConfigFactory.parseString(string)))

/**
* The user of this resource should check the role of the returned HostConfiguration
* If it is not an App, then it is very likely that the correct use case is to return
* an IO.never
/** The user of this resource should check the role of the returned
* HostConfiguration If it is not an App, then it is very likely that the
* correct use case is to return an IO.never
*
* @param extraConf
* @return
*/
def defaultTaskSystem(
extraConf: Option[Config]
): Resource[IO, (TaskSystemComponents,HostConfiguration)] = {
): Resource[IO, (TaskSystemComponents, HostConfiguration)] = {

val configuration = () => {
ConfigFactory.invalidateCaches
Expand All @@ -195,8 +196,9 @@ package object tasks extends MacroCalls {
val elasticSupport = elastic.makeElasticSupport

val hostConfig = elasticSupport
.map(_.flatMap(_.hostConfig).getOrElse(MasterSlaveGridEngineChosenFromConfig))

.map(
_.flatMap(_.hostConfig).getOrElse(MasterSlaveGridEngineChosenFromConfig)
)

TaskSystemComponents.make(hostConfig, elasticSupport, tconfig)
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/tasks/queue/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ class TaskQueue(
case AskForWork(availableResource) =>
if (state.negotiation.isEmpty) {
scribe.debug(
s"AskForWork ${sender()} $availableResource ${state.negotiation} ${state.queuedTasks.map { case (_, (sch, _)) =>
(sch.description.taskId, sch.resource)
s"AskForWork ${sender()} $availableResource ${state.negotiation} ${state.queuedTasks.map {
case (_, (sch, _)) =>
(sch.description.taskId, sch.resource)
}.toSeq}"
)

Expand Down
17 changes: 13 additions & 4 deletions core/src/main/scala/tasks/util/config/TasksConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class TasksConfig(load: () => Config) {
val acceptableHeartbeatPause: FD =
raw.getDuration("tasks.failuredetector.acceptable-heartbeat-pause")

val hostImage = if (raw.hasPath("hosts.image") ) Some(raw.getString("hosts.image")) else None
val hostImage =
if (raw.hasPath("hosts.image")) Some(raw.getString("hosts.image")) else None

val hostNumCPU = raw.getInt("hosts.numCPU")

Expand All @@ -112,7 +113,10 @@ class TasksConfig(load: () => Config) {
val hostScratch = raw.getInt("hosts.scratch")

val hostName = raw.getString("hosts.hostname")
val hostNameExternal = if (raw.hasPath("hosts.hostnameExternal")) Some(raw.getString("hosts.hostnameExternal")) else None
val hostNameExternal =
if (raw.hasPath("hosts.hostnameExternal"))
Some(raw.getString("hosts.hostnameExternal"))
else None

val hostPort = raw.getInt("hosts.port")

Expand Down Expand Up @@ -266,9 +270,14 @@ class TasksConfig(load: () => Config) {
def kubernetesRamMin = raw.getInt("tasks.kubernetes.minimumlimits.ram")

def kubernetesPodSpec = {

if (raw.hasPath("tasks.kubernetes.podSpec"))
Some(raw.getConfig("tasks.kubernetes.podSpec").root().render(ConfigRenderOptions.concise()))
Some(
raw
.getConfig("tasks.kubernetes.podSpec")
.root()
.render(ConfigRenderOptions.concise())
)
else None
}

Expand Down
Loading