public static void scheduleFragmentsForJoinQuery()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java [72:211]


  public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
      throws IOException {
    MasterPlan masterPlan = subQuery.getMasterPlan();
    ExecutionBlock execBlock = subQuery.getBlock();
    QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
    AbstractStorageManager storageManager = subQuery.getStorageManager();

    ScanNode[] scans = execBlock.getScanNodes();
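    // NOTE: this method assumes a binary join; scans[0] and scans[1] are the
    // left and right inputs, which is why the arrays below have length 2.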

    Path tablePath;
    FileFragment[] fragments = new FileFragment[2];
    long[] stats = new long[2];

    // initialize variables from the child operators
    for (int i = 0; i < 2; i++) {
      TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
      if (tableDesc == null) { // if the scan is an intermediate result produced by a child execution block
        // TODO - to be fixed (wrong directory)
        ExecutionBlock[] childBlocks = new ExecutionBlock[2];
        childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
        childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);

        tablePath = storageManager.getTablePath(scans[i].getTableName());
        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
      } else { // if it is a real table stored on storage
        tablePath = tableDesc.getPath();
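        // For a stored table, estimate the input volume from the statistics of
        // the scan's descendant operators in the logical plan.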
        try {
          stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
        } catch (PlanningException e) {
          throw new IOException(e);
        }

        // If the table has no data, storageManager returns an empty list of
        // FileFragments, so we must check the list's size before taking the
        // first element; otherwise get(0) would throw an IndexOutOfBoundsException.
        List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), tablePath);
        if (fileFragments.size() > 0) {
          fragments[i] = fileFragments.get(0);
        } else {
          fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
        }
      }
    }

    LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));

    // If either side of an inner join has no input data,
    // the join produces zero rows, so there is nothing to schedule.
    JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN);
    if (joinNode != null) {
      if (joinNode.getJoinType().equals(JoinType.INNER) && (stats[0] == 0 || stats[1] == 0)) {
        return;
      }
    }

    // Assign either fragments or fetch URLs to the query units
    boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
    boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
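    // A relation marked as a broadcast table is small enough to be replicated
    // to every task instead of being repartitioned over the network.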

    if (leftSmall && rightSmall) {
      LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
      schedulerContext.setEstimatedTaskNum(1);
    } else if (leftSmall ^ rightSmall) {
      int broadcastIdx = leftSmall ? 0 : 1;
      int baseScanIdx = leftSmall ? 1 : 0;
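      // Only one side is broadcastable: scan the larger (base) table normally
      // and replicate the small table's fragment to each of its leaf tasks.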
      LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
          scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
      scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments[broadcastIdx]);
    } else {
      LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
      // The hash map is modeled as follows:
      // <Part Id, <Table Name, Intermediate Data>>
      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>();

      // Group the intermediate data by partition key and table name
      for (ScanNode scan : scans) {
        SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
        for (QueryUnit task : childSubQuery.getQueryUnits()) {
          if (task.getIntermediateData() != null) {
            for (IntermediateEntry intermEntry : task.getIntermediateData()) {
              if (hashEntries.containsKey(intermEntry.getPartId())) {
                Map<String, List<IntermediateEntry>> tbNameToInterm =
                    hashEntries.get(intermEntry.getPartId());

                if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
                  tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
                } else {
                  tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
                }
              } else {
                Map<String, List<IntermediateEntry>> tbNameToInterm =
                    new HashMap<String, List<IntermediateEntry>>();
                tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
                hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
              }
            }
          }
        }
      }
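
      // For example (hypothetical IDs), hashEntries may now look like this:
      //   { 0 -> { "eb_..._1" -> [e1, e2], "eb_..._2" -> [e3] },
      //     1 -> { "eb_..._1" -> [e4],     "eb_..._2" -> [e5] } }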

      // hashEntries can be empty if there is no input data. In that case,
      // dividing by hashEntries.size() would throw an ArithmeticException
      // (divide by zero), so guard against the empty map here.
      int[] avgSize = new int[2];
      avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
      avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
      int bothFetchSize = avgSize[0] + avgSize[1];

      // Get the desired number of join tasks according to the volume
      // of the larger table.
      int largerIdx = stats[0] >= stats[1] ? 0 : 1;
      int desireJoinTaskVolumn = subQuery.getContext().getConf().
          getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);

      // calculate the number of tasks according to the data size
      int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
      LOG.info("Larger intermediate data is approximately " + mb + " MB");
      // determine the number of tasks: one task per desireJoinTaskVolumn megabytes
      int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
      LOG.info("The calculated number of tasks is " + maxTaskNum);
      LOG.info("The number of total shuffle keys is " + hashEntries.size());
      // the number of join tasks cannot be larger than the number of
      // distinct partition ids.
      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
      LOG.info("The determined number of join tasks is " + joinTaskNum);

      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
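      // The fragments scheduled here may be placeholder fragments (see
      // UNKNOWN_HOST above); the real join inputs arrive via the shuffle
      // fetches registered below.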

      // Assign partitions to tasks in a round-robin manner.
      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
          : hashEntries.entrySet()) {
        addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
      }

      schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
      schedulerContext.setEstimatedTaskNum(joinTaskNum);
    }
  }
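
To make the sizing arithmetic above concrete, here is a minimal, self-contained sketch that reproduces the same formulas with hypothetical inputs (a 900MB left table, a 300MB right table, 32 distinct shuffle keys, and a hypothetical 64MB task volume); it is an illustration only, not part of Repartitioner:

public class JoinTaskSizingSketch {
  public static void main(String[] args) {
    long[] stats = {900L * 1048576, 300L * 1048576}; // input volumes in bytes (hypothetical)
    int shuffleKeys = 32;                            // distinct partition ids (hypothetical)
    int desireJoinTaskVolumn = 64;                   // task volume in MB (hypothetical config value)

    // Size tasks by the larger input, one task per desireJoinTaskVolumn MB.
    int largerIdx = stats[0] >= stats[1] ? 0 : 1;
    int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);        // 900
    int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn); // ceil(900/64) = 15

    // A task handles at least one shuffle key, so the task count is capped
    // by the number of distinct partition ids.
    int joinTaskNum = Math.min(maxTaskNum, shuffleKeys);                  // min(15, 32) = 15

    // Average bytes fetched per shuffle key from both sides combined.
    int[] avgSize = new int[2];
    avgSize[0] = shuffleKeys == 0 ? 0 : (int) (stats[0] / shuffleKeys);
    avgSize[1] = shuffleKeys == 0 ? 0 : (int) (stats[1] / shuffleKeys);
    int bothFetchSize = avgSize[0] + avgSize[1];

    // The per-task input size reported to the scheduler context.
    int taskSize = (int) Math.ceil((double) bothFetchSize / joinTaskNum);
    System.out.println("joinTaskNum=" + joinTaskNum + ", taskSize=" + taskSize + " bytes");
  }
}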