private List pruneMultiThread()

in core/src/main/java/org/apache/carbondata/core/index/TableIndex.java [277:465]


  private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
      final IndexFilter filter, List<ExtendedBlocklet> blocklets,
      final Map<Segment, List<Index>> indexes, int totalFiles) {
    /*
     *********************************************************************************
     * Below is the example of how this part of code works.
     * Consider a scenario with 5 segments, 10 indexes in each segment,
     * and one record per index, so 50 records in total.
     *
     * The indexes in each segment look like below:
     * s0 [0-9], s1 [0-9], s2 [0-9], s3[0-9], s4[0-9]
     *
     * If the number of threads is 4, then filesPerEachThread = 50 / 4 = 12 files per thread.
     *
     * Each SegmentIndexGroup looks like below: [SegmentId, fromIndex, toIndex]
     * Within a segment, only the indexes between fromIndex and toIndex are processed.
     *
     * The final result will be (4 lists are created, as numOfThreadsForPruning is 4):
     * Thread1 list: s0 [0-9], s1 [0-1]  : 12 files
     * Thread2 list: s1 [2-9], s2 [0-3]  : 12 files
     * Thread3 list: s2 [4-9], s3 [0-5]  : 12 files
     * Thread4 list: s3 [6-9], s4 [0-9]  : 14 files
     * So each thread processes an almost equal number of records.
     *
     *********************************************************************************
     */

    int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
    int filesPerEachThread = totalFiles / numOfThreadsForPruning;
    int prev;
    int filesCount = 0;
    int processedFileCount = 0;
    List<List<SegmentIndexGroup>> indexListForEachThread =
        new ArrayList<>(numOfThreadsForPruning);
    List<SegmentIndexGroup> segmentIndexGroupList = new ArrayList<>();
    Set<String> missingSISegments = filter.getMissingSISegments();
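    // First pass: walk through the segments and split their indexes into groups of roughly
    // filesPerEachThread entries each, building one list of groups per pruning thread.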
    for (Segment segment : segments) {
      List<Index> eachSegmentIndexList = indexes.get(segment);
      prev = 0;
      for (int i = 0; i < eachSegmentIndexList.size(); i++) {
        Index index = eachSegmentIndexList.get(i);
        filesCount += index.getNumberOfEntries();
        if (filesCount >= filesPerEachThread) {
          if (indexListForEachThread.size() != numOfThreadsForPruning - 1) {
            // not the last segmentList
            segmentIndexGroupList.add(new SegmentIndexGroup(segment, prev, i));
            // remember the next index position of this segment, to be processed by the next group
            prev = i + 1;
            indexListForEachThread.add(segmentIndexGroupList);
            segmentIndexGroupList = new ArrayList<>();
            processedFileCount += filesCount;
            filesCount = 0;
          } else {
            // last group: the remaining indexes are added after the segment loop
            processedFileCount += filesCount;
            filesCount = 0;
          }
        }
      }
      if (prev == 0 || prev != eachSegmentIndexList.size()) {
        // prev == 0: add all indexes of this segment;
        // prev != eachSegmentIndexList.size(): add the segment's remaining indexes from prev onwards
        segmentIndexGroupList
            .add(new SegmentIndexGroup(segment, prev, eachSegmentIndexList.size() - 1));
      }
    }
    // adding the last segmentList data
    indexListForEachThread.add(segmentIndexGroupList);
    processedFileCount += filesCount;
    if (processedFileCount != totalFiles) {
      // this should not happen
      throw new RuntimeException("Not all files were processed while grouping indexes for pruning");
    }
    if (indexListForEachThread.size() < numOfThreadsForPruning) {
      // If the indexes fit into fewer groups than numOfThreadsForPruning,
      // launch only as many threads as there are groups.
      LOG.info("Indexes are distributed across " + indexListForEachThread.size() + " threads");
      numOfThreadsForPruning = indexListForEachThread.size();
    }
    LOG.info(
        "Number of threads selected for multi-thread block pruning is " + numOfThreadsForPruning
            + ". total files: " + totalFiles + ". total segments: " + segments.size());
    List<Future<Void>> results = new ArrayList<>(numOfThreadsForPruning);
    final Map<Segment, List<ExtendedBlocklet>> prunedBlockletMap =
        new ConcurrentHashMap<>(segments.size());
    final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
    final String threadName = Thread.currentThread().getName();
    for (int i = 0; i < numOfThreadsForPruning; i++) {
      final List<SegmentIndexGroup> segmentIndexGroups = indexListForEachThread.get(i);
      results.add(executorService.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          Thread.currentThread().setName(threadName);
          for (SegmentIndexGroup segmentIndexGroup : segmentIndexGroups) {
            List<ExtendedBlocklet> pruneBlocklets = new ArrayList<>();
            List<Index> indexList = indexes.get(segmentIndexGroup.getSegment());
            SegmentProperties segmentProperties =
                segmentPropertiesFetcher.getSegmentPropertiesFromIndex(indexList.get(0));
            Segment segment = segmentIndexGroup.getSegment();
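            // External segments (those with an explicit segment path) and segments missing
            // from the SI are pruned with the external segment resolver/filter instead.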
            boolean isExternalOrMissingSISegment = segment.getSegmentPath() != null ||
                (missingSISegments != null && missingSISegments.contains(segment.getSegmentNo()));
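            // If the filter is already resolved for this segment's schema, reuse its resolver;
            // otherwise (else branch) build a new resolver from a copy of the filter expression.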
            if (filter.isResolvedOnSegment(segmentProperties)) {
              FilterExecutor filterExecutor;
              if (!isExternalOrMissingSISegment) {
                filterExecutor = FilterUtil
                    .getFilterExecutorTree(filter.getResolver(), segmentProperties, null,
                        table.getMinMaxCacheColumns(segmentProperties), false);
              } else {
                filterExecutor = FilterUtil
                    .getFilterExecutorTree(filter.getExternalSegmentResolver(), segmentProperties,
                        null, table.getMinMaxCacheColumns(segmentProperties), false);
              }
              for (int i = segmentIndexGroup.getFromIndex();
                   i <= segmentIndexGroup.getToIndex(); i++) {
                List<Blocklet> dmPruneBlocklets;
                if (!isExternalOrMissingSISegment) {
                  dmPruneBlocklets = indexList.get(i)
                      .prune(filter.getResolver(), segmentProperties, filterExecutor, table);
                } else {
                  dmPruneBlocklets = indexList.get(i)
                      .prune(filter.getExternalSegmentResolver(), segmentProperties, filterExecutor,
                          table);
                }
                pruneBlocklets.addAll(addSegmentId(
                    blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
                    segment));
              }
            } else {
              Expression filterExpression = filter.getNewCopyOfExpression();
              FilterExecutor filterExecutor;
              if (!isExternalOrMissingSISegment) {
                filterExecutor = FilterUtil.getFilterExecutorTree(
                    new IndexFilter(segmentProperties, table, filterExpression).getResolver(),
                    segmentProperties, null, table.getMinMaxCacheColumns(segmentProperties), false);
              } else {
                filterExecutor = FilterUtil.getFilterExecutorTree(
                    new IndexFilter(segmentProperties, table, filterExpression)
                        .getExternalSegmentResolver(), segmentProperties, null,
                    table.getMinMaxCacheColumns(segmentProperties), false);
              }
              for (int i = segmentIndexGroup.getFromIndex();
                   i <= segmentIndexGroup.getToIndex(); i++) {
                List<Blocklet> dmPruneBlocklets;
                if (!isExternalOrMissingSISegment) {
                  dmPruneBlocklets = indexList.get(i)
                      .prune(filterExpression, segmentProperties, table, filterExecutor);
                } else {
                  dmPruneBlocklets = indexList.get(i)
                      .prune(filter.getExternalSegmentFilter(), segmentProperties, table,
                          filterExecutor);
                }
                pruneBlocklets.addAll(addSegmentId(
                    blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
                    segment));
              }
            }
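            // Merge this group's blocklets into the shared map; groups from different threads
            // can belong to the same segment, hence the synchronization.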
            synchronized (prunedBlockletMap) {
              List<ExtendedBlocklet> pruneBlockletsExisting =
                  prunedBlockletMap.get(segmentIndexGroup.getSegment());
              if (pruneBlockletsExisting != null) {
                pruneBlockletsExisting.addAll(pruneBlocklets);
              } else {
                prunedBlockletMap.put(segmentIndexGroup.getSegment(), pruneBlocklets);
              }
            }
          }
          return null;
        }
      }));
    }
    executorService.shutdown();
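    // Wait (up to 2 hours) for all pruning tasks to finish; task failures surface below via Future.get().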
    try {
      executorService.awaitTermination(2, TimeUnit.HOURS);
    } catch (InterruptedException e) {
      LOG.error("Error while waiting for multi-thread index pruning to finish: " + e.getMessage());
    }
    // check for error
    for (Future<Void> result : results) {
      try {
        result.get();
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
    for (Map.Entry<Segment, List<ExtendedBlocklet>> entry : prunedBlockletMap.entrySet()) {
      blocklets.addAll(entry.getValue());
    }
    return blocklets;
  }
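
The grouping pass above is easier to see in isolation. Below is a minimal standalone sketch
(not CarbonData code) that reproduces only the partitioning arithmetic for the 5-segment,
50-entry example from the method's comment. It assumes every index holds exactly one entry,
reduces segments and indexes to plain ints, and omits the processedFileCount bookkeeping;
the class name PruneGroupingSketch is made up for illustration.

  import java.util.ArrayList;
  import java.util.List;

  public class PruneGroupingSketch {
    public static void main(String[] args) {
      int segments = 5;
      int indexesPerSegment = 10;                      // one entry per index -> 50 entries total
      int numThreads = 4;
      int totalFiles = segments * indexesPerSegment;
      int filesPerThread = totalFiles / numThreads;    // 50 / 4 = 12

      List<List<String>> groupsPerThread = new ArrayList<>();
      List<String> currentGroup = new ArrayList<>();
      int filesCount = 0;
      for (int s = 0; s < segments; s++) {
        int prev = 0;
        for (int i = 0; i < indexesPerSegment; i++) {
          filesCount++;                                // each index contributes one entry
          if (filesCount >= filesPerThread && groupsPerThread.size() != numThreads - 1) {
            // close the current group at index i and start the next one at i + 1
            currentGroup.add("s" + s + " [" + prev + "-" + i + "]");
            prev = i + 1;
            groupsPerThread.add(currentGroup);
            currentGroup = new ArrayList<>();
            filesCount = 0;
          }
        }
        if (prev == 0 || prev != indexesPerSegment) {
          // add the segment's remaining (or all) indexes to the open group
          currentGroup.add("s" + s + " [" + prev + "-" + (indexesPerSegment - 1) + "]");
        }
      }
      groupsPerThread.add(currentGroup);               // the last group

      // Prints the same distribution as the comment:
      // Thread1: [s0 [0-9], s1 [0-1]]   Thread2: [s1 [2-9], s2 [0-3]]
      // Thread3: [s2 [4-9], s3 [0-5]]   Thread4: [s3 [6-9], s4 [0-9]]
      for (int t = 0; t < groupsPerThread.size(); t++) {
        System.out.println("Thread" + (t + 1) + ": " + groupsPerThread.get(t));
      }
    }
  }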