in samza-yarn3/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala [97:240]
def submitApplication(config: Config, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
val app = yarnClient.createApplication
val newAppResponse = app.getNewApplicationResponse
val yarnConfig = new YarnConfig(config)
val packagePath = new Path(yarnConfig.getPackagePath)
val mem = yarnConfig.getAMContainerMaxMemoryMb
val cpu = yarnConfig.getAMContainerMaxCpuCores
val queueName = Option(yarnConfig.getQueueName)
val appMasterLabel = Option(yarnConfig.getAMContainerLabel)
// If we are asking for memory more than the max allowed, shout out
if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
(mem, newAppResponse.getMaximumResourceCapability().getMemory()))
}
// If we are asking for cpu more than the max allowed, shout out
if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
(cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores()))
}
jobContext = new JobContext
jobContext.setAppId(newAppResponse.getApplicationId)
val appId = jobContext.getAppId
info("preparing to request resources for app id %s" format appId.get)
val appCtx = app.getApplicationSubmissionContext
val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
val resource = Records.newRecord(classOf[Resource])
val packageResource = Records.newRecord(classOf[LocalResource])
name match {
case Some(name) => { appCtx.setApplicationName(name) }
case None => { appCtx.setApplicationName(appId.get.toString) }
}
appMasterLabel match {
case Some(label) => {
appCtx.setNodeLabelExpression(label)
info("set yarn node label expression to %s" format queueName)
}
case None =>
}
queueName match {
case Some(queueName) => {
appCtx.setQueue(queueName)
info("set yarn queue name to %s" format queueName)
}
case None =>
}
// TODO: remove the customized approach for package resource and use the common one.
// But keep it now for backward compatibility.
// set the local package so that the containers and app master are provisioned with it
val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
val fs = packagePath.getFileSystem(conf)
val fileStatus = fs.getFileStatus(packagePath)
packageResource.setResource(packageUrl)
info("set package url to %s for %s" format (packageUrl, appId.get))
packageResource.setSize(fileStatus.getLen)
info("set package size to %s for %s" format (fileStatus.getLen, appId.get))
packageResource.setTimestamp(fileStatus.getModificationTime)
packageResource.setType(LocalResourceType.ARCHIVE)
packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
resource.setMemory(mem)
info("set memory request to %s for %s" format (mem, appId.get))
resource.setVirtualCores(cpu)
info("set cpu core request to %s for %s" format (cpu, appId.get))
appCtx.setResource(resource)
containerCtx.setCommands(cmds.asJava)
info("set command to %s for %s" format (cmds, appId.get))
appCtx.setApplicationId(appId.get)
info("set app ID to %s" format appId.get)
if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled) {
appCtx.setKeepContainersAcrossApplicationAttempts(true)
info("keep containers alive across application attempts for AM High availability")
}
val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]()
localResources += "__package" -> packageResource
// include the resources from the universal resource configurations
try {
val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
localResources ++= resourceMapper.getResourceMap.asScala
} catch {
case e: LocalizerResourceException => {
throw new SamzaException("Exception during resource mapping from config. ", e)
}
}
if (UserGroupInformation.isSecurityEnabled) {
validateJobConfig(config)
setupSecurityToken(fs, containerCtx)
info("set security token for %s" format appId.get)
val acls = yarnConfig.getYarnApplicationAcls
if (!acls.isEmpty) {
containerCtx.setApplicationACLs(acls)
}
val amLocalResources = setupAMLocalResources(fs, Option(yarnConfig.getYarnKerberosPrincipal), Option(yarnConfig.getYarnKerberosKeytab))
localResources ++= amLocalResources
val securityYarnConfig = getSecurityYarnConfig
val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config)
coordinatorStreamWriter.start()
securityYarnConfig.foreach {
case (key: String, value: String) =>
coordinatorStreamWriter.sendMessage(SetConfig.TYPE, key, value)
}
coordinatorStreamWriter.stop()
}
// prepare all local resources for localizer
info("localResources is: %s" format localResources)
containerCtx.setLocalResources(localResources.asJava)
info("set local resources on application master for %s" format appId.get)
env match {
case Some(env) => {
containerCtx.setEnvironment(env.asJava)
info("set environment variables to %s for %s" format (env, appId.get))
}
case None =>
}
appCtx.setAMContainerSpec(containerCtx)
appCtx.setApplicationType(yarnConfig.getYarnApplicationType)
info("submitting application request for %s" format appId.get)
yarnClient.submitApplication(appCtx)
appId
}