in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java [371:487]
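/**
 * Try to (re)configure the vertex once all cartesian product sources have reported
 * enough information. Returns false when estimation must wait for more source
 * progress, true once the vertex has been reconfigured (possibly to zero tasks).
 */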
private boolean tryReconfigure() throws IOException {
if (numCPSrcNotInConfigureState > 0) {
return false;
}
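// a source with no tasks contributes no records, so the whole cartesian product is empty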
for (Source src : sourcesByName.values()) {
if (src.getNumTask() == 0) {
parallelism = 0;
reconfigureWithZeroTask();
return true;
}
}
if (config.hasGroupingFraction() && config.getGroupingFraction() > 0) {
// wait until every src vertex either finishes all tasks, or completes the
// configured fraction of its tasks and reports at least one output record
for (SrcVertex srcV : srcVerticesByName.values()) {
if (srcV.taskCompleted.getCardinality() < srcV.numTask
&& (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality()
|| srcV.numRecord == 0)) {
return false;
}
}
} else {
// wait until every src vertex either generates enough output records for
// estimation, or has received VM events from all of its tasks (in which
// case we estimate with whatever records we have)
for (SrcVertex srcV : srcVerticesByName.values()) {
if (srcV.numRecord < minNumRecordForEstimation
&& srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
return false;
}
}
}
LOG.info("Start reconfiguring vertex " + getContext().getVertexName()
+ ", max parallelism: " + maxParallelism
+ ", min-ops-per-worker: " + minOpsPerWorker
+ ", num partition: " + numPartitions);
for (Source src : sourcesByName.values()) {
LOG.info(src.toString());
}
long totalOps = 1;
for (Source src : sourcesByName.values()) {
src.numRecord = src.estimateNumRecord();
if (src.numRecord == 0) {
LOG.info("Set parallelism to 0 because source " + src.name + " has 0 output recorc");
reconfigureWithZeroTask();
return true;
}
try {
totalOps = LongMath.checkedMultiply(totalOps, src.numRecord);
} catch (ArithmeticException e) {
LOG.info("totalOps exceeds " + Long.MAX_VALUE + ", capping to " + Long.MAX_VALUE);
totalOps = Long.MAX_VALUE;
}
}
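// totalOps is the size of the full cross product; e.g. (illustrative numbers)
// sources estimated at 1,000,000 and 5,000 records give 5,000,000,000 ops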
// determine initial parallelism: ceil(totalOps / minOpsPerWorker), capped at maxParallelism
if (totalOps / minOpsPerWorker >= maxParallelism) {
parallelism = maxParallelism;
} else {
parallelism = (int) ((totalOps + minOpsPerWorker - 1) / minOpsPerWorker);
}
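// e.g. (illustrative) totalOps = 2,500 and minOpsPerWorker = 1,000 give
// parallelism = ceil(2500 / 1000) = 3 when maxParallelism allows it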
LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism);
if (enableGrouping) {
determineNumChunks(sourcesByName, parallelism);
} else {
for (Source src : sourcesByName.values()) {
src.numChunk = src.getSrcVertexWithMostOutput().numTask;
}
}
// final parallelism is the product of the chunk counts of all sources
parallelism = 1;
for (Source src : sourcesByName.values()) {
parallelism *= src.numChunk;
}
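// e.g. (illustrative) chunk counts of 4 and 3 for two sources yield 4 * 3 = 12
// destination tasks, one per pair of chunks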
LOG.info("After reconfigure, final parallelism " + parallelism);
for (Source src : sourcesByName.values()) {
LOG.info(src.toString());
}
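// record per-source chunk counts in source declaration order for the proto config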
for (int i = 0; i < numChunksPerSrc.length; i++) {
numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk;
}
CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(config);
builder.addAllNumChunks(Ints.asList(this.numChunksPerSrc));
Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
Iterator<Map.Entry<String, EdgeProperty>> iter = edgeProperties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, EdgeProperty> e = iter.next();
if (e.getValue().getDataMovementType() != CUSTOM) {
iter.remove();
}
}
// embed vertex group info in each edge payload so destination tasks can compute
// the physical input id of each source vertex in its group
for (Source src : sourcesByName.values()) {
builder.clearNumTaskPerVertexInGroup();
for (int i = 0; i < src.srcVertices.size(); i++) {
SrcVertex srcV = src.srcVertices.get(i);
builder.setPositionInGroup(i);
edgeProperties.get(srcV.name).getEdgeManagerDescriptor()
.setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
builder.addNumTaskPerVertexInGroup(srcV.numTask);
}
}
getContext().reconfigureVertex(parallelism, null, edgeProperties);
vertexReconfigured = true;
getContext().doneReconfiguringVertex();
return true;
}