public void run()

in metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java [436:539]


  public void run() throws YarnException, IOException, InterruptedException {
    LOG.info("Starting ApplicationMaster");

    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
    // are marked as LimitedPrivate
    Credentials credentials =
            UserGroupInformation.getCurrentUser().getCredentials();

    allTokens = YarnUtils.INSTANCE.tokensFromCredentials(credentials);

    // Create appSubmitterUgi and add original tokens to it
    appSubmitterUgi = YarnUtils.INSTANCE.createUserGroup(credentials);
    startTimelineClient(conf);
    if(timelineClient != null) {
      YarnUtils.INSTANCE.publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
              ContainerEvents.APP_ATTEMPT_START, domainId, appSubmitterUgi);
    }
    int minSize = getMinContainerMemoryIncrement(conf);
    listener = new ContainerRequestListener(timelineClient , appSubmitterUgi , domainId, minSize);
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, listener);
    amRMClient.init(conf);
    amRMClient.start();

    nmClientAsync = new NMClientAsyncImpl(listener);
    nmClientAsync.init(conf);
    nmClientAsync.start();


    // Setup local RPC Server to accept status requests directly from clients
    // TODO need to setup a protocol for client to be able to communicate to
    // the RPC server
    // TODO use the rpc port info to register with the RM for the client to
    // send requests to this app master

    // Register self with ResourceManager
    // This will start heartbeating to the RM
    appMasterHostname = NetUtils.getHostname();
    RegisterApplicationMasterResponse response = amRMClient
            .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
                    appMasterTrackingUrl);
    // Dump out information about cluster capability as seen by the
    // resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
    maasHandler = new MaaSHandler(zkQuorum, zkRoot);
    try {
      maasHandler.start();
      maasHandler.getDiscoverer().resetState();
      listener.initialize(amRMClient, nmClientAsync, maasHandler.getDiscoverer());
    } catch (Exception e) {
      throw new IllegalStateException("Unable to find zookeeper", e);
    }
    EnumMap<Resources, Integer> maxResources = Resources.toResourceMap( Resources.MEMORY.of(maxMem)
                                                                      , Resources.V_CORE.of(maxVCores)
                                                                      );
    requestQueue = maasHandler.getConfig()
                              .createQueue(ImmutableMap.of(ZKQueue.ZK_CLIENT, maasHandler.getClient()
                                                          )
                                          );
    LOG.info("Ready to accept requests...");
    while(true) {

      ModelRequest request = requestQueue.dequeue();
      if(request == null) {
        LOG.error("Received a null request...");
        continue;
      }
      LOG.info("[" + request.getAction() + "]: Received request for model " + request.getName() + ":" + request.getVersion() + "x" + request.getNumInstances()
              + " containers of size " + request.getMemory() + "M at path " + request.getPath()
              );
      EnumMap<Resources, Integer> resourceRequest = Resources.toResourceMap(Resources.MEMORY.of(request.getMemory())
                                                                            ,Resources.V_CORE.of(1)
                                                                            );
      EnumMap<Resources, Integer> resources = Resources.getRealisticResourceRequest( maxResources
                                                                                   , Resources.toResource(resourceRequest)
                                                                                   );
      Resource resource = Resources.toResource(resources);
      Path appMasterJar  = getAppMasterJar();
      if(request.getAction() == Action.ADD) {
        listener.requestContainers(request.getNumInstances(), resource);
        for (int i = 0; i < request.getNumInstances(); ++i) {
          Container container = listener.getContainers(resource).take();
          LOG.info("Found container id of " + container.getId().getContainerId());
          executor.execute(new LaunchContainer(conf
                        , zkQuorum
                        , zkRoot
                        , nmClientAsync
                        , request
                        , container
                        , allTokens
                        , appMasterJar
                                              )
                          );
          listener.getContainerState().registerRequest(container, request);
        }
      }
      else if(request.getAction() == Action.REMOVE) {
        listener.removeContainers(request.getNumInstances(), request);
      }
    }
  }