private boolean tryReconfigure()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java [371:487]


  /**
   * Tries to pick the final parallelism for this vertex and reconfigure it.
   *
   * <p>The method bails out (returns {@code false}) while any cartesian product source is
   * still counted in {@code numCPSrcNotInConfigureState}, or while some source vertex has
   * not yet reported enough completed tasks / output records for an estimation. Otherwise it
   * estimates the number of records per source, derives the number of chunks per source,
   * pushes the resulting configuration to every CUSTOM input edge as user payload and
   * reconfigures the vertex through the context.
   *
   * <p>If any source has zero tasks, or is estimated to produce zero records, the vertex is
   * reconfigured via {@code reconfigureWithZeroTask()} and {@code parallelism} is set to 0.
   *
   * @return {@code true} if the vertex was reconfigured by this call (including the zero-task
   *         case), {@code false} if reconfiguration has to be retried later
   * @throws IOException declared by the signature; presumably raised by one of the helpers
   *         called here — confirm against their declarations
   */
  private boolean tryReconfigure() throws IOException {
    if (numCPSrcNotInConfigureState > 0) {
      return false;
    }

    // A source without any task makes the whole cartesian product empty.
    for (Source src : sourcesByName.values()) {
      if (src.getNumTask() == 0) {
        parallelism = 0;
        reconfigureWithZeroTask();
        return true;
      }
    }

    if (config.hasGroupingFraction() && config.getGroupingFraction() > 0) {
      // every src vertex must complete a certain number of task before we do estimation
      for (SrcVertex srcV : srcVerticesByName.values()) {
        // A vertex whose tasks have all completed never blocks; otherwise it blocks while it
        // is below the configured fraction of completed tasks or has produced no record yet.
        if (srcV.taskCompleted.getCardinality() < srcV.numTask
          && (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality()
          || srcV.numRecord == 0)) {
          return false;
        }
      }
    } else {
      // every src vertex must generate enough output records before we do estimation
      // or all its tasks already finish but we cannot get enough result for estimation
      for (SrcVertex srcV : srcVerticesByName.values()) {
        if (srcV.numRecord < minNumRecordForEstimation
          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
          return false;
        }
      }
    }

    LOG.info("Start reconfiguring vertex " + getContext().getVertexName()
      + ", max parallelism: " + maxParallelism
      + ", min-ops-per-worker: " + minOpsPerWorker
      + ", num partition: " + numPartitions);
    for (Source src : sourcesByName.values()) {
      LOG.info(src.toString());
    }

    // Total number of operations is the product of the estimated record counts of all
    // sources, saturated at Long.MAX_VALUE.
    long totalOps = 1;
    for (Source src : sourcesByName.values()) {
      src.numRecord = src.estimateNumRecord();
      if (src.numRecord == 0) {
        LOG.info("Set parallelism to 0 because source " + src.name + " has 0 output record");
        // Keep the field in sync with the zero-task exit above and with the log message.
        parallelism = 0;
        reconfigureWithZeroTask();
        return true;
      }

      try {
        totalOps  = LongMath.checkedMultiply(totalOps, src.numRecord);
      } catch (ArithmeticException e) {
        LOG.info("totalOps exceeds " + Long.MAX_VALUE + ", capping to " + Long.MAX_VALUE);
        totalOps = Long.MAX_VALUE;
      }
    }

    // determine initial parallelism
    long fullWorkers = totalOps / minOpsPerWorker;
    if (fullWorkers >= maxParallelism) {
      parallelism = maxParallelism;
    } else {
      // Ceiling division written as quotient + remainder check: the usual
      // (totalOps + minOpsPerWorker - 1) form overflows when totalOps is close to
      // Long.MAX_VALUE (e.g. after the cap above) and yields a negative parallelism.
      // fullWorkers < maxParallelism here, so the result fits in an int.
      parallelism = (int) (fullWorkers + (totalOps % minOpsPerWorker == 0 ? 0 : 1));
    }
    LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism);

    if (enableGrouping) {
      determineNumChunks(sourcesByName, parallelism);
    } else {
      for (Source src : sourcesByName.values()) {
        src.numChunk = src.getSrcVertexWithMostOutput().numTask;
      }
    }

    // final parallelism will be product of all #chunk
    // NOTE(review): this is a plain int product; whether the chunk counts are bounded enough
    // to rule out overflow is not visible from here — confirm.
    parallelism = 1;
    for (Source src : sourcesByName.values()) {
      parallelism *= src.numChunk;
    }

    LOG.info("After reconfigure, final parallelism " + parallelism);
    for (Source src : sourcesByName.values()) {
      LOG.info(src.toString());
    }

    // Record chunk counts in sourceList order so they line up with the config proto.
    for (int i = 0; i < numChunksPerSrc.length; i++) {
      numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk;
    }

    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(config);
    builder.addAllNumChunks(Ints.asList(this.numChunksPerSrc));

    // Only edges with CUSTOM data movement receive a new payload; drop the rest from the map
    // that is handed to reconfigureVertex.
    Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
    Iterator<Map.Entry<String, EdgeProperty>> iter = edgeProperties.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<String, EdgeProperty> e = iter.next();
      if (e.getValue().getDataMovementType() != CUSTOM) {
        iter.remove();
      }
    }

    // send out vertex group info for computing physical input id of destination task
    for (Source src : sourcesByName.values()) {
      builder.clearNumTaskPerVertexInGroup();
      for (int i = 0; i < src.srcVertices.size(); i++) {
        SrcVertex srcV = src.srcVertices.get(i);
        builder.setPositionInGroup(i);
        // The payload is built before this vertex's own task count is appended, so the
        // payload for position i carries the task counts of positions 0..i-1 only.
        edgeProperties.get(srcV.name).getEdgeManagerDescriptor()
          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
        builder.addNumTaskPerVertexInGroup(srcV.numTask);
      }
    }
    getContext().reconfigureVertex(parallelism, null, edgeProperties);
    vertexReconfigured = true;
    getContext().doneReconfiguringVertex();
    return true;
  }