client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java [131:215]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();

    long memRequestSize =
        RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
    LOG.info("memRequestSize is {}", memRequestSize);
    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
    LOG.info("Got initialMemory.");

    this.sendEmptyPartitionDetails =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);

    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);

    UserGroupInformation taskOwner =
        UserGroupInformation.createRemoteUser(this.applicationId.toString());
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
    SecurityUtil.setTokenService(jobToken, address);
    taskOwner.addToken(jobToken);
    final TezRemoteShuffleUmbilicalProtocol umbilical =
        taskOwner.doAs(
            new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
              @Override
              public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
                return RPC.getProxy(
                    TezRemoteShuffleUmbilicalProtocol.class,
                    TezRemoteShuffleUmbilicalProtocol.versionID,
                    address,
                    conf);
              }
            });
    TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
    TezDAGID tezDAGID = tezVertexID.getDAGId();
    int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
    int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
    if (sourceVertexId == -1) {
      throw new RssException("sourceVertexId should not be -1");
    }
    if (destinationVertexId == -1) {
      throw new RssException("destinationVertexId should not be -1");
    }
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
    this.applicationAttemptId =
        ApplicationAttemptId.newInstance(
            outputContext.getApplicationId(), outputContext.getDAGAttemptNumber());
    GetShuffleServerRequest request =
        new GetShuffleServerRequest(
            this.taskAttemptId, this.mapNum, this.numOutputs, this.shuffleId);

    GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);
    this.partitionToServers =
        response
            .getShuffleAssignmentsInfoWritable()
            .getShuffleAssignmentsInfo()
            .getPartitionToServers();

    LOG.info("Got response from am.");
    return Collections.emptyList();
  }

  @Override
  public void handleEvents(List<Event> list) {}

  @Override
  public List<Event> close() throws Exception {
    List<Event> returnEvents = Lists.newLinkedList();
    if (sorter != null) {
      sorter.flush();
      sorter.close();
      this.endTime = System.nanoTime();
      returnEvents.addAll(generateEvents());
      sorter = null;
    } else {
      LOG.warn(
          getContext().getDestinationVertexName()
              + ": Attempting to close output {} of type {} before it was started. "
              + "Generating empty events",
          getContext().getDestinationVertexName(),
          this.getClass().getSimpleName());
      returnEvents = generateEmptyEvents();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java [129:213]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();

    long memRequestSize =
        RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
    LOG.info("memRequestSize is {}", memRequestSize);
    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
    LOG.info("Got initialMemory.");

    this.sendEmptyPartitionDetails =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);

    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);

    UserGroupInformation taskOwner =
        UserGroupInformation.createRemoteUser(this.applicationId.toString());
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
    SecurityUtil.setTokenService(jobToken, address);
    taskOwner.addToken(jobToken);
    final TezRemoteShuffleUmbilicalProtocol umbilical =
        taskOwner.doAs(
            new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
              @Override
              public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
                return RPC.getProxy(
                    TezRemoteShuffleUmbilicalProtocol.class,
                    TezRemoteShuffleUmbilicalProtocol.versionID,
                    address,
                    conf);
              }
            });
    TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
    TezDAGID tezDAGID = tezVertexID.getDAGId();
    int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
    int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
    if (sourceVertexId == -1) {
      throw new RssException("sourceVertexId should not be -1");
    }
    if (destinationVertexId == -1) {
      throw new RssException("destinationVertexId should not be -1");
    }
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
    this.applicationAttemptId =
        ApplicationAttemptId.newInstance(
            outputContext.getApplicationId(), outputContext.getDAGAttemptNumber());
    GetShuffleServerRequest request =
        new GetShuffleServerRequest(
            this.taskAttemptId, this.mapNum, this.numOutputs, this.shuffleId);
    GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);

    this.partitionToServers =
        response
            .getShuffleAssignmentsInfoWritable()
            .getShuffleAssignmentsInfo()
            .getPartitionToServers();

    LOG.info("Got response from am.");
    return Collections.emptyList();
  }

  @Override
  public void handleEvents(List<Event> list) {}

  @Override
  public List<Event> close() throws Exception {
    List<Event> returnEvents = Lists.newLinkedList();
    if (sorter != null) {
      sorter.flush();
      sorter.close();
      this.endTime = System.nanoTime();
      returnEvents.addAll(generateEvents());
      sorter = null;
    } else {
      LOG.warn(
          getContext().getDestinationVertexName()
              + ": Attempting to close output {} of type {} before it was started. "
              + "Generating empty events",
          getContext().getDestinationVertexName(),
          this.getClass().getSimpleName());
      returnEvents = generateEmptyEvents();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



