private boolean setParallelism()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1115:1321]


  private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
      Map<String, RootInputSpecUpdate> rootInputSpecUpdates,
      boolean recovering) {
    if (recovering) {
      writeLock.lock();
      try {
        if (sourceEdgeManagers != null) {
          for(Map.Entry<String, EdgeManagerDescriptor> entry :
              sourceEdgeManagers.entrySet()) {
            LOG.info("Recovering edge manager for source:"
                + entry.getKey() + " destination: " + getVertexId());
            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
            Edge edge = sourceVertices.get(sourceVertex);
            try {
              edge.setCustomEdgeManager(entry.getValue());
            } catch (Exception e) {
              LOG.warn("Failed to initialize edge manager for edge"
                  + ", sourceVertexName=" + sourceVertex.getName()
                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
                  e);
              return false;
            }
          }
        }
        
        // Restore any rootInputSpecUpdates which may have been registered during a parallelism
        // update.
        if (rootInputSpecUpdates != null) {
          LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString());
          this.rootInputSpecs.putAll(rootInputSpecUpdates);
        }
        return true;
      } finally {
        writeLock.unlock();
      }
    }
    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " 
    + parallelism + " for vertex: " + logIdentifier);
    setVertexLocationHint(vertexLocationHint);
    writeLock.lock();
    try {
      if (parallelismSet == true) {
        LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier);
        return false;
      }
      
      parallelismSet = true;

      // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
      if (numTasks == -1) {
        if (getState() != VertexState.INITIALIZING) {
          throw new TezUncheckedException(
              "Vertex state is not Initializing. Value: " + getState()
                  + " for vertex: " + logIdentifier);
        }
        
        if(sourceEdgeManagers != null) {
          for(Map.Entry<String, EdgeManagerDescriptor> entry : sourceEdgeManagers.entrySet()) {
            LOG.info("Replacing edge manager for source:"
                + entry.getKey() + " destination: " + getVertexId());
            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
            Edge edge = sourceVertices.get(sourceVertex);
            try {
              edge.setCustomEdgeManager(entry.getValue());
            } catch (Exception e) {
              LOG.warn("Failed to initialize edge manager for edge"
                  + ", sourceVertexName=" + sourceVertex.getName()
                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
                  e);
              return false;
            }
          }
        }
        if (rootInputSpecUpdates != null) {
          LOG.info("Got updated RootInputsSpecs: " + rootInputSpecUpdates.toString());
          // Sanity check for correct number of updates.
          for (Entry<String, RootInputSpecUpdate> rootInputSpecUpdateEntry : rootInputSpecUpdates
              .entrySet()) {
            Preconditions
                .checkState(
                    rootInputSpecUpdateEntry.getValue().isForAllWorkUnits()
                        || (rootInputSpecUpdateEntry.getValue().getAllNumPhysicalInputs() != null && rootInputSpecUpdateEntry
                            .getValue().getAllNumPhysicalInputs().size() == parallelism),
                    "Not enough input spec updates for root input named "
                        + rootInputSpecUpdateEntry.getKey());
          }
          this.rootInputSpecs.putAll(rootInputSpecUpdates);
        }
        this.numTasks = parallelism;
        this.createTasks();
        LOG.info("Vertex " + getVertexId() + 
            " parallelism set to " + parallelism);
        if (canInitVertex()) {
          getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
        }
      } else {
        // This is an artificial restriction since there's no way of knowing whether a VertexManager
        // will attempt to update root input specs. When parallelism has not been initialized, the
        // Vertex will not be in started state so it's safe to update the specifications.
        // TODO TEZ-937 - add e mechanism to query vertex managers, or for VMs to indicate readines
        // for a vertex to start.
        Preconditions.checkState(rootInputSpecUpdates == null,
            "Root Input specs can only be updated when the vertex is configured with -1 tasks");
        if (parallelism >= numTasks) {
          // not that hard to support perhaps. but checking right now since there
          // is no use case for it and checking may catch other bugs.
          LOG.warn("Increasing parallelism is not supported, vertexId="
              + logIdentifier);
          return false;
        }
        if (parallelism == numTasks) {
          LOG.info("setParallelism same as current value: " + parallelism + 
              " for vertex: " + logIdentifier);
          Preconditions.checkArgument(sourceEdgeManagers != null,
              "Source edge managers or RootInputSpecs must be set when not changing parallelism");
        } else {
          LOG.info(
              "Resetting vertex location hints due to change in parallelism for vertex: " + logIdentifier);
          vertexLocationHint = null;
        }

        // start buffering incoming events so that we can re-route existing events
        for (Edge edge : sourceVertices.values()) {
          edge.startEventBuffering();
        }
  
        // assign to local variable of LinkedHashMap to make sure that changing
        // type of task causes compile error. We depend on LinkedHashMap for order
        LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
        Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
            .iterator();
        int i = 0;
        while (iter.hasNext()) {
          i++;
          Map.Entry<TezTaskID, Task> entry = iter.next();
          Task task = entry.getValue();
          if (task.getState() != TaskState.NEW) {
            LOG.warn(
                "All tasks must be in initial state when changing parallelism"
                    + " for vertex: " + getVertexId() + " name: " + getName());
            return false;
          }
          if (i <= parallelism) {
            continue;
          }
          LOG.info("Removing task: " + entry.getKey());
          iter.remove();
        }
        LOG.info("Vertex " + logIdentifier + 
            " parallelism set to " + parallelism + " from " + numTasks);
        this.numTasks = parallelism;
        assert tasks.size() == numTasks;
  
        // set new edge managers
        if(sourceEdgeManagers != null) {
          for(Map.Entry<String, EdgeManagerDescriptor> entry : sourceEdgeManagers.entrySet()) {
            LOG.info("Replacing edge manager for source:"
                + entry.getKey() + " destination: " + getVertexId());
            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
            Edge edge = sourceVertices.get(sourceVertex);
            try {
              edge.setCustomEdgeManager(entry.getValue());
            } catch (Exception e) {
              LOG.warn("Failed to initialize edge manager for edge"
                  + ", sourceVertexName=" + sourceVertex.getName()
                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
                  e);
              return false;
            }
          }
        }

        VertexParallelismUpdatedEvent parallelismUpdatedEvent =
            new VertexParallelismUpdatedEvent(vertexId, numTasks,
                vertexLocationHint,
                sourceEdgeManagers, rootInputSpecUpdates);
        appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
            parallelismUpdatedEvent));

        // stop buffering events
        for (Edge edge : sourceVertices.values()) {
          edge.stopEventBuffering();
        }
      }
      
      for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
        Edge edge = entry.getValue();
        if (edge.getEdgeProperty().getDataMovementType() 
            == DataMovementType.ONE_TO_ONE) {
          // inform these target vertices that we have changed parallelism
          VertexEventOneToOneSourceSplit event = 
              new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
                  getVertexId(),
                  ((originalOneToOneSplitSource!=null) ? 
                      originalOneToOneSplitSource : getVertexId()), 
                  numTasks);
          getEventHandler().handle(event);
        }
      }

    } finally {
      writeLock.unlock();
    }
    
    return true;
  }