Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.

DO NOT MERGE: adding a "simple" web service with a few routes #55

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scoverage.ScoverageSbtPlugin.ScoverageKeys._
////////////////////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////////////////////
// Use sbt-release to bupm the version numbers.
// Use sbt-release to bump the version numbers.
//
// see: http://blog.byjean.eu/2015/07/10/painless-release-with-sbt.html
////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -40,6 +40,11 @@ releaseProcess := Seq[ReleaseStep](
pushChanges
)

// Dependency settings
val akkaV = "2.3.12"
val sprayV = "1.3.2"
val DowngradedSprayV = "1.3.1"

////////////////////////////////////////////////////////////////////////////////////////////////
// For the aggregate (root) jar, override the name. For the sub-projects,
// see the build.sbt in each project folder.
Expand Down Expand Up @@ -148,6 +153,12 @@ lazy val core = Project(id="dagr-core", base=file("core"))
"org.reflections" % "reflections" % "0.9.10",
"com.typesafe" % "config" % "1.3.0",
"javax.servlet" % "javax.servlet-api" % "3.1.0",
"com.typesafe.akka" %% "akka-actor" % akkaV,
"io.spray" %% "spray-can" % sprayV,
"io.spray" %% "spray-routing" % sprayV,
"io.spray" %% "spray-client" % sprayV,
"io.spray" %% "spray-http" % sprayV,
"io.spray" %% "spray-json" % DowngradedSprayV,
//---------- Test libraries -------------------//
"org.scalatest" %% "scalatest" % "2.2.4" % "test->*" excludeAll ExclusionRule(organization="org.junit", name="junit")
)
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/dagr/core/DagrDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ package dagr.core
object DagrDef {
/** The type of identifier used to uniquely identify tasks tracked by the execution system. */
type TaskId = BigInt

/** Companion methods for TaskId */
object TaskId {
/** The apply method for TaskId */
def apply(value: Int): TaskId = BigInt(value)
}
}
28 changes: 21 additions & 7 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* The MIT License
*
* Copyright (c) 2015 Fulcrum Genomics LLC
* Copyright (c) 2015-2016 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -26,11 +26,12 @@ package dagr.core.cmdline
import java.io.PrintWriter
import java.nio.file.{Files, Path}

import dagr.commons.util.{LogLevel, LazyLogging, Logger}
import dagr.commons.io.{Io, PathUtil}
import dagr.commons.util.{LazyLogging, LogLevel, Logger}
import dagr.core.config.Configuration
import dagr.core.execsystem._
import dagr.core.tasksystem.Pipeline
import dagr.commons.io.{PathUtil, Io}
import dagr.core.webservice.DagrServer
import dagr.sopt.arg
import dagr.sopt.cmdline.{CommandLineParser, ValidationException}
import dagr.sopt.util.TermCode
Expand All @@ -49,7 +50,6 @@ object DagrCoreMain extends Configuration {
makeItSo(args, packageList = getPackageList)
}


/** Loads the various dagr scripts and puts them on the classpath. */
private def loadScripts(clp: DagrCoreMain): Unit = {
val scriptLoader = new DagrScriptManager
Expand All @@ -60,7 +60,6 @@ object DagrCoreMain extends Configuration {
)
}


/**
* Main entry point for the class. Parses the args and executes the pipeline. */
def makeItSo(args: Array[String], packageList: List[String] = getPackageList): Unit = {
Expand Down Expand Up @@ -140,7 +139,9 @@ class DagrCoreMain(
@arg(doc = "Set the memory available to dagr.", common = true)
val memory: Option[String] = None,
@arg(doc = "Write an execution report to this file, otherwise write to the stdout", common = true)
val report: Option[Path] = None
val report: Option[Path] = None,
@arg(doc = "Run the Dagr web-service. Dagr will not exit once all tasks have completed. ", common = true, flag="w")
val webservice: Boolean = false
) extends LazyLogging {

// These are not optional, but are only populated during configure()
Expand Down Expand Up @@ -206,6 +207,13 @@ class DagrCoreMain(
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

// Set up the web service
if (webservice) {
val server = new DagrServer(taskMan)
server.startAllServices()
sys addShutdownHook server.stopAllServices()
}

taskMan.addTask(pipeline)
taskMan.runToCompletion()

Expand All @@ -215,8 +223,14 @@ class DagrCoreMain(
pw.close()

// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (task, info) =>
val numNotCompleted = taskMan.taskToInfoBiMap.count { case (task, info) =>
TaskStatus.isTaskNotDone(info.status, failedIsDone=false)
}

if (webservice) {
while (true) Thread.sleep(500)
}

numNotCompleted
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/dagr/core/config/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ object Configuration extends ConfigurationLike {
val LogDirectory = "dagr.log-directory"
val SystemCores = "dagr.system-cores"
val SystemMemory = "dagr.system-memory"
val WebServiceHost = "dagr.webservice-host"
val WebSErvicePort = "dagr.webservice-port"
}

// The global configuration instance
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object TaskManager extends LazyLogging {
taskManager.addTask(task = task)
taskManager.runToCompletion()

taskManager.taskToInfoBiMapFor
taskManager.taskToInfoBiMap
}
}

Expand Down Expand Up @@ -551,6 +551,6 @@ class TaskManager(taskManagerResources: TaskManagerResources = TaskManagerDefaul
processCompletedTask(taskId = node.taskId, doRetry = false)
}

taskToInfoBiMapFor
taskToInfoBiMap
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private[execsystem] object TaskManagerLike {
}

/** A generic template for task managers */
private[execsystem] trait TaskManagerLike {
private[core] trait TaskManagerLike {

/** Gets the task associated with the identifier, if any
*
Expand Down Expand Up @@ -98,7 +98,7 @@ private[execsystem] trait TaskManagerLike {
*
* @return the task and task execution information bi-directional map.
*/
def taskToInfoBiMapFor: BiMap[Task, TaskExecutionInfo]
def taskToInfoBiMap: BiMap[Task, TaskExecutionInfo]

/** Checks if we have failed tasks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ trait TaskStatusReporter {
* @param delimiter the delimiter between entries in a row
*/
def logReport(loggerMethod: String => Unit, delimiter: String = " "): Unit = {
val taskInfoMap: BiMap[Task, TaskExecutionInfo] = taskToInfoBiMapFor
val taskInfoMap: BiMap[Task, TaskExecutionInfo] = taskToInfoBiMap

// Create the task status table
val taskStatusTable: ListBuffer[List[String]] = new ListBuffer[List[String]]()
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/TaskTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ trait TaskTracker extends TaskManagerLike with LazyLogging {
}
}

override def taskToInfoBiMapFor: BiMap[Task, TaskExecutionInfo] = {
override def taskToInfoBiMap: BiMap[Task, TaskExecutionInfo] = {
val map: BiMap[Task, TaskExecutionInfo] = new BiMap[Task, TaskExecutionInfo]()
for ((task, trackingInfo) <- taskAndGraphNode) {
map.add(task, trackingInfo.taskInfo)
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/scala/dagr/core/webservice/ApiDataModels.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* The MIT License
*
* Copyright (c) 2016 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package dagr.core.webservice


import dagr.core.execsystem.TaskExecutionInfo

/** Stores the data to be returned by an end-point. Make sure that there exists a protocol and any custom JSON
* handling specified in [[DagrApiJsonSupport]].
*/
sealed abstract class DagrResponse

case class DagrVersionResponse(id: String) extends DagrResponse

case class DagrStatusResponse(infos: Iterable[TaskExecutionInfo]) extends DagrResponse

case class DagrTaskScriptResponse(script: String) extends DagrResponse

case class DagrTaskLogResponse(log: String) extends DagrResponse

case class DagrTaskInfoResponse(info: TaskExecutionInfo) extends DagrResponse
87 changes: 87 additions & 0 deletions core/src/main/scala/dagr/core/webservice/DagrApiHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* The MIT License
*
* Copyright (c) 2016 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package dagr.core.webservice

import akka.actor.{Actor, Props}
import dagr.commons.util.LazyLogging
import dagr.core.execsystem.{TaskManagerLike, TaskExecutionInfo}
import dagr.core.webservice.PerRequest.RequestComplete
import spray.http.StatusCodes
import scala.concurrent.duration._
import akka.util.Timeout
import java.nio.file.Path
import spray.httpx.SprayJsonSupport._
import dagr.core.DagrDef._

/** This object stores the definitions for the API requests. */
object DagrApiHandler {
def props(taskManager: TaskManagerLike): Props = {
Props(new DagrApiHandler(taskManager))
}

sealed trait DagrRequest
final case class DagrVersionRequest() extends DagrRequest
final case class DagrStatusRequest() extends DagrRequest
final case class DagrTaskScriptRequest(id: TaskId) extends DagrRequest
final case class DagrTaskLogRequest(id: TaskId) extends DagrRequest
final case class DagrTaskInfoRequest(id: TaskId) extends DagrRequest
}

/** Receives a request, performs the appropriate logic, and sends back a response */
class DagrApiHandler(val taskManager: TaskManagerLike) extends Actor with LazyLogging {
// needed for marshalling
import DagrApiJsonSupport._
import context.dispatcher
import DagrApiHandler._

implicit val timeout = Timeout(2.seconds)
implicit val system = context.system

override def receive = {
case DagrVersionRequest() =>
context.parent ! RequestComplete(StatusCodes.OK, DagrVersionResponse(DagrApiService.version))
case DagrStatusRequest() =>
context.parent ! RequestComplete(StatusCodes.OK, DagrStatusResponse(taskManager.taskToInfoBiMap.values))
case DagrTaskScriptRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskScriptResponse(pathToString(info.script))))
case DagrTaskLogRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskLogResponse(pathToString(info.logFile))))
case DagrTaskInfoRequest(id) =>
applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfoResponse(info)))
}

private def pathToString(path: Path): String = scala.io.Source.fromFile(path.toFile).mkString

/** Handles the case that the specific task identifier does not exist and sends back a bad request message, otherwise,
* it applies the given method.
*/
private def applyTaskInfo(id: TaskId, f: (TaskExecutionInfo => Unit)): Unit = {
taskManager.taskExecutionInfoFor(id) match {
case Some(info) => f(info)
case None => context.parent ! RequestComplete(StatusCodes.NotFound, s"Task with id '$id' not found")
}
}
}

Loading