in common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala [139:218]
/**
 * Submits a YARN service definition with one component per configured image, then polls YARN
 * until the service reports a "STABLE" or "STARTED" state.
 *
 * Every component is declared with zero containers and the same run command, Docker artifact,
 * and CPU/memory resources from `yarnConfig`. When `yarnConfig.authType` is Kerberos, the
 * configured principal and keytab are attached to the service definition.
 *
 * Polling makes at least one attempt and at most `serviceStartTimeoutMS / retryWaitMS` attempts.
 * It sleeps `retryWaitMS` between attempts, but not after the last one.
 *
 * @throws Exception if the service does not reach "STABLE" or "STARTED" within the attempt budget
 */
def createService(): Unit = {
  logging.info(this, "Creating Service with images: " + images.map(i => i.resolveImageName()).mkString(", "))

  val componentList = images.map { image =>
    ComponentDefinition(
      // YARN component names must match [a-z][a-z0-9-]*.
      // NOTE(review): only '.' is rewritten here. Other code that locates components by name
      // presumably applies the same mapping, so keep both in sync if this is ever extended.
      image.name.replace('.', '-'),
      Some(0), // start with zero containers
      Some(runCommand),
      Option.empty,
      Some(ArtifactDefinition(image.resolveImageName(), "DOCKER")),
      Some(ResourceDefinition(yarnConfig.cpus, yarnConfig.memory)),
      Some(ConfigurationDefinition(Map(("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE", "true")))),
      List.empty[String])
  }.toList

  // Attach Kerberos credentials only when the configured auth type requires them.
  val kerberosDef: Option[KerberosPrincipalDefinition] =
    if (yarnConfig.authType.equals(YARNRESTUtil.KERBEROSAUTH))
      Some(KerberosPrincipalDefinition(Some(yarnConfig.kerberosPrincipal), Some(yarnConfig.kerberosKeytab)))
    else None

  val service = ServiceDefinition(
    Some(serviceName),
    Some(version),
    Some(description),
    Some("STABLE"),
    Some(yarnConfig.queue),
    componentList,
    kerberosDef)

  // Submit the service definition.
  val response =
    YARNRESTUtil.submitRequestWithAuth(
      yarnConfig.authType,
      HttpMethods.POST,
      s"${yarnConfig.masterUrl}/app/v1/services",
      service.toJson.compactPrint)

  response match {
    case httpresponse(StatusCodes.OK, content) =>
      logging.info(this, s"Service submitted. Response: $content")
    case httpresponse(StatusCodes.Accepted, content) =>
      logging.info(this, s"Service submitted. Response: $content")
    case other =>
      // handleYARNRESTError is given only the logger, so it returns a handler for the response.
      // It must be applied to the response; evaluating it without applying it reports nothing.
      YARNRESTUtil.handleYARNRESTError(logging)(other)
  }

  // Always check at least once, even when the timeout is shorter than the retry interval.
  val maxRetryCount = math.max(1, serviceStartTimeoutMS / retryWaitMS)

  // Returns true as soon as YARN reports the service as STABLE or STARTED, or false once
  // maxRetryCount attempts have been made.
  @scala.annotation.tailrec
  def awaitStarted(retryCount: Int): Boolean =
    if (retryCount >= maxRetryCount) false
    else {
      val serviceDef =
        Option(YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging))
      val started = serviceDef match {
        case None =>
          logging.info(this, "Service not found yet")
          false
        case Some(definition) =>
          definition.state.getOrElse("None") match {
            case "STABLE" | "STARTED" =>
              logging.info(this, "YARN service achieved stable state")
              true
            case state =>
              logging.info(
                this,
                s"YARN service is not in stable state yet ($retryCount/$maxRetryCount). Current state: $state")
              false
          }
      }
      if (started) true
      else {
        // No point sleeping after the final attempt; we are about to give up.
        if (retryCount + 1 < maxRetryCount) Thread.sleep(retryWaitMS)
        awaitStarted(retryCount + 1)
      }
    }

  if (!awaitStarted(0))
    throw new Exception(s"After ${serviceStartTimeoutMS}ms YARN service did not achieve stable state")
}