in metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java [436:539]
public void run() throws YarnException, IOException, InterruptedException {
LOG.info("Starting ApplicationMaster");
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
allTokens = YarnUtils.INSTANCE.tokensFromCredentials(credentials);
// Create appSubmitterUgi and add original tokens to it
appSubmitterUgi = YarnUtils.INSTANCE.createUserGroup(credentials);
startTimelineClient(conf);
if(timelineClient != null) {
YarnUtils.INSTANCE.publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
ContainerEvents.APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
int minSize = getMinContainerMemoryIncrement(conf);
listener = new ContainerRequestListener(timelineClient , appSubmitterUgi , domainId, minSize);
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, listener);
amRMClient.init(conf);
amRMClient.start();
nmClientAsync = new NMClientAsyncImpl(listener);
nmClientAsync.init(conf);
nmClientAsync.start();
// Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to
// the RPC server
// TODO use the rpc port info to register with the RM for the client to
// send requests to this app master
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
maasHandler = new MaaSHandler(zkQuorum, zkRoot);
try {
maasHandler.start();
maasHandler.getDiscoverer().resetState();
listener.initialize(amRMClient, nmClientAsync, maasHandler.getDiscoverer());
} catch (Exception e) {
throw new IllegalStateException("Unable to find zookeeper", e);
}
EnumMap<Resources, Integer> maxResources = Resources.toResourceMap( Resources.MEMORY.of(maxMem)
, Resources.V_CORE.of(maxVCores)
);
requestQueue = maasHandler.getConfig()
.createQueue(ImmutableMap.of(ZKQueue.ZK_CLIENT, maasHandler.getClient()
)
);
LOG.info("Ready to accept requests...");
while(true) {
ModelRequest request = requestQueue.dequeue();
if(request == null) {
LOG.error("Received a null request...");
continue;
}
LOG.info("[" + request.getAction() + "]: Received request for model " + request.getName() + ":" + request.getVersion() + "x" + request.getNumInstances()
+ " containers of size " + request.getMemory() + "M at path " + request.getPath()
);
EnumMap<Resources, Integer> resourceRequest = Resources.toResourceMap(Resources.MEMORY.of(request.getMemory())
,Resources.V_CORE.of(1)
);
EnumMap<Resources, Integer> resources = Resources.getRealisticResourceRequest( maxResources
, Resources.toResource(resourceRequest)
);
Resource resource = Resources.toResource(resources);
Path appMasterJar = getAppMasterJar();
if(request.getAction() == Action.ADD) {
listener.requestContainers(request.getNumInstances(), resource);
for (int i = 0; i < request.getNumInstances(); ++i) {
Container container = listener.getContainers(resource).take();
LOG.info("Found container id of " + container.getId().getContainerId());
executor.execute(new LaunchContainer(conf
, zkQuorum
, zkRoot
, nmClientAsync
, request
, container
, allTokens
, appMasterJar
)
);
listener.getContainerState().registerRequest(container, request);
}
}
else if(request.getAction() == Action.REMOVE) {
listener.removeContainers(request.getNumInstances(), request);
}
}
}