Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.

Adding a pipeline to parse dagr execution reports. #375

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions core/src/main/scala/dagr/core/execsystem/FinalStatusReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@

package dagr.core.execsystem

import java.nio.file.{Files, Path, Paths}
import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}

import com.fulcrumgenomics.commons.CommonsDef.DirPath
import com.fulcrumgenomics.commons.collection.BiMap
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.commons.util.StringUtil._
import com.fulcrumgenomics.commons.util.TimeUtil._
import dagr.core.DagrDef.TaskId
import dagr.core.execsystem.GraphNodeState.GraphNodeState
import dagr.core.execsystem.TaskStatus.TaskStatus
import dagr.core.tasksystem.Task

import scala.collection.mutable
import scala.collection.mutable.ListBuffer


/** Provides a method to generate an execution report for a task tracker. */
trait FinalStatusReporter {
this: TaskTracker =>
Expand Down Expand Up @@ -100,3 +110,80 @@ trait FinalStatusReporter {
loggerMethod("\n" + columnIt(taskStatusCountTable.toList, delimiter))
}
}

/**
  * A record of useful information about a single task, as read from an execution report.
  *
  * The constructor parameters mirror, in order, the values that the companion object's line parser
  * extracts from one report line, preceded by the (optional) path of the report itself.
  *
  * @param report         the execution report this entry was read from, if any
  * @param id             the task identifier
  * @param name           the task name
  * @param status         the task status
  * @param cores          the cores value recorded for the task
  * @param memory         the memory value recorded for the task
  * @param submissionDate the submission timestamp
  * @param startDate      the start timestamp
  * @param endDate        the end timestamp
  * @param executionTime  the execution time
  * @param wallClockTime  the wall-clock time
  * @param script         path to the task's script
  * @param log            path to the task's log
  * @param attempts       the number of attempts
  * @param graphNodeState the state of the task's graph node
  */
case class TaskInfo(
  report: Option[Path],
  id: TaskId,
  name: String,
  status: TaskStatus,
  cores: Cores,
  memory: Memory,
  submissionDate: Instant,
  startDate: Instant,
  endDate: Instant,
  executionTime: Duration,
  wallClockTime: Duration,
  script: Path,
  log: Path,
  attempts: Int,
  graphNodeState: GraphNodeState
) {
  // Converted once at construction so that hashing never repeats the conversion.
  private val idAsInt: Int = id.toInt

  /**
    * True if this is a [[Pipeline]], false if it is a [[UnitTask]].
    *
    * The decision is made purely by checking the file system: a task whose log file does not
    * exist is reported as a pipeline.
    */
  def isPipeline: Boolean = {
    val logExists = log.toFile.exists()
    !logExists
  }

  /** Hashes on the task id alone; this works since ids should be unique, and it is fast. */
  override def hashCode(): Int = idAsInt
}

/** Factory methods for building [[TaskInfo]] instances from the text of an execution report. */
object TaskInfo {
  /** Formatter for report timestamps (`uuuu-MM-dd HH:mm:ss`), interpreted in the system default time zone. */
  private val TimeStampFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())

  /** Parses a timestamp in the format of [[TimeStampFormatter]] into an [[Instant]]. */
  private def fromDate(date: String): Instant = Instant.from(TimeStampFormatter.parse(date))

  /**
    * Parses a duration written as `hours:minutes:seconds` (three colon-separated integers).
    *
    * @throws IllegalArgumentException if the string does not have exactly three colon-separated parts
    *                                  (a non-numeric part raises a `NumberFormatException`, which is a subclass)
    */
  private def fromDuration(duration: String): Duration = {
    // Parse as Long so the seconds total cannot overflow Int arithmetic for very large hour counts.
    duration.split(':').map(_.toLong) match {
      case Array(hours, minutes, seconds) => Duration.ofSeconds(seconds + 60L * (minutes + 60L * hours))
      case _ => throw new IllegalArgumentException(s"Could not parse duration: $duration")
    }
  }

  /**
    * Parses a line from an execution report.
    *
    * The line is split on runs of two or more whitespace characters and must yield exactly 14 fields.
    * Spaces within the task name are replaced with underscores.
    *
    * @param line    a single (already trimmed) task line from the report
    * @param report  the report the line came from, stored on the result as-is
    * @param logsDir an optional directory in which to look for the task's log; it is used only when the
    *                log path recorded in the line is not an existing, readable, regular file
    * @return the parsed task information
    * @throws IllegalArgumentException if the line does not contain exactly 14 fields
    */
  def apply(line: String, report: Option[Path] = None, logsDir: Option[DirPath] = None): TaskInfo = {
    val fields = line.split("\\s{2,}")
    require(fields.length == 14, s"Expected 14 fields, found ${fields.length}: ${fields.toList}")

    // Prefer the log exactly where the report says it is. Only when that path is not a usable
    // regular file do we fall back to a file of the same name inside `logsDir` (if one was given).
    val recordedLog = Paths.get(fields(11))
    val recordedLogIsUsable =
      Files.exists(recordedLog) && Files.isRegularFile(recordedLog) && Files.isReadable(recordedLog)
    val logPath =
      if (recordedLogIsUsable) recordedLog
      else logsDir.map(_.resolve(recordedLog.getFileName)).getOrElse(recordedLog)

    new TaskInfo(
      report         = report,
      id             = fields(0).toInt,
      name           = fields(1).replace(' ', '_'),
      status         = TaskStatus.withName(fields(2)),
      cores          = Cores(fields(3).toDouble),
      memory         = Memory(fields(4)),
      submissionDate = fromDate(fields(5)),
      startDate      = fromDate(fields(6)),
      endDate        = fromDate(fields(7)),
      executionTime  = fromDuration(fields(8)),
      wallClockTime  = fromDuration(fields(9)),
      script         = Paths.get(fields(10)),
      log            = logPath,
      attempts       = fields(12).toInt,
      graphNodeState = GraphNodeState.withName(fields(13))
    )
  }

  /**
    * Slurps in the lines from an execution report.
    *
    * The first line is skipped as the header, reading stops at the first blank line, and any line
    * containing `" NA "` is dropped.
    *
    * NOTE(review): the result is built lazily on top of `Io.readLines`, and reading stops at the first
    * blank line; confirm whether the underlying file is closed when the iterator is not fully drained.
    *
    * @param report  the execution report to read
    * @param logsDir an optional directory in which to look for task logs (see [[apply]])
    * @return one [[TaskInfo]] per retained task line
    */
  def from(report: Path, logsDir: Option[DirPath] = None): Iterator[TaskInfo] = {
    Io.readLines(report)
      .drop(1) // header line
      .map(_.trim) // leading and trailing whitespace exist
      .takeWhile(_.nonEmpty) // hacky way of finding the end of the file
      .filterNot(_.contains(" NA ")) // hacky way to find tasks with no start date
      .map(line => TaskInfo(line, report=Some(report), logsDir=logsDir))
  }
}
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object Resource {
* @tparam T the type of number the value is expressed in, e.g. Long or Double
* @tparam R self-referential type required to make all the operators work nicely
*/
sealed abstract class Resource[T, R <: Resource[T,R]](val value: T)(implicit numeric :Numeric[T]) {
sealed abstract class Resource[T, R <: Resource[T,R]](val value: T)(implicit numeric: Numeric[T]) {
if (numeric.toDouble(value) < 0) {
throw new IllegalArgumentException(s"Cannot have negative resource. ${getClass.getSimpleName}=" + value)
}
Expand Down
Loading