public AllocationResponse allocate()

in twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java [176:229]


  /**
   * Sends the currently pending resource requests and container releases to
   * the ResourceManager and returns the wrapped response.
   *
   * <p>The pending {@code ask} and {@code release} collections are copied and
   * cleared while holding this object's lock, then the RPC is issued outside
   * the lock. If no response was obtained, the copies are merged back into the
   * pending collections so the next call retries them.
   *
   * <p>As the in-line comment below notes, the restore logic assumes there are
   * no concurrent calls to this method.
   *
   * @param progressIndicator progress value forwarded in the allocate request
   * @return the response produced by {@code AllocationResponses.create} from
   *         the raw RPC response
   * @throws YarnRemoteException declared by this method; presumably raised by
   *         the underlying allocate RPC (confirm against {@code rmClient})
   */
  public AllocationResponse allocate(float progressIndicator)
    throws YarnRemoteException {
    AllocateResponse allocateResponse = null;
    ArrayList<ResourceRequest> askList = null;
    ArrayList<ContainerId> releaseList = null;
    AllocateRequest allocateRequest = null;

    try {
      synchronized (this) {
        askList = new ArrayList<ResourceRequest>(ask);
        releaseList = new ArrayList<ContainerId>(release);
        // optimistically clear this collection assuming no RPC failure
        ask.clear();
        release.clear();
        allocateRequest = BuilderUtils
          .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
                              askList, releaseList);
      }

      allocateResponse = rmClient.allocate(allocateRequest);
      AllocationResponse response = AllocationResponses.create(allocateResponse);

      synchronized (this) {
        // update these on successful RPC
        clusterNodeCount = allocateResponse.getNumClusterNodes();
        lastResponseId = response.getResponseId();
        clusterAvailableResources = response.getAvailableResources();
      }

      return response;
    } finally {
      // TODO how to differentiate remote YARN exception vs error in RPC
      if (allocateResponse == null) {
        // We hit an exception in allocate()
        // Preserve ask and release for next call to allocate()
        synchronized (this) {
          // The snapshots are still null if the failure happened before they
          // were taken. In that case nothing was cleared, so there is nothing
          // to restore, and dereferencing them here would throw a
          // NullPointerException that hides the original exception.
          if (releaseList != null) {
            release.addAll(releaseList);
          }
          // Requests could have been added or deleted during call to allocate.
          // If requests were added/removed then there is nothing to do since
          // the ResourceRequest object in ask would have the actual new value.
          // If ask does not have this ResourceRequest then it was unchanged and
          // so we can add the value back safely.
          // This assumes that there will no concurrent calls to allocate() and
          // so we don't have to worry about ask being changed in the
          // synchronized block at the beginning of this method.
          if (askList != null) {
            for (ResourceRequest oldAsk : askList) {
              if (!ask.contains(oldAsk)) {
                ask.add(oldAsk);
              }
            }
          }
        }
      }
    }
  }