def submitApplication()

in samza-yarn3/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala [97:240]


  /**
   * Builds the application master's submission context from the job config and submits it
   * through `yarnClient`.
   *
   * The steps, in order:
   *  - checks the requested AM memory and CPU against the maximum resource capability reported
   *    in the new-application response;
   *  - records the new application id in `jobContext`;
   *  - sets application name, optional node label expression and optional queue;
   *  - registers the job package as the `__package` local resource, plus any resources produced
   *    by `LocalizerResourceMapper`;
   *  - when Hadoop security is enabled, adds the security token, ACLs and AM local resources,
   *    and writes the security YARN config to the coordinator stream;
   *  - submits the application.
   *
   * @param config job configuration used to derive the YARN settings
   * @param cmds   commands set on the application master's container launch context
   * @param env    optional environment variables for the application master container
   * @param name   optional application name; the application id's string form is used when absent
   * @return the application id as held by `jobContext` after it has been set from the
   *         new-application response
   * @throws SamzaException if the requested memory or CPU exceeds the maximum allowed by YARN,
   *                        or if mapping the localizer resources from config fails
   */
  def submitApplication(config: Config, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
    val app = yarnClient.createApplication
    val newAppResponse = app.getNewApplicationResponse

    val yarnConfig = new YarnConfig(config)

    val packagePath = new Path(yarnConfig.getPackagePath)
    val mem = yarnConfig.getAMContainerMaxMemoryMb
    val cpu = yarnConfig.getAMContainerMaxCpuCores
    // Both settings may be absent (null) in the config, hence the Option wrappers.
    val queueName = Option(yarnConfig.getQueueName)
    val appMasterLabel = Option(yarnConfig.getAMContainerLabel)

    val maxCapability = newAppResponse.getMaximumResourceCapability()

    // If we are asking for memory more than the max allowed, shout out
    if (mem > maxCapability.getMemory()) {
      throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
        (mem, maxCapability.getMemory()))
    }

    // If we are asking for cpu more than the max allowed, shout out
    if (cpu > maxCapability.getVirtualCores()) {
      throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
        (cpu, maxCapability.getVirtualCores()))
    }

    jobContext = new JobContext
    jobContext.setAppId(newAppResponse.getApplicationId)
    val appId = jobContext.getAppId
    // Dereference once; the id was set on the job context just above.
    val applicationId = appId.get

    info("preparing to request resources for app id %s" format applicationId)

    val appCtx = app.getApplicationSubmissionContext
    val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
    val resource = Records.newRecord(classOf[Resource])
    val packageResource = Records.newRecord(classOf[LocalResource])

    // Fall back to the application id when no explicit name was supplied.
    appCtx.setApplicationName(name.getOrElse(applicationId.toString))

    appMasterLabel.foreach { label =>
      appCtx.setNodeLabelExpression(label)
      // Log the label that was actually set (previously this logged the queue name by mistake).
      info("set yarn node label expression to %s" format label)
    }

    queueName.foreach { queue =>
      appCtx.setQueue(queue)
      info("set yarn queue name to %s" format queue)
    }

    // TODO: remove the customized approach for package resource and use the common one.
    // But keep it now for backward compatibility.
    // set the local package so that the containers and app master are provisioned with it
    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
    val fs = packagePath.getFileSystem(conf)
    val fileStatus = fs.getFileStatus(packagePath)

    packageResource.setResource(packageUrl)
    info("set package url to %s for %s" format (packageUrl, applicationId))
    packageResource.setSize(fileStatus.getLen)
    info("set package size to %s for %s" format (fileStatus.getLen, applicationId))
    packageResource.setTimestamp(fileStatus.getModificationTime)
    packageResource.setType(LocalResourceType.ARCHIVE)
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)

    resource.setMemory(mem)
    info("set memory request to %s for %s" format (mem, applicationId))
    resource.setVirtualCores(cpu)
    info("set cpu core request to %s for %s" format (cpu, applicationId))
    appCtx.setResource(resource)
    containerCtx.setCommands(cmds.asJava)
    info("set command to %s for %s" format (cmds, applicationId))

    appCtx.setApplicationId(applicationId)
    info("set app ID to %s" format applicationId)

    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled) {
      appCtx.setKeepContainersAcrossApplicationAttempts(true)
      info("keep containers alive across application attempts for AM High availability")
    }
    val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]()
    localResources += "__package" -> packageResource

    // include the resources from the universal resource configurations
    try {
      val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
      localResources ++= resourceMapper.getResourceMap.asScala
    } catch {
      case e: LocalizerResourceException =>
        throw new SamzaException("Exception during resource mapping from config. ", e)
    }

    if (UserGroupInformation.isSecurityEnabled) {
      validateJobConfig(config)

      setupSecurityToken(fs, containerCtx)
      info("set security token for %s" format applicationId)

      val acls = yarnConfig.getYarnApplicationAcls
      if (!acls.isEmpty) {
        containerCtx.setApplicationACLs(acls)
      }

      val amLocalResources = setupAMLocalResources(fs, Option(yarnConfig.getYarnKerberosPrincipal), Option(yarnConfig.getYarnKerberosKeytab))
      localResources ++= amLocalResources

      val securityYarnConfig  = getSecurityYarnConfig
      val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config)
      coordinatorStreamWriter.start()
      // Always stop the writer once started, even if sending a message fails.
      try {
        securityYarnConfig.foreach {
          case (key: String, value: String) =>
            coordinatorStreamWriter.sendMessage(SetConfig.TYPE, key, value)
        }
      } finally {
        coordinatorStreamWriter.stop()
      }
    }

    // prepare all local resources for localizer
    info("localResources is: %s" format localResources)
    containerCtx.setLocalResources(localResources.asJava)
    info("set local resources on application master for %s" format applicationId)

    env.foreach { envVars =>
      containerCtx.setEnvironment(envVars.asJava)
      info("set environment variables to %s for %s" format (envVars, applicationId))
    }

    appCtx.setAMContainerSpec(containerCtx)
    appCtx.setApplicationType(yarnConfig.getYarnApplicationType)
    info("submitting application request for %s" format applicationId)
    yarnClient.submitApplication(appCtx)
    appId
  }