private void setParallelismWrapper()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1870:2017]


  /**
   * Sets or changes the parallelism (number of tasks) of this vertex and, optionally, the edge
   * properties of its incoming edges and its root input specs. The whole operation runs while
   * holding {@code writeLock}.
   *
   * <p>Two cases are handled:
   * <ul>
   *   <li>{@code numTasks == -1}: parallelism has not been set yet. The vertex must be in
   *       {@code INITIALIZING} state. Edge properties and root input specs are applied, tasks
   *       are created, and {@code V_READY_TO_INIT} is sent if {@code canInitVertex()} holds.</li>
   *   <li>otherwise: an existing parallelism is being reconfigured. Root input spec updates are
   *       rejected, incoming edges buffer events while tasks are added/removed and edge
   *       properties are replaced, and buffering is stopped at the end.</li>
   * </ul>
   *
   * @param parallelism new number of tasks; must be {@code >= 0}
   * @param vertexLocationHint location hint to apply; ignored (reset to {@code null}) when an
   *        already-set parallelism is changed to a different value
   * @param sourceEdgeProperties new edge properties keyed by source vertex name, or {@code null}
   *        to leave edges untouched; must be non-null when re-configuring with an unchanged
   *        parallelism
   * @param rootInputSpecUpdates root input spec updates keyed by root input name, or
   *        {@code null}; only accepted while {@code numTasks == -1}
   * @param fromVertexManager whether the call originates from the VertexManager; if so and the
   *        vertex is already fully defined, the manager must have announced the reconfiguration
   * @throws AMUserCodeException declared by the original signature; presumably propagated from
   *         task creation / location hint handling -- confirm against those helpers
   * @throws TezUncheckedException if tasks were already scheduled, if the vertex is in the wrong
   *         state, or if an edge property cannot be applied
   */
  private void setParallelismWrapper(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeProperty> sourceEdgeProperties,
      Map<String, InputSpecUpdate> rootInputSpecUpdates,
      boolean fromVertexManager) throws AMUserCodeException {
    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism
        + " for vertex: " + logIdentifier);
    writeLock.lock();
    try {
      // Set inside the try so that nothing sits between lock() and the finally that unlocks.
      this.setParallelismCalledFlag = true;

      // disallow changing things after a vertex has started
      if (!tasksNotYetScheduled) {
        String msg = "setParallelism cannot be called after scheduling tasks. Vertex: "
            + getLogIdentifier();
        LOG.info(msg);
        throw new TezUncheckedException(msg);
      }

      if (fromVertexManager && canInitVertex()) {
        // vertex is fully defined. setParallelism has been called. VertexManager should have
        // informed us about this. Otherwise we would have notified listeners that we are fully
        // defined before we are actually fully defined
        Preconditions
            .checkState(
                vertexToBeReconfiguredByManager,
                "Vertex is fully configured but still"
                    + " the reconfiguration API has been called. VertexManager must notify the framework using "
                    + " context.vertexReconfigurationPlanned() before re-configuring the vertex."
                    + " vertexId=" + logIdentifier);
      }

      // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
      if (numTasks == -1) {
        if (getState() != VertexState.INITIALIZING) {
          throw new TezUncheckedException(
              "Vertex state is not Initializing. Value: " + getState()
                  + " for vertex: " + logIdentifier);
        }

        replaceSourceEdgeProperties(sourceEdgeProperties);

        if (rootInputSpecUpdates != null) {
          LOG.info("Got updated RootInputsSpecs: " + rootInputSpecUpdates);
          // Sanity check for correct number of updates: each update must either apply to all
          // work units or carry exactly one entry per task.
          for (Map.Entry<String, InputSpecUpdate> rootInputSpecUpdateEntry : rootInputSpecUpdates
              .entrySet()) {
            InputSpecUpdate specUpdate = rootInputSpecUpdateEntry.getValue();
            Preconditions
                .checkState(
                    specUpdate.isForAllWorkUnits()
                        || (specUpdate.getAllNumPhysicalInputs() != null
                            && specUpdate.getAllNumPhysicalInputs().size() == parallelism),
                    "Not enough input spec updates for root input named "
                        + rootInputSpecUpdateEntry.getKey());
          }
          this.rootInputSpecs.putAll(rootInputSpecUpdates);
        }
        int oldNumTasks = numTasks;
        this.numTasks = parallelism;
        stateChangeNotifier.stateChanged(vertexId,
            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
        this.createTasks();
        setVertexLocationHint(vertexLocationHint);
        LOG.info("Vertex " + getLogIdentifier() +
            " parallelism set to " + parallelism);
        if (canInitVertex()) {
          getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
        }
      } else {
        // This is an artificial restriction since there's no way of knowing whether a VertexManager
        // will attempt to update root input specs. When parallelism has not been initialized, the
        // Vertex will not be in started state so it's safe to update the specifications.
        // TODO TEZ-937 - add a mechanism to query vertex managers, or for VMs to indicate readiness
        // for a vertex to start.
        Preconditions.checkState(rootInputSpecUpdates == null,
            "Root Input specs can only be updated when the vertex is configured with -1 tasks");

        int oldNumTasks = numTasks;

        // start buffering incoming events so that we can re-route existing events
        // NOTE(review): if anything below throws, stopEventBuffering() is never reached and the
        // edges stay in buffering mode. Left as-is because it is unclear from here whether
        // flushing buffered events after a failed reconfiguration is safe -- confirm.
        for (Edge edge : sourceVertices.values()) {
          edge.startEventBuffering();
        }

        VertexLocationHint locationHintToApply = vertexLocationHint;
        if (parallelism == numTasks) {
          LOG.info("setParallelism same as current value: " + parallelism +
              " for vertex: " + logIdentifier);
          Preconditions.checkArgument(sourceEdgeProperties != null,
              "Source edge managers or RootInputSpecs must be set when not changing parallelism");
        } else {
          LOG.info("Resetting vertex location hints due to change in parallelism for vertex: "
              + logIdentifier);
          // The caller-supplied hint is dropped whenever the task count actually changes.
          locationHintToApply = null;

          if (parallelism > numTasks) {
            addTasks(parallelism);
          } else {
            removeTasks(parallelism);
          }
        }
        Preconditions.checkState(this.numTasks == parallelism, getLogIdentifier());

        // set new vertex location hints
        setVertexLocationHint(locationHintToApply);
        LOG.info("Vertex " + getLogIdentifier() + " parallelism set to " + parallelism + " from "
            + oldNumTasks);

        // notify listeners
        stateChangeNotifier.stateChanged(vertexId,
            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
        assert tasks.size() == numTasks;

        // set new edge managers
        replaceSourceEdgeProperties(sourceEdgeProperties);

        // stop buffering events
        for (Edge edge : sourceVertices.values()) {
          edge.stopEventBuffering();
        }
      }

    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Applies the given edge properties to the incoming edges of this vertex.
   *
   * <p>Each key is resolved to a source vertex through the current DAG and then to the edge
   * registered for it in {@code sourceVertices}. A name that does not resolve to an incoming
   * edge is reported explicitly instead of surfacing as a {@code NullPointerException}.
   *
   * @param sourceEdgeProperties new edge properties keyed by source vertex name; {@code null}
   *        means there is nothing to replace
   * @throws TezUncheckedException if no incoming edge exists for a given source vertex name, or
   *         if the edge rejects the new property (the original failure is kept as the cause)
   */
  private void replaceSourceEdgeProperties(Map<String, EdgeProperty> sourceEdgeProperties) {
    if (sourceEdgeProperties == null) {
      return;
    }
    for (Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
      String sourceVertexName = entry.getKey();
      LOG.info("Replacing edge manager for source:"
          + sourceVertexName + " destination: " + getLogIdentifier());
      Vertex sourceVertex = appContext.getCurrentDAG().getVertex(sourceVertexName);
      Edge edge = (sourceVertex == null) ? null : sourceVertices.get(sourceVertex);
      if (edge == null) {
        throw new TezUncheckedException("Fail to update EdgeProperty: no edge found from"
            + " sourceVertex: " + sourceVertexName
            + " to destinationVertex: " + getLogIdentifier());
      }
      try {
        edge.setEdgeProperty(entry.getValue());
      } catch (Exception e) {
        throw new TezUncheckedException("Fail to update EdgeProperty for Edge, "
            + "sourceVertex: " + edge.getSourceVertexName()
            + ", destinationVertex: " + edge.getDestinationVertexName(), e);
      }
    }
  }