in modules/yarn-ext/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java [62:172]
public static void main(String[] args) throws Exception {
    // Entry point of the YARN client: uploads the application master jar, submits the
    // application to YARN and then polls until the application leaves its start-up states.
    //
    // args[0] - local path to the application master jar.
    // args[1] - optional; handed to ClusterProperties.from (null is passed when absent).
    checkArguments(args);

    // Set path to app master jar.
    String pathAppMasterJar = args[0];

    ClusterProperties props = ClusterProperties.from(args.length == 2 ? args[1] : null);

    YarnConfiguration conf = new YarnConfiguration();

    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // From here on the client is running, so it must be stopped on every exit path.
    try {
        // Create application via yarnClient.
        YarnClientApplication app = yarnClient.createApplication();

        FileSystem fs = FileSystem.get(conf);

        // Resolve the Ignite location: use the configured path when present, otherwise ask getIgnite.
        Path ignite = props.ignitePath() == null ? getIgnite(props, fs) : new Path(props.ignitePath());

        // Upload the jar file to HDFS. Hadoop paths always use '/', regardless of the client OS,
        // hence Path.SEPARATOR rather than the platform-dependent java.io.File.separator.
        Path appJar = IgniteYarnUtils.copyLocalToHdfs(fs, pathAppMasterJar,
            props.igniteWorkDir() + Path.SEPARATOR + IgniteYarnUtils.JAR_NAME);

        // Set up the container launch context for the application master.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        // Launch command: JVM with a 512m heap running ApplicationMaster, passing the Ignite URI.
        String launchCmd = Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
            + IgniteYarnUtils.SPACE + ignite.toUri()
            + IgniteYarnUtils.YARN_LOG_OUT;

        amContainer.setCommands(Collections.singletonList(launchCmd));

        // Setup jar for ApplicationMaster.
        LocalResource appMasterJar = IgniteYarnUtils.setupFile(appJar, fs, LocalResourceType.FILE);

        amContainer.setLocalResources(Collections.singletonMap(IgniteYarnUtils.JAR_NAME, appMasterJar));

        // Setup CLASSPATH for ApplicationMaster.
        Map<String, String> appMasterEnv = props.toEnvs();

        setupAppMasterEnv(appMasterEnv, conf);

        amContainer.setEnvironment(appMasterEnv);

        // Setup security tokens.
        if (UserGroupInformation.isSecurityEnabled()) {
            Credentials creds = new Credentials();

            String tokRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);

            if (tokRenewer == null || tokRenewer.isEmpty())
                throw new IOException("Master Kerberos principal for the RM is not set.");

            log.info("Found RM principal: " + tokRenewer);

            Token<?>[] tokens = fs.addDelegationTokens(tokRenewer, creds);

            if (tokens != null)
                log.info("File system delegation tokens: " + Arrays.toString(tokens));

            amContainer.setTokens(IgniteYarnUtils.createTokenBuffer(creds));
        }

        // Set up resource type requirements for ApplicationMaster.
        // NOTE(review): the requested memory (512) equals the -Xmx value above, which leaves no
        // headroom for non-heap JVM memory - confirm this is intentional.
        Resource capability = Records.newRecord(Resource.class);

        capability.setMemory(512);
        capability.setVirtualCores(1);

        // Finally, set-up ApplicationSubmissionContext for the application.
        ApplicationSubmissionContext appCtx = app.getApplicationSubmissionContext();

        appCtx.setApplicationName("ignition"); // application name
        appCtx.setAMContainerSpec(amContainer);
        appCtx.setResource(capability);
        appCtx.setQueue(props.yarnQueue()); // queue

        // Submit application.
        ApplicationId appId = appCtx.getApplicationId();

        yarnClient.submitApplication(appCtx);

        log.log(Level.INFO, "Submitted application. Application id: {0}", appId);

        YarnApplicationState appState = yarnClient.getApplicationReport(appId).getYarnApplicationState();

        // Poll once per second while the application is still in one of its start-up states.
        while (appState == YarnApplicationState.NEW ||
            appState == YarnApplicationState.NEW_SAVING ||
            appState == YarnApplicationState.SUBMITTED ||
            appState == YarnApplicationState.ACCEPTED) {
            TimeUnit.SECONDS.sleep(1L);

            YarnApplicationState newState = yarnClient.getApplicationReport(appId).getYarnApplicationState();

            // Report the transition into ACCEPTED exactly once.
            if (appState != YarnApplicationState.ACCEPTED && newState == YarnApplicationState.ACCEPTED)
                log.log(Level.INFO, "Application {0} is ACCEPTED.", appId);

            appState = newState;
        }

        log.log(Level.INFO, "Application {0} is {1}.", new Object[]{appId, appState});
    }
    finally {
        // Release the client's resources whether submission succeeded or failed.
        yarnClient.stop();
    }
}