void handleSourceTaskFinished()

in tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java [138:205]


  void handleSourceTaskFinished(String vertex, Integer taskId) {
    SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
    if (srcInfo.taskIsFinished[taskId.intValue()] == null) {
      // not a duplicate completion
      srcInfo.taskIsFinished[taskId.intValue()] = new Boolean(true);
      srcInfo.numFinishedTasks++;
      if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE) {
        oneToOneSrcTasksDoneCount[taskId.intValue()]++;
        // keep the latest container that completed as the location hint
        // After there is standard data size info available then use it
        oneToOneLocationHints[taskId.intValue()] = context.getTaskContainer(vertex, taskId);
      }
    }
    
    // custom edge needs to tell us which of our tasks its connected to
    // for now only-built in edges supported
    // Check if current source task's vertex is completed.
    if (srcInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE
        && srcInfo.numTasks != srcInfo.numFinishedTasks) {
      // we depend on all tasks to finish. So nothing to do now.
      return;
    }
    
    // currently finished vertex task may trigger us to schedule
    for (SourceVertexInfo vInfo : srcVertexInfo.values()) {
      if (vInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE) {
        // we depend on all tasks to finish.
        if (vInfo.numTasks != vInfo.numFinishedTasks) {
          // we depend on all tasks to finish. So nothing to do now.
          return;
        }
      }
    }
    
    // all source vertices will full dependencies are done
    List<TaskWithLocationHint> tasksToStart = null;
    if (numOneToOneEdges == 0) {
      // no 1-1 dependency. Start all tasks
      int numTasks = taskIsStarted.length;
      LOG.info("Starting all " + numTasks + "tasks for vertex: " + context.getVertexName());
      tasksToStart = Lists.newArrayListWithCapacity(numTasks);
      for (int i=0; i<numTasks; ++i) {
        taskIsStarted[i] = true;
        tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
      }
    } else {
      // start only the ready 1-1 tasks
      tasksToStart = Lists.newLinkedList();
      for (int i=0; i<taskIsStarted.length; ++i) {
        if (!taskIsStarted[i] && oneToOneSrcTasksDoneCount[i] == numOneToOneEdges) {
          taskIsStarted[i] = true;
          TaskLocationHint locationHint = null;
          if (oneToOneLocationHints[i] != null) {
            locationHint = new TaskLocationHint(oneToOneLocationHints[i].getId());
          }
          LOG.info("Starting task " + i + " for vertex: "
              + context.getVertexName() + " with location: "
              + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null"));
          tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
        }
      }
    }
    
    if (tasksToStart != null && !tasksToStart.isEmpty()) {
      context.scheduleVertexTasks(tasksToStart);
    }
    
  }