in flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java [833:1310]
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
final FileSystem fs = FileSystem.get(yarnConfiguration);
// Hard-coded check for the GoogleHadoopFileSystem client because it does not override the
// getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
&& fs.getScheme().startsWith("file")) {
LOG.warn(
"The file system scheme is '"
+ fs.getScheme()
+ "'. This indicates that the specified Hadoop configuration path is wrong and "
+ "the system is using the default Hadoop configuration values. The Flink YARN "
+ "client needs to store its files in a distributed file system.");
}
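// Create the application submission context and a file uploader that stages all required
// resources in the application's staging directory.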
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);
final Optional<Path> providedUsrLibDir =
Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);
Path stagingDirPath = getStagingDir(fs);
FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
stagingDirFs,
stagingDirPath,
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
// Files that need to be shipped and added to the classpath.
Set<Path> systemShipFiles = new HashSet<>(shipFiles);
final String logConfigFilePath =
configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
systemShipFiles.add(getPathFromLocalFilePathStr(logConfigFilePath));
}
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguration ------
setHAClusterIdIfNotSet(configuration, appId);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.get(
YarnConfigOptions.APPLICATION_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.get(YarnConfigOptions.APPLICATION_ATTEMPTS, 1));
}
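// Collect the user jars: from the job graph in per-job mode, and from pipeline.jars when
// deploying an application cluster.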
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// Register all files in provided lib dirs as local resources with public visibility
// and upload the remaining dependencies as local resources with APPLICATION visibility.
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// Upload and register ship-only files.
// Plugin files only need to be shipped and should not be added to the classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<Path> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles, Path.CUR_DIR, LocalResourceType.FILE);
}
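// Ship the user-specified archives; resources of type ARCHIVE are automatically unpacked
// by YARN in the container working directory.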
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives, Path.CUR_DIR, LocalResourceType.ARCHIVE);
}
// only for application mode
if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
// The Python jar / SQL Gateway jar only needs to be shipped and should not be added to
// the classpath.
if (PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
fileUploader.registerMultipleLocalResources(
Collections.singletonList(
new Path(PackagedProgramUtils.getPythonJar().toURI())),
ConfigConstants.DEFAULT_FLINK_OPT_DIR,
LocalResourceType.FILE);
} else if (PackagedProgramUtils.isSqlApplication(
configuration.get(APPLICATION_MAIN_CLASS))) {
fileUploader.registerMultipleLocalResources(
Collections.singletonList(
new Path(PackagedProgramUtils.getSqlGatewayJar().toURI())),
ConfigConstants.DEFAULT_FLINK_OPT_DIR,
LocalResourceType.FILE);
}
}
// Upload and register user jars
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
// A remote usrlib directory, if configured, takes precedence over a local one.
if (providedUsrLibDir.isPresent()) {
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
Collections.singletonList(providedUsrLibDir.get()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
} else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
// local usrlib will be automatically shipped if it exists and there is no remote
// usrlib.
final Set<File> usrLibShipFiles = new HashSet<>();
addUsrLibFolderToShipFiles(usrLibShipFiles);
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
usrLibShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
}
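// Apply the configured user-jar inclusion mode: ORDER merges the user classpath into the
// system classpath (both are sorted next), while FIRST and LAST prepend or append it to
// the assembled classpath further below.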
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize the classpaths by sorting so the assembled classpath is deterministic
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Set up the Flink dist jar for the ApplicationMaster
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// Write the job graph to a temporary file and register it as a local resource.
// TODO: the server should use the user's main method to generate the job graph
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.set(JOB_GRAPH_FILE_PATH, jobGraphFilename);
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Failed to add the job graph to the local resources.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Failed to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// Write out the Flink configuration and upload it as a local resource
File tmpConfigurationFile = null;
try {
String flinkConfigFileName = GlobalConfiguration.getFlinkConfFilename();
tmpConfigurationFile = File.createTempFile(appId + "-" + flinkConfigFileName, null);
// remove localhost bind hosts as they render production clusters unusable
removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
// this setting is unconditionally overridden anyway, so we remove it for clarity
configuration.removeConfig(TaskManagerOptions.HOST);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
fileUploader.registerSingleLocalResource(
flinkConfigFileName,
new Path(tmpConfigurationFile.toURI()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append(flinkConfigFileName).append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Failed to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// To support the YARN secure integration test scenario:
// in the integration test setup, the YARN containers created by the YarnMiniCluster do
// not have the yarn-site.xml and KRB5 configuration files. We add these files as
// container-local resources so that the container applications (JM/TMs) have a proper
// secure cluster setup.
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
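// If a krb5.conf is configured, ship it as a local resource so that the containers use
// the same Kerberos configuration.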
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
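// Handle the Kerberos keytab: either localize it to the containers as a local resource,
// or assume it is pre-installed at the configured path.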
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab = flinkConfiguration.get(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath = flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
// Assume the keytab is pre-installed in the container.
localizedKeytabPath =
flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
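// Derive the JobManager memory configuration and build the ApplicationMaster container
// launch context, including its startup command.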
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfig(flinkConfiguration);
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
boolean fetchToken = configuration.get(SecurityOptions.DELEGATION_TOKENS_ENABLED);
KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
if (kerberosLoginProvider.isLoginPossible(true)) {
setTokensFor(amContainer, fetchToken);
} else {
LOG.info(
"Cannot use the Kerberos delegation token manager; no valid Kerberos credentials provided.");
}
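// Attach all registered local resources to the AM container and finalize the uploads.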
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
Utils.setAclsFor(amContainer, flinkConfiguration);
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv =
generateApplicationMasterEnv(
fileUploader,
classPathBuilder.toString(),
localResourceDescFlinkJar.toString(),
appId.toString());
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
// To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemorySize(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// Set priority for application
int priorityNum = flinkConfiguration.get(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
setRolledLogConfigs(appContext);
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master {}", appId);
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
long lastLogTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
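// Poll the application report every 250 ms until the application reaches RUNNING or
// FINISHED; FAILED or KILLED aborts the deployment.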
loop:
while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// unreachable: the exception above exits the switch
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("The YARN application has finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state {}", appState);
}
if (System.currentTimeMillis() - lastLogTime > 60000) {
lastLogTime = System.currentTimeMillis();
LOG.info(
"Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster.",
(lastLogTime - startTime) / 1000);
}
}
lastAppState = appState;
Thread.sleep(250);
}
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
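// Illustrative sketch (not part of the original file): the status-polling loop above,
// extracted into a standalone helper to show the YarnClient calls in isolation. The
// helper name and its signature are hypothetical; the 250 ms interval and the state
// handling mirror the method above.
private static ApplicationReport awaitDeployment(YarnClient yarnClient, ApplicationId appId)
throws Exception {
while (true) {
// Ask the ResourceManager for the current application status.
final ApplicationReport report = yarnClient.getApplicationReport(appId);
final YarnApplicationState state = report.getYarnApplicationState();
switch (state) {
case RUNNING:
case FINISHED:
// Deployment succeeded (or the application already completed).
return report;
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ state
+ ". Diagnostics: "
+ report.getDiagnostics());
default:
// NEW / NEW_SAVING / SUBMITTED / ACCEPTED: keep polling.
Thread.sleep(250);
}
}
}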