public synchronized void start()

in client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java [144:233]


  public synchronized void start() throws IOException {
    if (!isStarted.get()) {
      ////// Initial configuration
      memoryUpdateCallbackHandler.validateUpdateReceived();
      CompressionCodec codec;
      if (ConfigUtils.isIntermediateInputCompressed(conf)) {
        Class<? extends CompressionCodec> codecClass =
            ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, conf);
      } else {
        codec = null;
      }

      boolean ifileReadAhead =
          conf.getBoolean(
              TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
              TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
      int ifileReadAheadLength = 0;
      int ifileBufferSize = 0;

      if (ifileReadAhead) {
        ifileReadAheadLength =
            conf.getInt(
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
      }
      ifileBufferSize =
          conf.getInt(
              "io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

      LOG.info(
          "RssUnorderedKVInput, totalMemoryAvailable:{}, available memory:{}",
          getContext().getTotalMemoryAvailableToTask(),
          memoryUpdateCallbackHandler.getMemoryAssigned());

      boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);

      this.inputManager =
          new RssSimpleFetchedInputAllocator(
              TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
              getContext().getUniqueIdentifier(),
              getContext().getDagIdentifier(),
              conf,
              getContext().getTotalMemoryAvailableToTask(),
              memoryUpdateCallbackHandler.getMemoryAssigned());

      this.rssShuffleManager =
          new RssShuffleManager(
              getContext(),
              conf,
              getNumPhysicalInputs(),
              ifileBufferSize,
              ifileReadAhead,
              ifileReadAheadLength,
              codec,
              inputManager,
              shuffleId,
              applicationAttemptId);

      this.inputEventHandler =
          new ShuffleInputEventHandlerImpl(
              getContext(),
              rssShuffleManager,
              inputManager,
              codec,
              ifileReadAhead,
              ifileReadAheadLength,
              compositeFetch);

      ////// End of Initial configuration

      this.rssShuffleManager.run();
      this.kvReader =
          createReader(
              inputRecordCounter, codec, ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
      List<Event> pending = new LinkedList<>();
      pendingEvents.drainTo(pending);
      if (pending.size() > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              getContext().getSourceVertexName()
                  + ": "
                  + "NoAutoStart delay in processing first event: "
                  + (System.currentTimeMillis() - firstEventReceivedTime));
        }
        inputEventHandler.handleEvents(pending);
      }
      isStarted.set(true);
    }
  }