in core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java [172:329]
/**
 * Runs the filter over one blocklet and, if anything matched, builds the scanned result
 * holding the matched row ids per page together with the raw dimension and measure chunks
 * needed for projection.
 *
 * <p>When the filter produces an empty bit set group, the raw chunks are handed to
 * {@code CarbonUtil.freeMemory}, the scan statistics are recorded through
 * {@code addQueryStatistic} and the value of {@code createEmptyResult()} is returned.
 * Otherwise the blocklet/page counters and the scan/read time statistics of
 * {@code queryStatisticsModel} are updated before the populated result is returned.
 *
 * @param rawBlockletColumnChunks raw column chunks of the blocklet being scanned; also
 *                                supplies the data block and the file reader
 * @return a {@code FilterQueryScannedResult} for the matched rows, or the empty result when
 *         the filter matched nothing
 * @throws FilterUnsupportedException declared by this method; presumably raised while
 *                                    evaluating the filter
 * @throws IOException declared by this method; presumably raised while reading column chunks
 */
private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletColumnChunks)
    throws FilterUnsupportedException, IOException {
  final long scanStartMillis = System.currentTimeMillis();
  // pass on whatever indexed data the data block carries from fgIndex pruning
  BitSetGroup indexedBits = rawBlockletColumnChunks.getDataBlock().getIndexedData();
  rawBlockletColumnChunks.setBitSetGroup(indexedBits);
  // evaluate the filter against the actual data, page by page
  BitSetGroup filteredBits =
      this.filterExecutor.applyFilter(rawBlockletColumnChunks, useBitSetPipeLine);
  if (filteredBits.isEmpty()) {
    // nothing matched in this blocklet: free the raw chunks and answer with an empty result
    CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(),
        rawBlockletColumnChunks.getMeasureRawColumnChunks());
    addQueryStatistic(scanStartMillis, filteredBits.getScannedPages());
    return createEmptyResult();
  }

  BlockletScannedResult result =
      new FilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
  result.setBlockletId(blockExecutionInfo.getBlockIdString(),
      String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));

  // one more blocklet with at least one matching row
  QueryStatistic validBlockletStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
  validBlockletStat.addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
      validBlockletStat.getCount() + 1);
  // pages reported as valid by the filter result
  QueryStatistic validPageStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
  validPageStat.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
      validPageStat.getCount() + filteredBits.getValidPages());
  // one more blocklet scanned
  QueryStatistic scannedBlockletStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM);
  scannedBlockletStat.addCountStatistic(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM,
      scannedBlockletStat.getCount() + 1);
  // pages reported as scanned by the filter result
  QueryStatistic scannedPageStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.PAGE_SCANNED);
  scannedPageStat.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
      scannedPageStat.getCount() + filteredBits.getScannedPages());

  // turn each page's bit set into an array of matched row ids plus its length
  int[] rowCountByPage = new int[filteredBits.getNumberOfPages()];
  int[][] rowIdsByPage = new int[filteredBits.getNumberOfPages()][];
  final int pageTotal = rowIdsByPage.length;
  for (int page = 0; page < pageTotal; page++) {
    BitSet pageBits = filteredBits.getBitSet(page);
    if (pageBits == null || pageBits.isEmpty()) {
      // no match on this page: its count stays 0 and its row id array stays null
      continue;
    }
    int[] rowIds = new int[pageBits.cardinality()];
    int cursor = 0;
    int bit = pageBits.nextSetBit(0);
    while (bit >= 0) {
      rowIds[cursor++] = bit;
      bit = pageBits.nextSetBit(bit + 1);
    }
    rowCountByPage[page] = rowIds.length;
    rowIdsByPage[page] = rowIds;
  }

  FileReader fileReader = rawBlockletColumnChunks.getFileReader();

  // ---- dimensions ----
  DimensionRawColumnChunk[] dimRawChunks =
      new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionToRead()];
  final int dimChunkTotal = dimRawChunks.length;
  // start from the dimension chunks already held by the raw blocklet chunks
  DimensionRawColumnChunk[] heldDimChunks =
      rawBlockletColumnChunks.getDimensionRawColumnChunks();
  for (int chunk = 0; chunk < dimChunkTotal; chunk++) {
    dimRawChunks[chunk] = heldDimChunks[chunk];
  }
  // time spent reading the selected dimension ranges counts as read time
  final long dimReadStart = System.currentTimeMillis();
  int[][] dimRanges = blockExecutionInfo.getAllSelectedDimensionColumnIndexRange();
  DimensionRawColumnChunk[] projectedDimChunks =
      rawBlockletColumnChunks.getDataBlock().readDimensionChunks(fileReader, dimRanges);
  long totalReadMillis = System.currentTimeMillis() - dimReadStart;
  // each range is {first, last} inclusive; copy that slice over the same positions
  for (int[] range : dimRanges) {
    int first = range[0];
    System.arraycopy(projectedDimChunks, first, dimRawChunks, first, range[1] + 1 - first);
  }
  /*
   * A projected dimension may still be missing here: for example with an OR filter whose
   * first condition matches every row, the second filter column is never read, yet it has
   * to be read now if it is also part of the projection.
   */
  final long missingDimReadStart = System.currentTimeMillis();
  for (int dimIndex : blockExecutionInfo.getProjectionListDimensionIndexes()) {
    if (dimRawChunks[dimIndex] == null) {
      dimRawChunks[dimIndex] =
          rawBlockletColumnChunks.getDataBlock().readDimensionChunk(fileReader, dimIndex);
    }
  }
  totalReadMillis += System.currentTimeMillis() - missingDimReadStart;

  // ---- measures ----
  MeasureRawColumnChunk[] msrRawChunks =
      new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureToRead()];
  final int msrChunkTotal = msrRawChunks.length;
  // start from the measure chunks already held by the raw blocklet chunks
  MeasureRawColumnChunk[] heldMsrChunks = rawBlockletColumnChunks.getMeasureRawColumnChunks();
  for (int chunk = 0; chunk < msrChunkTotal; chunk++) {
    MeasureRawColumnChunk held = heldMsrChunks[chunk];
    if (held != null) {
      msrRawChunks[chunk] = held;
    }
  }
  final long msrReadStart = System.currentTimeMillis();
  int[][] msrRanges = blockExecutionInfo.getAllSelectedMeasureIndexRange();
  MeasureRawColumnChunk[] projectedMsrChunks =
      rawBlockletColumnChunks.getDataBlock().readMeasureChunks(fileReader, msrRanges);
  long msrReadMillis = System.currentTimeMillis() - msrReadStart;
  for (int[] range : msrRanges) {
    int first = range[0];
    System.arraycopy(projectedMsrChunks, first, msrRawChunks, first, range[1] + 1 - first);
  }
  /*
   * Same situation as for dimensions: a projected measure that neither the filter nor the
   * range read above has loaded is read individually here.
   */
  final long missingMsrReadStart = System.currentTimeMillis();
  for (int msrIndex : blockExecutionInfo.getProjectionListMeasureIndexes()) {
    if (msrRawChunks[msrIndex] == null) {
      msrRawChunks[msrIndex] =
          rawBlockletColumnChunks.getDataBlock().readMeasureChunk(fileReader, msrIndex);
    }
  }
  msrReadMillis += System.currentTimeMillis() - missingMsrReadStart;
  totalReadMillis += msrReadMillis;

  // page matrices are only allocated here; their cells are left unset by this method
  DimensionColumnPage[][] dimPages = new DimensionColumnPage[dimChunkTotal][pageTotal];
  ColumnPage[][] msrPages = new ColumnPage[msrChunkTotal][pageTotal];

  result.setDimensionColumnPages(dimPages);
  result.setPageFilteredRowId(rowIdsByPage);
  result.setMeasureColumnPages(msrPages);
  result.setDimRawColumnChunks(dimRawChunks);
  result.setMsrRawColumnChunks(msrRawChunks);
  result.setPageFilteredRowCount(rowCountByPage);
  result.fillDataChunks();

  // scan time is the elapsed time of this call with the read time taken out
  QueryStatistic scanTimeStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
  scanTimeStat.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
      scanTimeStat.getCount()
          + (System.currentTimeMillis() - scanStartMillis - totalReadMillis));
  QueryStatistic readTimeStat = queryStatisticsModel.getStatisticsTypeAndObjMap()
      .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
  readTimeStat.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
      readTimeStat.getCount() + totalReadMillis);
  return result;
}