def prepareLocalResources()

in resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala [526:839]


  def prepareLocalResources(
      destDir: Path,
      pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
    logInfo("Preparing resources for our AM container")
    // Upload Spark and the application JAR to the remote file system if necessary,
    // and add them as local resources to the application master.
    val fs = destDir.getFileSystem(hadoopConf)

    // Used to keep track of URIs added to the distributed cache. If the same URI is added
    // multiple times, YARN will fail to launch containers for the app with an internal
    // error.
    val distributedUris = new HashSet[String]
    // Used to keep track of the names of files added to the distributed cache. If files with
    // the same name but different paths are added multiple times, YARN will fail to launch
    // containers for the app with an internal error.
    val distributedNames = new HashSet[String]

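    // Optional replication factor to use for files copied to the staging directory.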
    val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
    val localResources = HashMap[String, LocalResource]()
    FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))

    // If preloading is enabled, prepopulate the statCache with the statuses of the files to be
    // distributed.
    val statCache = if (statCachePreloadEnabled) {
      // Consider only the following configurations, as they involve distributing multiple files
      val files = sparkConf.get(SPARK_JARS).getOrElse(Nil) ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
        sparkConf.get(PY_FILES) ++ pySparkArchives

      getPreloadedStatCache(files)
    } else {
      HashMap[URI, FileStatus]()
    }
    val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()

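    // Registers a URI with the dedup sets above. Returns true if the URI is new; returns false
    // and logs a warning if the same URI, or a different URI with the same file name, has
    // already been added, in which case the caller skips distributing it.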
    def addDistributedUri(uri: URI): Boolean = {
      val uriStr = uri.toString()
      val fileName = new File(uri.getPath).getName
      if (distributedUris.contains(uriStr)) {
        logWarning(log"Same path resource ${MDC(LogKeys.URI, uri)} added multiple times " +
          log"to distributed cache.")
        false
      } else if (distributedNames.contains(fileName)) {
        logWarning(log"Same name resource ${MDC(LogKeys.URI, uri)} added multiple times " +
          log"to distributed cache")
        false
      } else {
        distributedUris += uriStr
        distributedNames += fileName
        true
      }
    }

    /*
     * Distribute a file to the cluster.
     *
     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
     * to HDFS (if not already there) and added to the application's distributed cache.
     *
     * @param path URI of the file to distribute.
     * @param resType Type of resource being distributed.
     * @param destName Name of the file in the distributed cache.
     * @param targetDir Subdirectory in which to place the file.
     * @param appMasterOnly Whether to distribute only to the AM.
     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
     *         localized path for non-local paths, or the input `path` for local paths.
     *         The localized path will be null if the URI has already been added to the cache.
     */
    def distribute(
        path: String,
        resType: LocalResourceType = LocalResourceType.FILE,
        destName: Option[String] = None,
        targetDir: Option[String] = None,
        appMasterOnly: Boolean = false): (Boolean, String) = {
      val trimmedPath = path.trim()
      val localURI = Utils.resolveURI(trimmedPath)
      if (localURI.getScheme != Utils.LOCAL_SCHEME) {
        if (addDistributedUri(localURI)) {
          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
          val linkname = targetDir.map(_ + "/").getOrElse("") +
            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
          val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
          distCacheMgr.addResource(
            destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
            appMasterOnly = appMasterOnly)
          (false, linkname)
        } else {
          (false, null)
        }
      } else {
        (true, trimmedPath)
      }
    }

    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
    // HDFS, and set up the relevant environment vars, so the AM can log in again.
    amKeytabFileName.foreach { kt =>
      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
        " via the YARN Secure Distributed Cache.")
      val (_, localizedPath) = distribute(keytab,
        destName = Some(kt),
        appMasterOnly = true)
      require(localizedPath != null, "Keytab file already distributed.")
    }

    // If we passed in a ivySettings file, make sure we copy the file to the distributed cache
    // in cluster mode so that the driver can access it
    val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
    val ivySettingsLocalizedPath: Option[String] = ivySettings match {
      case Some(ivySettingsPath) if isClusterMode =>
        val uri = new URI(ivySettingsPath)
        Option(uri.getScheme).getOrElse("file") match {
          case "file" =>
            val ivySettingsFile = new File(uri.getPath)
            require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
            require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a " +
              "normal file")
            // Generate a file name for the ivySettings file that does not conflict with any
            // user file.
            val localizedFileName = Some(ivySettingsFile.getName() + "-" +
              UUID.randomUUID().toString)
            val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
            require(localizedPath != null, "IvySettings file already distributed.")
            Some(localizedPath)
          case scheme =>
            throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
              "spark.jars.ivySettings")
        }
      case _ => None
    }

    /**
     * Add Spark to the cache. There are two settings that control what files to add to the cache:
     * - if a Spark archive is defined, use the archive. The archive is expected to contain
     *   jar files at its root directory.
     * - if a list of jars is provided, filter the non-local ones, resolve globs, and
     *   add the found files to the cache.
     *
     * Note that the archive cannot be a "local" URI. If none of the above settings are found,
     * then upload all files found in $SPARK_HOME/jars.
     */
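    // For example (hypothetical values): spark.yarn.archive=hdfs:///apps/spark/spark-libs.zip
    // takes the archive branch below, spark.yarn.jars=hdfs:///apps/spark/jars/*.jar takes the
    // jar-list branch, and with neither set a zip of the jars under $SPARK_HOME is built and
    // uploaded.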
    val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
    if (sparkArchive.isDefined) {
      val archive = sparkArchive.get
      require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
      distribute(Utils.resolveURI(archive).toString,
        resType = LocalResourceType.ARCHIVE,
        destName = Some(LOCALIZED_LIB_DIR))
    } else {
      sparkConf.get(SPARK_JARS) match {
        case Some(jars) =>
          // Split the jars into "local:" URIs, which are kept in the config, and other URIs,
          // which are distributed after resolving globs.
          val localJars = new ArrayBuffer[String]()
          jars.foreach { jar =>
            if (!Utils.isLocalUri(jar)) {
              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
              if (statCache.contains(path.toUri)) {
                distribute(path.toUri.toString, targetDir = Some(LOCALIZED_LIB_DIR))
              } else {
                val fss = pathFs.globStatus(path)
                if (fss == null) {
                  throw new FileNotFoundException(s"Path ${path.toString} does not exist")
                }
                fss.filter(_.isFile()).foreach { entry =>
                  val uri = entry.getPath().toUri()
                  statCache.update(uri, entry)
                  distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
                }
              }
            } else {
              localJars += jar
            }
          }

          // Propagate the local URIs to the containers using the configuration.
          sparkConf.set(SPARK_JARS, localJars.toSeq)

        case None =>
          // No configuration, so fall back to uploading local jar files.
          logWarning(
            log"Neither ${MDC(LogKeys.CONFIG, SPARK_JARS.key)} nor " +
              log"${MDC(LogKeys.CONFIG2, SPARK_ARCHIVE.key)}} is set, falling back to uploading " +
              log"libraries under SPARK_HOME.")
          val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
            sparkConf.getenv("SPARK_HOME")))
          val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
            new File(Utils.getLocalDir(sparkConf)))
          val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))

          try {
            jarsStream.setLevel(0)
            jarsDir.listFiles().foreach { f =>
              if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
                jarsStream.putNextEntry(new ZipEntry(f.getName))
                Files.copy(f.toPath, jarsStream)
                jarsStream.closeEntry()
              }
            }
          } finally {
            jarsStream.close()
          }

          distribute(jarsArchive.toURI.getPath,
            resType = LocalResourceType.ARCHIVE,
            destName = Some(LOCALIZED_LIB_DIR))
          jarsArchive.delete()
      }
    }

    /**
     * Copy the user jar to the distributed cache if its scheme is not "local".
     * Otherwise, set the corresponding key in our SparkConf so it is handled downstream.
     */
    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
      if (isLocal) {
        require(localizedPath != null, s"Path $jar already distributed")
        // If the resource is intended for local use only, handle this downstream
        // by setting the appropriate property
        sparkConf.set(APP_JAR, localizedPath)
      }
    }

    /**
     * Do the same for any additional resources passed in through ClientArguments.
     * Each resource category is represented by a 3-tuple of:
     *   (1) the list of resources in this category,
     *   (2) resource type, and
     *   (3) whether to add these resources to the classpath
     */
    val cachedSecondaryJarLinks = ListBuffer.empty[String]
    List(
      (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
      (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
      (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
    ).foreach { case (flist, resType, addToClasspath) =>
      flist.foreach { file =>
        val (_, localizedPath) = distribute(file, resType = resType)
        // If the resource is added to the classpath, duplicates already in the distributed
        // cache are silently skipped; otherwise a duplicate is an error.
        if (addToClasspath) {
          if (localizedPath != null) {
            cachedSecondaryJarLinks += localizedPath
          }
        } else {
          if (localizedPath == null) {
            throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
              " to the distributed cache.")
          }
        }
      }
    }
    if (cachedSecondaryJarLinks.nonEmpty) {
      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq)
    }

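    // In cluster mode the driver runs inside the AM, so the primary Python file only needs to
    // be distributed to the AM container.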
    if (isClusterMode && args.primaryPyFile != null) {
      distribute(args.primaryPyFile, appMasterOnly = true)
    }

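    // Distribute the PySpark archives (e.g. pyspark.zip and the py4j zip) unless they use the
    // "local:" scheme, in which case they are expected to already be present on every node.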
    pySparkArchives.foreach { f =>
      val uri = Utils.resolveURI(f)
      if (uri.getScheme != Utils.LOCAL_SCHEME) {
        distribute(f)
      }
    }

    // The python files list needs special handling: all files that are not archives need to be
    // placed in a subdirectory that will be added to PYTHONPATH.
    sparkConf.get(PY_FILES).foreach { f =>
      val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
      distribute(f, targetDir = targetDir)
    }

    // Update the configuration with all the distributed files, minus the conf archive. The
    // conf archive will be handled by the AM differently so that we avoid having to send
    // this configuration by other means. See SPARK-14602 for one reason why this is needed.
    distCacheMgr.updateConfiguration(cachedResourcesConf)

    // Upload the conf archive to HDFS manually, and record its location in the configuration.
    // This will allow the AM to know where the conf archive is in HDFS, so that it can be
    // distributed to the containers.
    //
    // This code forces the archive to be copied, so that unit tests pass (since in that case both
    // file systems are the same and the archive wouldn't normally be copied). In most (all?)
    // deployments, the archive would be copied anyway, since it's a temp file in the local file
    // system.
    val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
    val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
    cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())

    val confsToOverride = Map.empty[String, String]
    // If propagating the keytab to the AM, override the keytab name with the name of the
    // distributed file.
    amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) }

    // If propagating the ivySettings file to the distributed cache, override the ivySettings
    // file name with the name of the distributed file.
    ivySettingsLocalizedPath.foreach { path =>
      confsToOverride.put("spark.jars.ivySettings", path)
    }

    val localConfArchive = new Path(createConfArchive(confsToOverride).toURI())
    copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
      destName = Some(LOCALIZED_CONF_ARCHIVE))

    // Manually add the config archive to the cache manager so that the AM is launched with
    // the proper files set up.
    distCacheMgr.addResource(
      remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE,
      LOCALIZED_CONF_DIR, statCache, appMasterOnly = false)

    localResources
  }
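
Typical use of the returned map (a minimal sketch, not verbatim Spark source; variable names are
illustrative): the caller, Client.createContainerLaunchContext in the same file, passes the
resources to the AM container's launch context.

  // Assumed call-site sketch. Needs org.apache.hadoop.yarn.util.Records,
  // org.apache.hadoop.yarn.api.records.ContainerLaunchContext, and
  // scala.jdk.CollectionConverters._ (for asJava) in scope.
  val localResources = prepareLocalResources(stagingDirPath, pySparkArchives)
  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  amContainer.setLocalResources(localResources.asJava)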