- These notes were written while creating a new Cromwell backend for Amazon Web Services (AWS).
To start with, I just need to create a bunch of boilerplate which will eventually be filled in with all of the lovely AWS details!
- Added entries to project/Settings.scala, project/Dependencies.scala and build.sbt.
- This was mainly just a copy/paste from existing backend projects. I did make a few typos while renaming everything and wiring up the dependencies, though!
- E.g. in my first commit I forgot to point libraryDependencies at the AWS backend's own dependency list. The settings should look like:
val awsBackendSettings = List(
name := "cromwell-aws-backend",
libraryDependencies ++= awsBackendDependencies
) ++ commonSettings
- I guessed that I'd need the AWS SDK so I included that immediately in Dependencies.scala:
val awsBackendDependencies = List(
"com.amazonaws" % "aws-java-sdk" % "1.11.41"
)
- In build.sbt I also had to edit lazy val root to include a new .aggregate(awsBackend) and a new .dependsOn(awsBackend).
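For reference, the wiring described above might look roughly like this in build.sbt. This is a sketch, not the actual Cromwell build: the settings list comes from project/Settings.scala and the directory from these notes, while the dependency on a shared `backend` project and the elided parts of `root` are assumptions.

```scala
// build.sbt (sketch). The project directory matches these notes; depending on a
// shared `backend` project is an assumption about the existing build.
lazy val awsBackend = (project in file("supportedBackends/aws"))
  .settings(awsBackendSettings: _*)
  .dependsOn(backend)

lazy val root = (project in file("."))
  // ...existing settings, aggregates and dependencies elided...
  .aggregate(awsBackend)
  .dependsOn(awsBackend)
```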
- sbt will probably create the source directories for you at the paths specified in the files above. I'd already added my own directory structure, and sbt picked it up correctly in supportedBackends/aws.
- To run a job, Cromwell needs to instantiate a job execution actor. I'll fill in the details later, but for now I'll just add the constructor, props, and an unimplemented method definition for execute:
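package cromwell.backend.impl.aws

import akka.actor.Props
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor, BackendJobExecutionActor}
import cromwell.backend.BackendJobExecutionActor.BackendJobExecutionResponse
import scala.concurrent.Future

class AwsJobExecutionActor(override val jobDescriptor: BackendJobDescriptor,
                           override val configurationDescriptor: BackendConfigurationDescriptor) extends BackendJobExecutionActor {
  // Deliberately unimplemented for now: `???` throws scala.NotImplementedError when called.
  override def execute: Future[BackendJobExecutionResponse] = ???
}

object AwsJobExecutionActor {
  def props(jobDescriptor: BackendJobDescriptor,
            configurationDescriptor: BackendConfigurationDescriptor): Props =
    Props(new AwsJobExecutionActor(jobDescriptor, configurationDescriptor))
}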
- The backend actor factory is the class that tells Cromwell which classes represent job execution actors, initialization actors and so on. I'm just adding a skeleton for now, with a constructor of the form Cromwell expects:
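package cromwell.backend.impl.aws

import akka.actor.{ActorRef, Props}
import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendJobDescriptor, BackendLifecycleActorFactory}

case class AwsBackendActorFactory(name: String, configurationDescriptor: BackendConfigurationDescriptor) extends BackendLifecycleActorFactory {
  // Only the job execution actor is wired up so far.
  override def jobExecutionActorProps(jobDescriptor: BackendJobDescriptor,
                                      initializationData: Option[BackendInitializationData],
                                      serviceRegistryActor: ActorRef,
                                      backendSingletonActor: Option[ActorRef]): Props =
    AwsJobExecutionActor.props(jobDescriptor, configurationDescriptor)
}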
- There are a few other actor definitions that can be added to this file over time, but the only one Cromwell requires is the job execution actor.
- reference.conf is a set of reference options that shows people how to enable the backends they want. So I'll add the initial config that people would add if they wanted the AWS backend (commented out in the reference so it's not enabled by default). This goes below all the other backend references:
#AWS {
# actor-factory = "cromwell.backend.impl.aws.AwsBackendActorFactory"
# config {
#
# }
#}
- OK, so I've now told people how to add this backend... Now I'll actually add it to my own configuration file so I can try it out!
backend {
default = "AWS"
providers {
AWS {
actor-factory = "cromwell.backend.impl.aws.AwsBackendActorFactory"
config {
}
}
}
}
So we now have a backend skeleton! What happens when we run it? Well hopefully Cromwell will instantiate the backend far enough to reach the unimplemented execute method and then fall over. Let's give it a go!
- I fire up Cromwell in server mode with my modified application.conf.
- I create a sample WDL that would sleep for 20 seconds if it actually worked:
task sleep {
command { sleep 20 }
}
workflow main {
call sleep
}
- I submit the WDL to the swagger endpoint (http://localhost:8000/swagger/index.html?url=/swagger/cromwell.yaml) and watch the server logs...
- And as expected:
2016-10-13 13:14:29,017 cromwell-system-akka.dispatchers.engine-dispatcher-39 INFO - MaterializeWorkflowDescriptorActor [UUID(ddd827ba)]: Call-to-Backend assignments: main.sleep -> AWS
2016-10-13 13:14:30,167 cromwell-system-akka.dispatchers.engine-dispatcher-39 INFO - WorkflowExecutionActor-ddd827ba-091f-4c6f-b98f-cc9825717007 [UUID(ddd827ba)]: Starting calls: main.sleep:NA:1
2016-10-13 13:14:30,983 cromwell-system-akka.actor.default-dispatcher-5 ERROR - guardian failed, shutting down system
scala.NotImplementedError: an implementation is missing
at scala.Predef$.$qmark$qmark$qmark(Predef.scala:230)
at cromwell.backend.impl.aws.AwsJobExecutionActor.execute(AwsJobExecutionActor.scala:12)
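The stack trace bottoms out in Predef.???, which is simply a method that throws scala.NotImplementedError ("an implementation is missing") when evaluated. Because the throw happens while evaluating execute itself, the caller gets an exception rather than a failed Future. A standard-library-only sketch; the object and its methods are hypothetical stand-ins for the actor:

```scala
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object UnimplementedExecuteDemo {
  // Hypothetical stand-in for the skeleton's `execute`: the body is `???`.
  def execute: Future[Int] = ???

  // `???` throws when the body is evaluated, so the error is raised at the call
  // site and is never wrapped in a failed Future. Try still catches it because
  // NotImplementedError is not one of the fatal errors excluded by NonFatal.
  def callExecute(): Try[Future[Int]] = Try(execute)

  def main(args: Array[String]): Unit =
    callExecute() match {
      case Failure(e: NotImplementedError) => println(s"execute threw: ${e.getMessage}")
      case Failure(e)                      => println(s"unexpected failure: $e")
      case Success(_)                      => println("execute returned a Future")
    }
}
```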
- OK, so now I just need to implement execute: Future[BackendJobExecutionResponse] and Cromwell can interface with AWS. How hard can it be!
- This was a learning experience after using the Google pipelines service to submit jobs!
- To get myself started, I manually created an ECS cluster called ecs-t2micro-cluster via the ECS web console.
- I see in the AWS SDK docs that there's an AmazonECSAsyncClient class. That sounds promising! Luckily I already added the dependency on the AWS SDK in Part 1, so I guess I can just write something basic in my AwsJobExecutionActor class and see what happens.
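The SDK's ...Async methods (runTaskAsync, describeTasksAsync and so on) accept a callback implementing com.amazonaws.handlers.AsyncHandler, which has onSuccess(request, result) and onError(exception). The execute code further down relies on helpers called AwsSdkAsyncHandler and AwsSdkAsyncResult whose definitions aren't shown in these notes. One plausible shape, sketched with only the standard library: a stand-in trait with the same two callbacks (a real backend would extend AsyncHandler instead) whose implementation completes a Promise. All names here are hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Stand-in for the SDK's callback interface (com.amazonaws.handlers.AsyncHandler),
// declared locally so this sketch compiles without the AWS SDK.
trait AsyncCallback[Req, Res] {
  def onError(exception: Exception): Unit
  def onSuccess(request: Req, result: Res): Unit
}

// Keeps the request alongside its result so callers can pattern match on both.
final case class AsyncCallResult[Req, Res](request: Req, result: Res)

// Turns callback-style completion into a Scala Future.
final class FutureCallback[Req, Res] extends AsyncCallback[Req, Res] {
  private val promise = Promise[AsyncCallResult[Req, Res]]()

  def future: Future[AsyncCallResult[Req, Res]] = promise.future

  // tryFailure/trySuccess ignore a second completion instead of throwing
  // IllegalStateException, as failure/success would.
  override def onError(exception: Exception): Unit =
    promise.tryFailure(exception)

  override def onSuccess(request: Req, result: Res): Unit =
    promise.trySuccess(AsyncCallResult(request, result))
}

object FutureCallbackDemo {
  // Simulates an SDK call that reports success through the callback.
  def demo(): String = {
    val handler = new FutureCallback[String, Int]
    handler.onSuccess("run-task", 42)
    handler.onError(new RuntimeException("ignored: already completed"))
    val AsyncCallResult(request, result) = Await.result(handler.future, 1.second)
    s"$request -> $result"
  }
}
```

Wrapping the callback this way is what lets execute return a Future and compose the SDK result with map, instead of handling everything inside the callback.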
- I ended up having to add some credentials options to the configuration file. The new reference.conf now looks like:
#AWS {
# actor-factory = "cromwell.backend.impl.aws.AwsBackendActorFactory"
# config {
# ## These two settings are required to authenticate with the ECS service:
# accessKeyId = "..."
# secretKey = "..."
# }
#}
- After a little bit of experimentation with the ECS API, I was able to come up with a backend that works but is very limited... It is entirely synchronous in the execute method. That's certainly not a final answer, but it works OK for running a single task. And we can now run that single sleep command successfully on the Amazon EC2 Container Service!
- The synchronous execute method:
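package cromwell.backend.impl.aws

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.services.ecs.AmazonECSAsyncClient
import com.amazonaws.services.ecs.model.{ContainerOverride, DescribeTasksRequest, DescribeTasksResult, DesiredStatus, RunTaskRequest, RunTaskResult, Task, TaskOverride}
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor, BackendJobExecutionActor}
import cromwell.backend.BackendJobExecutionActor.{BackendJobExecutionResponse, SucceededResponse}
import net.ceedubs.ficus.Ficus._
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
// Not shown: the import for OnlyPureFunctions (Cromwell's WDL library) and this backend's
// own AwsSdkAsyncHandler/AwsSdkAsyncResult helpers. `map` below also needs an implicit
// ExecutionContext in scope.

class AwsJobExecutionActor(override val jobDescriptor: BackendJobDescriptor,
                           override val configurationDescriptor: BackendConfigurationDescriptor) extends BackendJobExecutionActor {
  // Credentials come from the backend's config block in the configuration file.
  val awsAccessKeyId = configurationDescriptor.backendConfig.as[String]("accessKeyId")
  val awsSecretKey = configurationDescriptor.backendConfig.as[String]("secretKey")
  // The cluster created manually in the ECS console; hardcoded for now.
  val clusterName = "ecs-t2micro-cluster"

  val credentials = new AWSCredentials {
    override def getAWSAccessKeyId: String = awsAccessKeyId
    override def getAWSSecretKey: String = awsSecretKey
  }
  val ecsAsyncClient = new AmazonECSAsyncClient(credentials)

  override def execute: Future[BackendJobExecutionResponse] = {
    // Replace the container's command with this call's instantiated command line.
    val commandOverride = new ContainerOverride().withName("simple-app").withCommand(jobDescriptor.call.instantiateCommandLine(Map.empty, OnlyPureFunctions, identity).get)
    // Run one copy of the (hardcoded) "ubuntuTask:1" task definition on the cluster.
    val runRequest: RunTaskRequest = new RunTaskRequest()
      .withCluster(clusterName)
      .withCount(1)
      .withTaskDefinition("ubuntuTask:1")
      .withOverrides(new TaskOverride().withContainerOverrides(commandOverride))
    val submitResultHandler = new AwsSdkAsyncHandler[RunTaskRequest, RunTaskResult]()
    val _ = ecsAsyncClient.runTaskAsync(runRequest, submitResultHandler)

    submitResultHandler.future map {
      case AwsSdkAsyncResult(_, result) =>
        log.info("AWS submission completed:\n{}", result.toString)
        // If ECS can't place the task, getTasks is empty and the reasons are in getFailures.
        val taskArn = result.getTasks.asScala.headOption.map(_.getTaskArn).getOrElse(
          throw new RuntimeException(s"ECS did not start the task: ${result.getFailures}"))
        // Blocks until the task stops: this is what makes the backend synchronous.
        val taskDescription = waitUntilDone(taskArn)
        log.info("AWS task completed!\n{}", taskDescription.toString)
        // Reports success (return code 0) without checking the container's exit code yet.
        SucceededResponse(jobDescriptor.key, Option(0), Map.empty, None, Seq.empty)
    }
  }

  // Polls DescribeTasks every 200 ms, blocking the calling thread, until the task is STOPPED.
  private def waitUntilDone(taskArn: String): Task = {
    val describeTasksRequest = new DescribeTasksRequest().withCluster(clusterName).withTasks(List(taskArn).asJava)
    val resultHandler = new AwsSdkAsyncHandler[DescribeTasksRequest, DescribeTasksResult]()
    val _ = ecsAsyncClient.describeTasksAsync(describeTasksRequest, resultHandler)
    val describedTasks = Await.result(resultHandler.future, Duration.Inf)
    val taskDescription = describedTasks.result.getTasks.asScala.head
    if (taskDescription.getLastStatus == DesiredStatus.STOPPED.toString) {
      taskDescription
    } else {
      Thread.sleep(200)
      waitUntilDone(taskArn)
    }
  }
}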
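One way to remove the blocking from waitUntilDone, sketched here with only the Scala and Java standard libraries: chain Futures, and schedule each re-check on a ScheduledExecutorService instead of calling Thread.sleep and Await. In the backend, `check` would presumably wrap describeTasksAsync and `isDone` would compare getLastStatus with DesiredStatus.STOPPED.toString; the names Polling, after and pollUntil are hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object Polling {
  // Runs `thunk` after `delay` without parking a thread in the meantime.
  def after[A](delay: FiniteDuration, scheduler: ScheduledExecutorService)(thunk: => Future[A]): Future[A] = {
    val promise = Promise[A]()
    scheduler.schedule(new Runnable {
      def run(): Unit =
        // Turn a synchronous throw into a failed Future so the promise always completes.
        promise.completeWith(try thunk catch { case NonFatal(e) => Future.failed(e) })
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Calls `check` until `isDone` accepts its result, waiting `interval` between attempts.
  def pollUntil[A](check: () => Future[A], isDone: A => Boolean, interval: FiniteDuration,
                   scheduler: ScheduledExecutorService)(implicit ec: ExecutionContext): Future[A] =
    check().flatMap { a =>
      if (isDone(a)) Future.successful(a)
      else after(interval, scheduler)(pollUntil(check, isDone, interval, scheduler))
    }

  // Usage: a fake status check that reports "done" on its third call.
  def demo(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val calls = new AtomicInteger(0)
    try {
      val done = pollUntil(() => Future(calls.incrementAndGet()), (n: Int) => n >= 3, 10.millis, scheduler)
      Await.result(done, 5.seconds)
    } finally scheduler.shutdown()
  }
}
```

No thread is parked between checks, so execute could return the polling Future directly, and the interval can be widened to avoid hitting the ECS API every 200 ms. Inside an actor, Akka's akka.pattern.after could play the role of `after` here.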