in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java [72:211]
public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
    throws IOException {
  MasterPlan masterPlan = subQuery.getMasterPlan();
  ExecutionBlock execBlock = subQuery.getBlock();
  QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
  AbstractStorageManager storageManager = subQuery.getStorageManager();

  ScanNode[] scans = execBlock.getScanNodes();

  Path tablePath;
  FileFragment[] fragments = new FileFragment[2];
  long[] stats = new long[2];
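
  // This method handles only binary joins: the execution block always has
  // exactly two scan nodes (left and right), so fragments[] and stats[]
  // are sized two.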

  // initialize variables from the child operators
  for (int i = 0; i < 2; i++) {
    TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());

    if (tableDesc == null) { // the scan reads the intermediate result of a child execution block
      // TODO - to be fixed (wrong directory)
      ExecutionBlock[] childBlocks = new ExecutionBlock[2];
      childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
      childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);

      tablePath = storageManager.getTablePath(scans[i].getTableName());
      stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
      fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
    } else { // the scan reads a real table stored on storage
      tablePath = tableDesc.getPath();
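
      // For a stored table, estimate the input volume from the statistics
      // of the scan's descendants in the logical plan.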
      try {
        stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
      } catch (PlanningException e) {
        throw new IOException(e);
      }

      // If the table has no data, storageManager.getSplits() returns an empty
      // list, so we must check its size before taking the first element;
      // otherwise get(0) would throw an IndexOutOfBoundsException.
      List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(),
          tableDesc.getMeta(), tableDesc.getSchema(), tablePath);
      if (fileFragments.size() > 0) {
        fragments[i] = fileFragments.get(0);
      } else {
        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
      }
    }
  }
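
  // At this point, stats[] holds each input's volume in bytes and fragments[]
  // holds one representative (possibly empty placeholder) fragment per scan.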
  LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));

  // An inner join produces no rows when either input is empty,
  // so there is nothing to schedule and we can return early.
  JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN);
  if (joinNode != null) {
    if (joinNode.getJoinType().equals(JoinType.INNER) && (stats[0] == 0 || stats[1] == 0)) {
      return;
    }
  }
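
  // Three join strategies follow: if both sides are broadcast tables, run a
  // single task; if exactly one side is, broadcast it to the tasks scanning
  // the other side; otherwise fall back to a symmetric repartition join.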

  // Assign either fragments or fetch URLs to query units.
  boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
  boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());

  if (leftSmall && rightSmall) {
    LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
    SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
    schedulerContext.setEstimatedTaskNum(1);
  } else if (leftSmall ^ rightSmall) {
    int broadcastIdx = leftSmall ? 0 : 1;
    int baseScanIdx = leftSmall ? 1 : 0;
    LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
        scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
    scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments[broadcastIdx]);
  } else {
    LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");

    // The hash map is modeled as follows:
    // <Partition Id, <Table Name, Intermediate Data>>
    Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
        new HashMap<Integer, Map<String, List<IntermediateEntry>>>();

    // Group the intermediate data by partition key and table name.
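    // Each IntermediateEntry describes one piece of hash-shuffled output
    // produced by a child task; getPartId() identifies the hash partition
    // it belongs to.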
    for (ScanNode scan : scans) {
      SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
      for (QueryUnit task : childSubQuery.getQueryUnits()) {
        if (task.getIntermediateData() != null) {
          for (IntermediateEntry intermEntry : task.getIntermediateData()) {
            if (hashEntries.containsKey(intermEntry.getPartId())) {
              Map<String, List<IntermediateEntry>> tbNameToInterm =
                  hashEntries.get(intermEntry.getPartId());
              if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
                tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
              } else {
                tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
              }
            } else {
              Map<String, List<IntermediateEntry>> tbNameToInterm =
                  new HashMap<String, List<IntermediateEntry>>();
              tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
              hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
            }
          }
        }
      }
    }
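
    // hashEntries now maps every distinct partition id to the intermediate
    // outputs that each input table contributed for that partition.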

    // hashEntries can be empty if there is no input data at all; in that case
    // the divisions below would divide by zero, so guard against it here.
    int[] avgSize = new int[2];
    avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
    avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
    int bothFetchSize = avgSize[0] + avgSize[1];
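
    // bothFetchSize approximates the bytes a single partition fetches from
    // both sides combined; it feeds the per-task size estimate below.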

    // Determine the desired number of join tasks from the volume of the
    // larger input table.
    int largerIdx = stats[0] >= stats[1] ? 0 : 1;
    int desireJoinTaskVolumn = subQuery.getContext().getConf().
        getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);

    // Calculate the number of tasks according to the data size.
    int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
    LOG.info("Larger intermediate data is approximately " + mb + " MB");

    // Determine the number of tasks: one task per desireJoinTaskVolumn
    // megabytes of the larger input.
    int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
    LOG.info("The calculated number of tasks is " + maxTaskNum);
    LOG.info("The number of total shuffle keys is " + hashEntries.size());

    // The number of join tasks cannot exceed the number of distinct partition
    // ids, because a hash partition cannot be split across tasks.
    int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
    LOG.info("The determined number of join tasks is " + joinTaskNum);
    SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);

    // Assign partitions to tasks in a round-robin manner.
    for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
        : hashEntries.entrySet()) {
      addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
    }

    schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
    schedulerContext.setEstimatedTaskNum(joinTaskNum);
  }
}