private ApplicationReport startAppMaster()

in flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java [833:1310]


    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // ------------------ Initialize the file systems -------------------------

        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        final FileSystem fs = FileSystem.get(yarnConfiguration);

        // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
        // method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }

        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);

        final Optional<Path> providedUsrLibDir =
                Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);

        Path stagingDirPath = getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        stagingDirFs,
                        stagingDirPath,
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<Path> systemShipFiles = new HashSet<>(shipFiles);

        final String logConfigFilePath =
                configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(getPathFromLocalFilePathStr(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application

        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguration ------
        setHAClusterIdIfNotSet(configuration, appId);

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.get(
                            YarnConfigOptions.APPLICATION_ATTEMPTS,
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.get(YarnConfigOptions.APPLICATION_ATTEMPTS, 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<Path> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles, Path.CUR_DIR, LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives, Path.CUR_DIR, LocalResourceType.ARCHIVE);
        }

        // only for application mode
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            // Python jar/Sql Gateway jar only need to be shipped and should not be added to
            // classpath.
            if (PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
                fileUploader.registerMultipleLocalResources(
                        Collections.singletonList(
                                new Path(PackagedProgramUtils.getPythonJar().toURI())),
                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                        LocalResourceType.FILE);
            } else if (PackagedProgramUtils.isSqlApplication(
                    configuration.get(APPLICATION_MAIN_CLASS))) {
                fileUploader.registerMultipleLocalResources(
                        Collections.singletonList(
                                new Path(PackagedProgramUtils.getSqlGatewayJar().toURI())),
                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                        LocalResourceType.FILE);
            }
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        // usrlib in remote will be used first.
        if (providedUsrLibDir.isPresent()) {
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            Collections.singletonList(providedUsrLibDir.get()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        } else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            // local usrlib will be automatically shipped if it exists and there is no remote
            // usrlib.
            final Set<File> usrLibShipFiles = new HashSet<>();
            addUsrLibFolderToShipFiles(usrLibShipFiles);
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            usrLibShipFiles.stream()
                                    .map(e -> new Path(e.toURI()))
                                    .collect(Collectors.toSet()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.set(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            String flinkConfigFileName = GlobalConfiguration.getFlinkConfFilename();
            tmpConfigurationFile = File.createTempFile(appId + "-" + flinkConfigFileName, null);

            // remove localhost bind hosts as they render production clusters unusable
            removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            // this setting is unconditionally overridden anyway, so we remove it for clarity
            configuration.removeConfig(TaskManagerOptions.HOST);

            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            fileUploader.registerSingleLocalResource(
                    flinkConfigFileName,
                    new Path(tmpConfigurationFile.toURI()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append(flinkConfigFileName).append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML
        // and KRB5 configuration files. We are adding these files as container local resources for
        // the container
        // applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab = flinkConfiguration.get(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath = flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfig(flinkConfiguration);
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        boolean fetchToken = configuration.get(SecurityOptions.DELEGATION_TOKENS_ENABLED);
        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
        if (kerberosLoginProvider.isLoginPossible(true)) {
            setTokensFor(amContainer, fetchToken);
        } else {
            LOG.info(
                    "Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        Utils.setAclsFor(amContainer, flinkConfiguration);

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv =
                generateApplicationMasterEnv(
                        fileUploader,
                        classPathBuilder.toString(),
                        localResourceDescFlinkJar.toString(),
                        appId.toString());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.get(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        setRolledLogConfigs(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        long lastLogTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - lastLogTime > 60000) {
                        lastLogTime = System.currentTimeMillis();
                        LOG.info(
                                "Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
                                (lastLogTime - startTime) / 1000);
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }