public boolean maybeAddTezEventForDestinationTask()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java [576:683]


  /**
   * Routes one source event to a single destination task attempt and appends the
   * resulting events to {@code listToAdd}.
   *
   * <p>When {@code routingNeeded} is false nothing is routed and the method reports
   * success. Otherwise the edge manager is cast to {@link EdgeManagerPluginOnDemand}
   * and asked for the route that matches the event type. Input-failed and
   * data-movement events are expanded into one copy per target index, stopping once
   * {@code listToAdd} holds {@code listMaxSize} entries; unfinished work is recorded
   * in {@code pendingEvents} under {@code attemptID}.
   *
   * @param tezEvent the source event to route
   * @param attemptID destination attempt; its task id is used as the destination
   *        task index and it is the key for any pending-route entry
   * @param srcTaskIndex index of the source task, passed to the edge manager
   * @param listToAdd list that receives the routed events
   * @param listMaxSize upper bound on {@code listToAdd.size()} honoured while
   *        expanding input-failed and data-movement events
   * @param pendingRoutes if non-null, its route metadata and already-routed count
   *        are used instead of asking the edge manager for a new route
   * @return {@code true} if nothing remains to be routed for this event;
   *         {@code false} if expansion stopped early and a pending entry was stored
   * @throws AMUserCodeException wrapping any {@link Exception} raised while routing,
   *         including the one thrown for an unhandled event type
   */
  public boolean maybeAddTezEventForDestinationTask(TezEvent tezEvent, TezTaskAttemptID attemptID,
      int srcTaskIndex, List<TezEvent> listToAdd, int listMaxSize,
      PendingEventRouteMetadata pendingRoutes)
          throws AMUserCodeException {
    // Guard: no routing required, so there is nothing to add.
    if (!routingNeeded) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not routing events since destination vertex has 0 tasks" +
            generateCommonDebugString(srcTaskIndex, tezEvent));
      }
      return true;
    }

    try {
      EdgeManagerPluginOnDemand onDemandManager = (EdgeManagerPluginOnDemand) edgeManager;
      int destTaskIndex = attemptID.getTaskID().getId();
      // True when the caller supplied saved routing state to continue from.
      final boolean resuming = pendingRoutes != null;

      switch (tezEvent.getEventType()) {
      case COMPOSITE_DATA_MOVEMENT_EVENT: {
        CompositeDataMovementEvent composite =
            (CompositeDataMovementEvent) tezEvent.getEvent();
        CompositeEventRouteMetadata compositeRoute = onDemandManager
            .routeCompositeDataMovementEventToDestination(srcTaskIndex, destTaskIndex);
        // At most one event is appended on this path; listMaxSize is not consulted.
        if (compositeRoute != null) {
          CompositeRoutedDataMovementEvent expanded = composite.expandRouted(compositeRoute);
          TezEvent outgoing = new TezEvent(expanded, tezEvent.getSourceInfo(),
              tezEvent.getEventReceivedTime());
          outgoing.setDestinationInfo(destinationMetaInfo);
          listToAdd.add(outgoing);
        }
        break;
      }
      case INPUT_FAILED_EVENT: {
        InputFailedEvent failedEvent = (InputFailedEvent) tezEvent.getEvent();
        EventRouteMetadata route = resuming
            ? pendingRoutes.getRouteMeta()
            : onDemandManager.routeInputSourceTaskFailedEventToDestination(
                srcTaskIndex, destTaskIndex);
        int routed = resuming ? pendingRoutes.getNumEventsRouted() : 0;
        if (route == null) {
          break;
        }
        int occupied = listToAdd.size();
        int total = route.getNumEvents();
        int[] targets = route.getTargetIndices();
        // Emit one copy per remaining target index until the list is full.
        for (; routed < total && occupied < listMaxSize; occupied++, routed++) {
          InputFailedEvent copy = failedEvent.makeCopy(targets[routed]);
          TezEvent outgoing = new TezEvent(copy, tezEvent.getSourceInfo(),
              tezEvent.getEventReceivedTime());
          outgoing.setDestinationInfo(destinationMetaInfo);
          listToAdd.add(outgoing);
        }
        if (routed < total) {
          // Ran out of room: remember how far we got for this attempt.
          pendingEvents.put(attemptID,
              new PendingEventRouteMetadata(route, tezEvent, routed));
          return false;
        }
        break;
      }
      case DATA_MOVEMENT_EVENT: {
        DataMovementEvent moveEvent = (DataMovementEvent) tezEvent.getEvent();
        EventRouteMetadata route = resuming
            ? pendingRoutes.getRouteMeta()
            : onDemandManager.routeDataMovementEventToDestination(srcTaskIndex,
                moveEvent.getSourceIndex(), destTaskIndex);
        int routed = resuming ? pendingRoutes.getNumEventsRouted() : 0;
        if (route == null) {
          break;
        }
        int occupied = listToAdd.size();
        int total = route.getNumEvents();
        int[] targets = route.getTargetIndices();
        // Emit one copy per remaining target index until the list is full.
        for (; routed < total && occupied < listMaxSize; occupied++, routed++) {
          DataMovementEvent copy = moveEvent.makeCopy(targets[routed]);
          TezEvent outgoing = new TezEvent(copy, tezEvent.getSourceInfo(),
              tezEvent.getEventReceivedTime());
          outgoing.setDestinationInfo(destinationMetaInfo);
          listToAdd.add(outgoing);
        }
        if (routed < total) {
          // Ran out of room: remember how far we got for this attempt.
          pendingEvents.put(attemptID,
              new PendingEventRouteMetadata(route, tezEvent, routed));
          return false;
        }
        break;
      }
      default:
        // Thrown inside the try, so it is wrapped by the catch below like any other failure.
        throw new TezUncheckedException("Unhandled tez event type: "
            + tezEvent.getEventType());
      }
    } catch (Exception e) {
      throw new AMUserCodeException(Source.EdgeManager,
          "Fail to maybeAddTezEventForDestinationTask, event:" + tezEvent.getEvent()
          + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
          + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
    }
    return true;
  }