public void initialize()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java [168:300]


  /**
   * Moves this task out of the {@code NEW} state and prepares it to run.
   *
   * <p>The steps, in order:
   * <ol>
   *   <li>Verify the task is still {@code NEW} and mark it {@code INITED}.</li>
   *   <li>Submit one initializer per input and per output to
   *       {@code initializerCompletionService}, initialize the processor in the calling
   *       thread, then wait for every submitted initializer to complete.</li>
   *   <li>Initialize the group inputs and register them with {@code inputReadyTracker}.</li>
   *   <li>Make the initial memory allocations.</li>
   *   <li>Submit start requests for inputs that are neither part of a group nor already
   *       marked as started, and for groups not already marked as started; shut down the
   *       initializer executor and wait for those start requests to complete.</li>
   *   <li>Populate {@code runInputMap} / {@code runOutputMap} and start the router thread.</li>
   * </ol>
   *
   * @throws Exception if the state check fails, if waiting on the completion service fails,
   *         or if a submitted initializer/start task fails. In the last case the task's own
   *         exception is rethrown when it is an {@link Exception}; otherwise the
   *         {@link ExecutionException} is wrapped in a new {@link Exception}.
   */
  public void initialize() throws Exception {
    LOG.info("Initializing LogicalProcessorIORuntimeTask");
    Preconditions.checkState(this.state == State.NEW, "Already initialized");
    this.state = State.INITED;

    // Queue one initializer per input, handing each its position in the spec list.
    int nextInputIndex = 0;
    for (InputSpec spec : taskSpec.getInputs()) {
      this.initializerCompletionService.submit(
          new InitializeInputCallable(spec, nextInputIndex));
      nextInputIndex++;
    }

    // Same for the outputs.
    int nextOutputIndex = 0;
    for (OutputSpec spec : taskSpec.getOutputs()) {
      this.initializerCompletionService.submit(
          new InitializeOutputCallable(spec, nextOutputIndex));
      nextOutputIndex++;
    }

    // Exactly one task was submitted per input and per output.
    final int initializerCount = nextInputIndex + nextOutputIndex;

    // The processor is initialized in the current thread, after the I/O initializers
    // have been submitted.
    initializeLogicalIOProcessor();

    // Drain one completed future per submitted initializer; the first failure aborts.
    for (int pending = initializerCount; pending > 0; pending--) {
      LOG.info("Waiting for " + pending + " initializers to finish");
      Future<Void> finished = initializerCompletionService.take();
      try {
        finished.get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
          throw (Exception) cause;
        }
        throw new Exception(e);
      }
    }
    LOG.info("All initializers finished");

    // Group inputs depend on the inputs being initialized, so this must come afterwards.
    initializeGroupInputs();
    // Register the groups so that appropriate calls can be made.
    this.inputReadyTracker
        .setGroupedInputs(groupInputsMap == null ? null : groupInputsMap.values());

    // Names of the vertices that belong to some group. Built up front so that start is
    // not invoked on them individually; their start is controlled by the group.
    Set<String> groupedVertexNames = Sets.newHashSet();
    // Inputs map for processor.run(): the group inputs are added first.
    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      for (GroupInputSpec groupSpec : groupInputSpecs) {
        String groupName = groupSpec.getGroupName();
        runInputMap.put(groupName, groupInputsMap.get(groupName));
        groupedVertexNames.addAll(groupSpec.getGroupVertices());
      }
    }

    initialMemoryDistributor.makeInitialAllocations();

    LOG.info("Starting Inputs/Outputs");
    int autoStartCount = 0;
    for (InputSpec spec : inputSpecs) {
      String sourceVertex = spec.getSourceVertexName();
      if (groupedVertexNames.contains(sourceVertex)) {
        LOG.info("Ignoring " + sourceVertex
            + " for start, since it will be controlled via it's Group");
      } else if (!inputAlreadyStarted(taskSpec.getVertexName(), sourceVertex)) {
        // Record the start before submitting it.
        startedInputsMap.put(taskSpec.getVertexName(), sourceVertex);
        autoStartCount++;
        this.initializerCompletionService.submit(
            new StartInputCallable(inputsMap.get(sourceVertex), sourceVertex));
        LOG.info("Input: " + sourceVertex
            + " being auto started by the framework. Subsequent instances will not be auto-started");
      }
    }

    if (groupInputSpecs != null) {
      for (GroupInputSpec groupSpec : groupInputSpecs) {
        String groupName = groupSpec.getGroupName();
        if (inputAlreadyStarted(taskSpec.getVertexName(), groupName)) {
          continue;
        }
        // NOTE(review): unlike the plain inputs above, nothing is added to
        // startedInputsMap for a group here - confirm whether that is intentional.
        autoStartCount++;
        this.initializerCompletionService.submit(
            new StartInputCallable(groupInputsMap.get(groupName), groupName));
        LOG.info("InputGroup: " + groupName
            + " being auto started by the framework. Subsequent instance will not be auto-started");
      }
    }

    // Nothing further is submitted from this method; shut down once the submitted tasks
    // complete.
    // NOTE(review): this call is skipped if anything above throws - confirm that the
    // executor is cleaned up elsewhere on the failure path.
    this.initializerExecutor.shutdown();

    LOG.info("Num IOs determined for AutoStart: " + autoStartCount);
    for (int pending = autoStartCount; pending > 0; pending--) {
      LOG.info("Waiting for " + pending + " IOs to start");
      Future<Void> started = initializerCompletionService.take();
      try {
        started.get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
          throw (Exception) cause;
        }
        throw new Exception(e);
      }
    }
    LOG.info("AutoStartComplete");

    // Then add the inputs that are not part of any group.
    for (InputSpec spec : inputSpecs) {
      String sourceVertex = spec.getSourceVertexName();
      if (groupedVertexNames.contains(sourceVertex)) {
        continue;
      }
      LogicalInput input = inputsMap.get(sourceVertex);
      runInputMap.put(sourceVertex, input);
    }

    // Outputs are keyed by their destination vertex name.
    for (OutputSpec spec : outputSpecs) {
      String destinationVertex = spec.getDestinationVertexName();
      LogicalOutput output = outputsMap.get(destinationVertex);
      runOutputMap.put(destinationVertex, output);
    }

    // TODO Maybe close initialized inputs / outputs in case of failure to
    // initialize.

    startRouterThread();
  }