public synchronized void start()

in client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornUnorderedKVInput.java [120:214]


  /**
   * Starts this input the first time it is invoked; once {@code isStarted} is set, further calls
   * return immediately.
   *
   * <p>On the first call this reads the IFile and Celeborn settings from {@code conf}, builds the
   * fetched-input allocator, the shuffle client, the shuffle manager and the input event handler,
   * runs the shuffle manager, creates the key/value reader, and finally hands any events queued
   * in {@code pendingEvents} to the event handler before flagging the input as started.
   *
   * @throws IOException propagated from the setup steps performed here
   */
  public synchronized void start() throws IOException {
    if (isStarted.get()) {
      return;
    }

    // ---- Gather configuration ----
    memoryUpdateCallbackHandler.validateUpdateReceived();
    CompressionCodec compressionCodec = CodecUtils.getCodec(conf);

    boolean useCompositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
    boolean readAheadEnabled =
        conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
    // The read-ahead length is only looked up when read-ahead is enabled; otherwise it stays 0.
    int readAheadBytes =
        readAheadEnabled
            ? conf.getInt(
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
            : 0;
    int bufferSize =
        conf.getInt(
            "io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

    // ---- Build collaborators ----
    this.inputManager =
        new SimpleFetchedInputAllocator(
            TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
            getContext().getUniqueIdentifier(),
            getContext().getDagIdentifier(),
            conf,
            getContext().getTotalMemoryAvailableToTask(),
            memoryUpdateCallbackHandler.getMemoryAssigned());

    String lifecycleManagerHost = conf.get(TEZ_CELEBORN_LM_HOST);
    int lifecycleManagerPort = conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
    int celebornShuffleId = conf.getInt(TEZ_SHUFFLE_ID, -1);
    String celebornAppId = conf.get(TEZ_CELEBORN_APPLICATION_ID);
    boolean isBroadcastOrOneToOne = conf.getBoolean(TEZ_BROADCAST_OR_ONETOONE, false);
    CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);

    UserIdentifier userIdentifier =
        new UserIdentifier(celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName());
    ShuffleClient client =
        ShuffleClient.get(
            celebornAppId,
            lifecycleManagerHost,
            lifecycleManagerPort,
            celebornConf,
            userIdentifier,
            null);

    this.shuffleManager =
        new CelebornShuffleManager(
            getContext(),
            conf,
            getNumPhysicalInputs(),
            client,
            bufferSize,
            readAheadEnabled,
            readAheadBytes,
            compressionCodec,
            inputManager,
            celebornShuffleId,
            applicationAttemptId,
            isBroadcastOrOneToOne);

    this.inputEventHandler =
        new ShuffleInputEventHandlerImpl(
            getContext(),
            shuffleManager,
            inputManager,
            compressionCodec,
            readAheadEnabled,
            readAheadBytes,
            useCompositeFetch);

    // ---- Run the shuffle manager and expose the reader ----
    this.shuffleManager.run();
    this.kvReader =
        createReader(
            inputRecordCounter, compressionCodec, bufferSize, readAheadEnabled, readAheadBytes);

    // Forward the events that were queued in pendingEvents to the freshly built handler.
    List<Event> queuedEvents = new LinkedList<>();
    pendingEvents.drainTo(queuedEvents);
    if (!queuedEvents.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            getContext().getInputOutputVertexNames()
                + ": "
                + "NoAutoStart delay in processing first event: "
                + (System.currentTimeMillis() - firstEventReceivedTime));
      }
      inputEventHandler.handleEvents(queuedEvents);
    }

    // The flag is raised last, so it is only set when every step above completed.
    isStarted.set(true);
  }