client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java [134:180]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    // Handler passed to requestInitialMemory() below as the memory-update callback.
    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();

    // Compute the initial memory request from the configuration and the total memory
    // the context reports as available to this task, then submit the request.
    long memRequestSize =
        RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
    LOG.info("memRequestSize is {}", memRequestSize);
    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
    // NOTE(review): this is logged as soon as the request call returns; whether the memory
    // has actually been granted at this point is not visible here - confirm.
    LOG.info("Got initialMemory.");

    // Read the Tez flag for sending empty-partition information via events, using the Tez default.
    this.sendEmptyPartitionDetails =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);

    // Address of the shuffle manager, taken from the RSS_AM_SHUFFLE_MANAGER_* configuration keys.
    // NOTE(review): host has no default and port falls back to -1; neither is validated
    // before the socket address is built - confirm how createSocketAddrForHost reacts.
    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);

    // Build a remote user named after the application id and hand it the session token
    // found in the current user's credentials, with the token service set to the address above.
    UserGroupInformation taskOwner =
        UserGroupInformation.createRemoteUser(this.applicationId.toString());
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
    SecurityUtil.setTokenService(jobToken, address);
    taskOwner.addToken(jobToken);
    // Create the umbilical RPC proxy while running as taskOwner.
    final TezRemoteShuffleUmbilicalProtocol umbilical =
        taskOwner.doAs(
            new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
              @Override
              public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
                return RPC.getProxy(
                    TezRemoteShuffleUmbilicalProtocol.class,
                    TezRemoteShuffleUmbilicalProtocol.versionID,
                    address,
                    conf);
              }
            });
    // Resolve the DAG id from the task attempt and read both vertex ids from the configuration;
    // -1 is the "not configured" sentinel and is rejected below.
    TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
    TezDAGID tezDAGID = tezVertexID.getDAGId();
    int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
    int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
    // NOTE(review): these checks run after the RPC proxy has been created; if one throws,
    // no cleanup of the proxy is visible in this section - consider validating earlier.
    if (sourceVertexId == -1) {
      throw new RssException("sourceVertexId should not be -1");
    }
    if (destinationVertexId == -1) {
      throw new RssException("destinationVertexId should not be -1");
    }
    // The shuffle id is derived from the DAG id together with the source and destination vertex ids.
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java [131:177]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    // Handler passed to requestInitialMemory() below as the memory-update callback.
    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();

    // Compute the initial memory request from the configuration and the total memory
    // the context reports as available to this task, then submit the request.
    long memRequestSize =
        RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
    LOG.info("memRequestSize is {}", memRequestSize);
    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
    // NOTE(review): this is logged as soon as the request call returns; whether the memory
    // has actually been granted at this point is not visible here - confirm.
    LOG.info("Got initialMemory.");

    // Read the Tez flag for sending empty-partition information via events, using the Tez default.
    this.sendEmptyPartitionDetails =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);

    // Address of the shuffle manager, taken from the RSS_AM_SHUFFLE_MANAGER_* configuration keys.
    // NOTE(review): host has no default and port falls back to -1; neither is validated
    // before the socket address is built - confirm how createSocketAddrForHost reacts.
    this.host = this.conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
    this.port = this.conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);

    // Build a remote user named after the application id and hand it the session token
    // found in the current user's credentials, with the token service set to the address above.
    UserGroupInformation taskOwner =
        UserGroupInformation.createRemoteUser(this.applicationId.toString());
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
    SecurityUtil.setTokenService(jobToken, address);
    taskOwner.addToken(jobToken);
    // Create the umbilical RPC proxy while running as taskOwner.
    final TezRemoteShuffleUmbilicalProtocol umbilical =
        taskOwner.doAs(
            new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
              @Override
              public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
                return RPC.getProxy(
                    TezRemoteShuffleUmbilicalProtocol.class,
                    TezRemoteShuffleUmbilicalProtocol.versionID,
                    address,
                    conf);
              }
            });
    // Resolve the DAG id from the task attempt and read both vertex ids from the configuration;
    // -1 is the "not configured" sentinel and is rejected below.
    TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
    TezDAGID tezDAGID = tezVertexID.getDAGId();
    int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
    int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
    // NOTE(review): these checks run after the RPC proxy has been created; if one throws,
    // no cleanup of the proxy is visible in this section - consider validating earlier.
    if (sourceVertexId == -1) {
      throw new RssException("sourceVertexId should not be -1");
    }
    if (destinationVertexId == -1) {
      throw new RssException("destinationVertexId should not be -1");
    }
    // The shuffle id is derived from the DAG id together with the source and destination vertex ids.
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



