Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Scala Steward: Reformat with scalafmt 3.8.3
51df81a69373a587359f3677db5f5dea83bcd200

# Scala Steward: Reformat with scalafmt 3.8.6
885cb584da06d02e8af9023dbe4773dd86214eae
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.8.3"
version = "3.8.6"
runner.dialect = scala213
fileOverride {
"glob:**/src/main/scala-3/**" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private final class ProcessCreateNode(
scribe.debug(
s"Exec",
scribe.data("exec-list", cmd.mkString("['", "','", "']")),
scribe.data("explain","command to spawn new process")
scribe.data("explain", "command to spawn new process")
)
fs2.io.process.ProcessBuilder
.apply(cmd.head, cmd.drop(1))
Expand Down
63 changes: 51 additions & 12 deletions core/src/main/scala/tasks/queue/Launcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,12 @@ private[tasks] object Launcher {
filePrefix,
address
)
scribe.debug("RemainsAfterLaunch",scheduleTask,newState.availableResources,address)

scribe.debug(
"RemainsAfterLaunch",
scheduleTask,
newState.availableResources,
address
)

val task: Task =
new Task(
Expand Down Expand Up @@ -292,7 +296,7 @@ private[tasks] object Launcher {
ref.flatModifyFull { case (poll, state) =>
if (!state.denyWorkBeforeShutdown && !state.waitingForWork) {

scribe.debug(s"WillAskForWork ", state.availableResources,address)
scribe.debug(s"WillAskForWork ", state.availableResources, address)
val effect: IO[Unit] = poll(
queue
.askForWork(address, state.availableResources, node)
Expand All @@ -309,17 +313,30 @@ private[tasks] object Launcher {
)
) *>
ref.update { state =>
scribe.debug(s"QueueError ", state.availableResources,address)
scribe.debug(
s"QueueError ",
state.availableResources,
address
)
state.copy(waitingForWork = false)
}
case Right(Left(MessageData.NothingForSchedule)) =>
ref.update { state =>
scribe.debug(s"NothingForSchedule ", state.availableResources,address)
scribe.debug(
s"NothingForSchedule ",
state.availableResources,
address
)
state.copy(waitingForWork = false)
}
case Right(Right(MessageData.Schedule(scheduleTask))) =>
ref.flatModifyFull { case (poll, state) =>
scribe.debug(s"Received Schedule ", scheduleTask,state.availableResources,address)
scribe.debug(
s"Received Schedule ",
scheduleTask,
state.availableResources,
address
)
val st0 = state.copy(waitingForWork = false)
val (newState, sideEffects) =
if (!st0.denyWorkBeforeShutdown) {
Expand All @@ -328,7 +345,12 @@ private[tasks] object Launcher {
val (allocated, st2, io1) = launch(st1, scheduleTask, ref)
(st2, io1)
} else (st0, IO.unit)
scribe.debug(s"State after Schedule",scheduleTask,newState.availableResources,address)
scribe.debug(
s"State after Schedule",
scheduleTask,
newState.availableResources,
address
)
newState -> sideEffects
}
}
Expand Down Expand Up @@ -360,7 +382,6 @@ private[tasks] object Launcher {

def release(task: Task) = {
ref.update { state =>

val allocated = state.runningTasks.find(_._1 == task).map(_._3)
val newState = if (allocated.isEmpty) {
scribe.error("Can't find proxy ", task.proxy, address)
Expand All @@ -378,8 +399,16 @@ private[tasks] object Launcher {
)
)
}
scribe.debug("ReleaseLauncherResourceBefore",address,state.availableResources)
scribe.debug("ReleaseLauncherResource",address,newState.availableResources)
scribe.debug(
"ReleaseLauncherResourceBefore",
address,
state.availableResources
)
scribe.debug(
"ReleaseLauncherResource",
address,
newState.availableResources
)
newState
} *> askForWork(ref, messenger, address, queue)
}
Expand Down Expand Up @@ -472,8 +501,18 @@ private[tasks] object Launcher {
)
}
val st2 = st1.copy(lastTaskFinished = System.nanoTime)
scribe.debug(s"TaskFinishedOld ", scheduleTask,state.availableResources,address)
scribe.debug(s"TaskFinished ", scheduleTask,st2.availableResources,address)
scribe.debug(
s"TaskFinishedOld ",
scheduleTask,
state.availableResources,
address
)
scribe.debug(
s"TaskFinished ",
scheduleTask,
st2.availableResources,
address
)
(st2, sideEffect)
}

Expand Down
120 changes: 61 additions & 59 deletions core/src/main/scala/tasks/queue/QueueImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ object QueueImpl {
unmanagedResource: tasks.shared.ResourceAvailable
)(implicit
config: TasksConfig
) = Resource.make(Ref.of[IO, QueueImpl.State](QueueImpl.State.empty).flatMap {
ref =>
) = Resource.make(
Ref.of[IO, QueueImpl.State](QueueImpl.State.empty).flatMap { ref =>
Ref.of[IO, List[FiberIO[Unit]]](Nil).flatMap { ref2 =>
val q = new QueueImpl(
ref = Transaction.fromRef(ref),
Expand All @@ -209,7 +209,8 @@ object QueueImpl {
)
q.startCounterLoops.map(_ => q)
}
})(_.release)
}
)(_.release)
}

private[tasks] class QueueImpl(
Expand Down Expand Up @@ -515,70 +516,71 @@ private[tasks] class QueueImpl(
)
)
) *> IO
.parSequenceN(1)(requestedNodes.toList.map { case (request, _) =>
createNode
.requestOneNewJobFromJobScheduler(request)
.flatMap {
case Left(e) =>
IO(
scribe.debug(
"NodeRequestFailed",
scribe.data("info", e),
scribe.data(
"explain",
"This is normal if there is no more capacity. " +
"Note: failed requests still count against maxNodesCumulative " +
"as a defensive measure to bound total attempts."
)
)
) *>
ref.update(
_.update(
NodeEvent(NodeRegistryState.NodeRequested)
.parSequenceN(1)(requestedNodes.toList.map {
case (request, _) =>
createNode
.requestOneNewJobFromJobScheduler(request)
.flatMap {
case Left(e) =>
IO(
scribe.debug(
"NodeRequestFailed",
scribe.data("info", e),
scribe.data(
"explain",
"This is normal if there is no more capacity. " +
"Note: failed requests still count against maxNodesCumulative " +
"as a defensive measure to bound total attempts."
)
)
)
case Right((jobId, size)) =>
IO(
scribe.info(
s"NodeRequestSucceeded",
jobId,
size
)
) *> ref.update(
_.update(NodeEvent(NodeRegistryState.NodeRequested))
.update(
NodeEvent(
NodeRegistryState.NodeIsPending(jobId, size)
) *>
ref.update(
_.update(
NodeEvent(NodeRegistryState.NodeRequested)
)
)
) *> IO
.sleep(config.pendingNodeTimeout)
.flatMap { initFailed =>
ref.flatModify { state =>
if (state.nodes.pending.contains(jobId)) {
scribe.warn(
"NodeInitFailed: ",
jobId,
scribe.data(
"explain",
"The node was allocated but the peer process on the node failed to make initial contact."
)
case Right((jobId, size)) =>
IO(
scribe.info(
s"NodeRequestSucceeded",
jobId,
size
)
) *> ref.update(
_.update(NodeEvent(NodeRegistryState.NodeRequested))
.update(
NodeEvent(
NodeRegistryState.NodeIsPending(jobId, size)
)

state.update(
NodeEvent(
NodeRegistryState.InitFailed(jobId)
)
) *> IO
.sleep(config.pendingNodeTimeout)
.flatMap { initFailed =>
ref.flatModify { state =>
if (state.nodes.pending.contains(jobId)) {
scribe.warn(
"NodeInitFailed: ",
jobId,
scribe.data(
"explain",
"The node was allocated but the peer process on the node failed to make initial contact."
)
)
) ->
shutdownNode.shutdownPendingNode(jobId)

} else (state, IO.unit)
state.update(
NodeEvent(
NodeRegistryState.InitFailed(jobId)
)
) ->
shutdownNode.shutdownPendingNode(jobId)

} else (state, IO.unit)
}
}
}
.start
.void
.start
.void

}
}
})
.void

Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/tasks/util/LocalMessenger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ private[tasks] class LocalMessenger(
case None =>
(map.updated(address.withoutUri, ch), IO.unit)
case Some(_) =>
(map, IO.raiseError[Unit](
new RuntimeException(s"Address $address already subscribed")
))
(
map,
IO.raiseError[Unit](
new RuntimeException(s"Address $address already subscribed")
)
)
}
}.flatten
} yield ch.stream
Expand Down
10 changes: 8 additions & 2 deletions core/src/test/scala/tasks/CheckContentEqualityTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ class CheckContentEqualityTestSuite
// so no backup is created. With the bug, .old.0 would exist because
// checkContentEquality wrongly returned false for non-empty identical files.
val backup =
new java.io.File(folder, "test" + java.io.File.separator + "file.txt.old.0")
new java.io.File(
folder,
"test" + java.io.File.separator + "file.txt.old.0"
)
backup.exists shouldBe false
}).unsafeRunSync()
}
Expand Down Expand Up @@ -83,7 +86,10 @@ class CheckContentEqualityTestSuite
} yield {
// Different content — old file should be renamed to .old.0
val backup =
new java.io.File(folder, "test" + java.io.File.separator + "file.txt.old.0")
new java.io.File(
folder,
"test" + java.io.File.separator + "file.txt.old.0"
)
backup.exists shouldBe true
}).unsafeRunSync()
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/scala/tasks/LocalMessengerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class LocalMessengerTestSuite extends FunSuite with Matchers {
result <- messenger.subscribe(address).attempt
} yield {
result.isLeft shouldBe true
result.swap.getOrElse(fail()).getMessage should include("already subscribed")
result.swap.getOrElse(fail()).getMessage should include(
"already subscribed"
)
}
}
.unsafeRunSync()
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/tasks/ParallelSubmissionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ hosts.gpus = [0,1,2,3]
test("parallel submission of recursive fibonacci with 'gpu' counting ") {
IO.parSequenceN(500)((1 to 3000).toList.map { n0 =>
{
val n = math.min(n0,8)
val n = math.min(n0, 8)
val r = (fibtask(FibInput(n))(
ResourceRequest(cpu = (1, 1), memory = 1, gpu = 1, scratch = 1)
)).map(_.n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ hosts.gpus = [0,1,2,3]
import ParallelSubmissionTestNonRecursive._

test("parallel submission of non-recursive task with 'gpu' counting ") {
IO.parSequenceN(500)((1 to 10000).toList.map { n1=>
IO{
val n = math.min(30,n1)
IO.parSequenceN(500)((1 to 10000).toList.map { n1 =>
IO {
val n = math.min(30, n1)
val r = (fibtask(FibInput(n))(
ResourceRequest(cpu = (1, 1), memory = 1, gpu = 1, scratch = 1)
)).map(_.n)
Expand Down
Loading
Loading