integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataPageSource.java [138:324]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    rowReader = new StreamRecordReader(queryModel, false);
    List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
    fields = new StructField[queryDimension.size() + queryMeasures.size()];
    for (int i = 0; i < queryDimension.size(); i++) {
      ProjectionDimension dim = queryDimension.get(i);
      if (dim.getDimension().isComplex()) {
        fields[dim.getOrdinal()] =
            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
      } else if (dim.getDimension().getDataType() == DataTypes.DATE) {
        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType());
      } else {
        fields[dim.getOrdinal()] =
            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
      }
    }

    for (ProjectionMeasure msr : queryMeasures) {
      DataType dataType = msr.getMeasure().getDataType();
      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
          || dataType == DataTypes.LONG) {
        fields[msr.getOrdinal()] =
            new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
      } else if (DataTypes.isDecimal(dataType)) {
        fields[msr.getOrdinal()] =
            new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
      } else {
        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
      }
    }

    this.columnCount = columnHandles.size();
    readSupport = new CarbonPrestoDecodeReadSupport();
    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
    this.dataTypes = readSupport.getDataTypes();
  }

  /**
   * Returns the running {@code sizeOfData} counter, which the row-format read path increases by
   * the size of every block it builds.
   *
   * @return the current value of {@code sizeOfData}
   */
  @Override
  public long getCompletedBytes() {
    return this.sizeOfData;
  }

  /**
   * Returns the time spent reading, in nanoseconds.
   *
   * <p>The interval starts at {@code nanoStart} (recorded on the first page request) and ends at
   * {@code nanoEnd} (recorded when the source is closed). While {@code nanoEnd} is still zero the
   * interval is measured up to the current {@link System#nanoTime()}.
   *
   * @return elapsed nanoseconds, or {@code 0} when {@code nanoStart} is not positive
   */
  @Override
  public long getReadTimeNanos() {
    if (nanoStart <= 0L) {
      // Reading has not started yet
      return 0L;
    }
    long endNanos = (nanoEnd == 0) ? System.nanoTime() : nanoEnd;
    return endNanos - nanoStart;
  }

  /**
   * Tells whether this page source is done, which is the case once the {@code closed} flag has
   * been set.
   *
   * @return the current value of {@code closed}
   */
  @Override
  public boolean isFinished() {
    return this.closed;
  }

  /**
   * Returns the next page, choosing the read path from the file format: {@code ROW_V1} data goes
   * through the row reader, every other format through the columnar reader.
   *
   * @return the page produced by the selected read path, which may be {@code null}
   */
  @Override
  public Page getNextPage() {
    // Formats are matched by ordinal, exactly as the surrounding code does
    boolean isRowFormat = fileFormat.ordinal() == FileFormat.ROW_V1.ordinal();
    return isRowFormat ? getNextPageForRow() : getNextPageForColumnar();
  }

  /**
   * Produces the next page from the columnar (vector) reader.
   *
   * <p>Each column of the returned page is a {@code LazyBlock} wrapping a
   * {@code CarbondataBlockLoader} created for that column index.
   *
   * @return a page with one block per column handle; {@code null} when the reader has no further
   *     batch or the batch has zero rows (this source is closed in both cases), and also
   *     {@code null} when the reader's current value is not a {@code CarbonVectorBatch}
   * @throws PrestoException rethrown unchanged after {@code closeWithSuppression} was invoked
   * @throws CarbonDataLoadingException wrapping any other {@link RuntimeException}, after
   *     {@code closeWithSuppression} was invoked
   */
  private Page getNextPageForColumnar() {
    if (nanoStart == 0) {
      // First request: remember when reading began
      nanoStart = System.nanoTime();
    }
    try {
      batchId++;
      if (!vectorReader.nextKeyValue()) {
        // Reader exhausted
        close();
        return null;
      }

      Object current = vectorReader.getCurrentValue();
      if (!(current instanceof CarbonVectorBatch)) {
        // Unexpected value type: nothing to return, and the source is left open
        return null;
      }

      int rowCount = ((CarbonVectorBatch) current).numRows();
      if (rowCount == 0) {
        close();
        return null;
      }

      int blockCount = columnHandles.size();
      Block[] pageBlocks = new Block[blockCount];
      for (int col = 0; col < blockCount; col++) {
        pageBlocks[col] = new LazyBlock(rowCount, new CarbondataBlockLoader(col));
      }
      return new Page(rowCount, pageBlocks);
    } catch (PrestoException e) {
      closeWithSuppression(e);
      throw e;
    } catch (RuntimeException e) {
      closeWithSuppression(e);
      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
    }
  }

  /**
   * Produces the next page from the row (stream) reader.
   *
   * <p>On the first call the row reader is initialised. Rows are then pulled from the reader and
   * written into one column vector per column until either the reader is exhausted or
   * {@code batchSize} rows have been collected. The vectors are finally turned into blocks, whose
   * sizes are added to {@code sizeOfData}.
   *
   * @return a page holding the collected rows, or {@code null} (after closing this source) when
   *     the reader delivered no row at all
   * @throws PrestoException rethrown unchanged after {@code closeWithSuppression} was invoked
   * @throws CarbonDataLoadingException wrapping any other {@link RuntimeException} or
   *     {@link IOException}, after {@code closeWithSuppression} was invoked
   */
  private Page getNextPageForRow() {
    if (isFrstPage) {
      // Flag is cleared before initialising, so initialisation is attempted only once
      isFrstPage = false;
      initialReaderForRow();
    }
    if (nanoStart == 0) {
      nanoStart = System.nanoTime();
    }

    int rowsRead = 0;
    try {
      CarbonColumnVectorImpl[] vectors = new CarbonColumnVectorImpl[columnCount];
      for (int col = 0; col < columnCount; col++) {
        vectors[col] =
            CarbonVectorBatch.createDirectStreamReader(batchSize, dataTypes[col], fields[col]);
      }

      // The reader is not asked for another row once the batch is full
      boolean batchFull = false;
      while (!batchFull && rowReader.nextKeyValue()) {
        Object[] row = (Object[]) rowReader.getCurrentValue();
        for (int col = 0; col < columnCount; col++) {
          vectors[col].putObject(rowsRead, row[col]);
        }
        rowsRead++;
        batchFull = rowsRead == batchSize;
      }

      if (rowsRead == 0) {
        close();
        return null;
      }

      Block[] pageBlocks = new Block[columnCount];
      for (int col = 0; col < columnCount; col++) {
        Block built = ((PrestoVectorBlockBuilder) vectors[col]).buildBlock();
        pageBlocks[col] = built;
        sizeOfData += built.getSizeInBytes();
      }
      return new Page(rowsRead, pageBlocks);
    } catch (PrestoException e) {
      closeWithSuppression(e);
      throw e;
    } catch (RuntimeException | IOException e) {
      closeWithSuppression(e);
      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
    }
  }

  /**
   * Initialises the row reader for the split handled by this page source.
   *
   * <p>A task attempt context is built around an attempt id whose job tracker id is the current
   * time formatted as {@code yyyyMMddHHmm}. The input split is obtained by converting the
   * {@code carbonSplit} property of the split's schema.
   *
   * @throws RuntimeException wrapping the {@link IOException} if the reader fails to initialise
   */
  private void initialReaderForRow() {
    String trackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
    TaskAttemptID taskAttemptId = new TaskAttemptID(trackerId, 0, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl taskContext =
        new TaskAttemptContextImpl(FileFactory.getConfiguration(), taskAttemptId);
    CarbonMultiBlockSplit multiBlockSplit =
        CarbonLocalMultiBlockSplit.convertSplit(split.getSchema().getProperty("carbonSplit"));
    try {
      rowReader.initialize(multiBlockSplit, taskContext);
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /**
   * Reports the memory usage of this page source as the running {@code sizeOfData} counter, the
   * same value returned by {@code getCompletedBytes()}.
   *
   * @return the current value of {@code sizeOfData}
   */
  @Override
  public long getSystemMemoryUsage() {
    return this.sizeOfData;
  }

  @Override
  public void close() {
    // some hive input formats are broken and bad things can happen if you close them multiple times
    if (closed) {
      return;
    }
    closed = true;
    try {
      if (vectorReader != null) {
        vectorReader.close();
      }
      if (rowReader != null) {
        rowReader.close();
      }
      nanoEnd = System.nanoTime();
    } catch (Exception e) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSource.java [144:330]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    rowReader = new StreamRecordReader(queryModel, false);
    List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
    fields = new StructField[queryDimension.size() + queryMeasures.size()];
    for (int i = 0; i < queryDimension.size(); i++) {
      ProjectionDimension dim = queryDimension.get(i);
      if (dim.getDimension().isComplex()) {
        fields[dim.getOrdinal()] =
            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
      } else if (dim.getDimension().getDataType() == DataTypes.DATE) {
        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType());
      } else {
        fields[dim.getOrdinal()] =
            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
      }
    }

    for (ProjectionMeasure msr : queryMeasures) {
      DataType dataType = msr.getMeasure().getDataType();
      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
          || dataType == DataTypes.LONG) {
        fields[msr.getOrdinal()] =
            new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
      } else if (DataTypes.isDecimal(dataType)) {
        fields[msr.getOrdinal()] =
            new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
      } else {
        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
      }
    }

    this.columnCount = columnHandles.size();
    readSupport = new CarbonPrestoDecodeReadSupport();
    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
    this.dataTypes = readSupport.getDataTypes();
  }

  /**
   * Returns the running {@code sizeOfData} counter, which the row-format read path increases by
   * the size of every block it builds.
   *
   * @return the current value of {@code sizeOfData}
   */
  @Override
  public long getCompletedBytes() {
    return this.sizeOfData;
  }

  /**
   * Returns the time spent reading, in nanoseconds.
   *
   * <p>The interval starts at {@code nanoStart} (recorded on the first page request) and ends at
   * {@code nanoEnd} (recorded when the source is closed). While {@code nanoEnd} is still zero the
   * interval is measured up to the current {@link System#nanoTime()}.
   *
   * @return elapsed nanoseconds, or {@code 0} when {@code nanoStart} is not positive
   */
  @Override
  public long getReadTimeNanos() {
    if (nanoStart <= 0L) {
      // Reading has not started yet
      return 0L;
    }
    long endNanos = (nanoEnd == 0) ? System.nanoTime() : nanoEnd;
    return endNanos - nanoStart;
  }

  /**
   * Tells whether this page source is done, which is the case once the {@code closed} flag has
   * been set.
   *
   * @return the current value of {@code closed}
   */
  @Override
  public boolean isFinished() {
    return this.closed;
  }

  /**
   * Returns the next page, choosing the read path from the file format: {@code ROW_V1} data goes
   * through the row reader, every other format through the columnar reader.
   *
   * @return the page produced by the selected read path, which may be {@code null}
   */
  @Override
  public Page getNextPage() {
    // Formats are matched by ordinal, exactly as the surrounding code does
    boolean isRowFormat = fileFormat.ordinal() == FileFormat.ROW_V1.ordinal();
    return isRowFormat ? getNextPageForRow() : getNextPageForColumnar();
  }

  /**
   * Produces the next page from the columnar (vector) reader.
   *
   * <p>Each column of the returned page is a {@code LazyBlock} wrapping a
   * {@code CarbondataBlockLoader} created for that column index.
   *
   * @return a page with one block per column handle; {@code null} when the reader has no further
   *     batch or the batch has zero rows (this source is closed in both cases), and also
   *     {@code null} when the reader's current value is not a {@code CarbonVectorBatch}
   * @throws PrestoException rethrown unchanged after {@code closeWithSuppression} was invoked
   * @throws CarbonDataLoadingException wrapping any other {@link RuntimeException}, after
   *     {@code closeWithSuppression} was invoked
   */
  private Page getNextPageForColumnar() {
    if (nanoStart == 0) {
      // First request: remember when reading began
      nanoStart = System.nanoTime();
    }
    try {
      batchId++;
      if (!vectorReader.nextKeyValue()) {
        // Reader exhausted
        close();
        return null;
      }

      Object current = vectorReader.getCurrentValue();
      if (!(current instanceof CarbonVectorBatch)) {
        // Unexpected value type: nothing to return, and the source is left open
        return null;
      }

      int rowCount = ((CarbonVectorBatch) current).numRows();
      if (rowCount == 0) {
        close();
        return null;
      }

      int blockCount = columnHandles.size();
      Block[] pageBlocks = new Block[blockCount];
      for (int col = 0; col < blockCount; col++) {
        pageBlocks[col] = new LazyBlock(rowCount, new CarbondataBlockLoader(col));
      }
      return new Page(rowCount, pageBlocks);
    } catch (PrestoException e) {
      closeWithSuppression(e);
      throw e;
    } catch (RuntimeException e) {
      closeWithSuppression(e);
      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
    }
  }

  /**
   * Produces the next page from the row (stream) reader.
   *
   * <p>On the first call the row reader is initialised. Rows are then pulled from the reader and
   * written into one column vector per column until either the reader is exhausted or
   * {@code batchSize} rows have been collected. The vectors are finally turned into blocks, whose
   * sizes are added to {@code sizeOfData}.
   *
   * @return a page holding the collected rows, or {@code null} (after closing this source) when
   *     the reader delivered no row at all
   * @throws PrestoException rethrown unchanged after {@code closeWithSuppression} was invoked
   * @throws CarbonDataLoadingException wrapping any other {@link RuntimeException} or
   *     {@link IOException}, after {@code closeWithSuppression} was invoked
   */
  private Page getNextPageForRow() {
    if (isFrstPage) {
      // Flag is cleared before initialising, so initialisation is attempted only once
      isFrstPage = false;
      initialReaderForRow();
    }
    if (nanoStart == 0) {
      nanoStart = System.nanoTime();
    }

    int rowsRead = 0;
    try {
      CarbonColumnVectorImpl[] vectors = new CarbonColumnVectorImpl[columnCount];
      for (int col = 0; col < columnCount; col++) {
        vectors[col] =
            CarbonVectorBatch.createDirectStreamReader(batchSize, dataTypes[col], fields[col]);
      }

      // The reader is not asked for another row once the batch is full
      boolean batchFull = false;
      while (!batchFull && rowReader.nextKeyValue()) {
        Object[] row = (Object[]) rowReader.getCurrentValue();
        for (int col = 0; col < columnCount; col++) {
          vectors[col].putObject(rowsRead, row[col]);
        }
        rowsRead++;
        batchFull = rowsRead == batchSize;
      }

      if (rowsRead == 0) {
        close();
        return null;
      }

      Block[] pageBlocks = new Block[columnCount];
      for (int col = 0; col < columnCount; col++) {
        Block built = ((PrestoVectorBlockBuilder) vectors[col]).buildBlock();
        pageBlocks[col] = built;
        sizeOfData += built.getSizeInBytes();
      }
      return new Page(rowsRead, pageBlocks);
    } catch (PrestoException e) {
      closeWithSuppression(e);
      throw e;
    } catch (RuntimeException | IOException e) {
      closeWithSuppression(e);
      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
    }
  }

  /**
   * Initialises the row reader for the split handled by this page source.
   *
   * <p>A task attempt context is built around an attempt id whose job tracker id is the current
   * time formatted as {@code yyyyMMddHHmm}. The input split is obtained by converting the
   * {@code carbonSplit} property of the split's schema.
   *
   * @throws RuntimeException wrapping the {@link IOException} if the reader fails to initialise
   */
  private void initialReaderForRow() {
    String trackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
    TaskAttemptID taskAttemptId = new TaskAttemptID(trackerId, 0, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl taskContext =
        new TaskAttemptContextImpl(FileFactory.getConfiguration(), taskAttemptId);
    CarbonMultiBlockSplit multiBlockSplit =
        CarbonLocalMultiBlockSplit.convertSplit(split.getSchema().getProperty("carbonSplit"));
    try {
      rowReader.initialize(multiBlockSplit, taskContext);
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /**
   * Reports the memory usage of this page source as the running {@code sizeOfData} counter, the
   * same value returned by {@code getCompletedBytes()}.
   *
   * @return the current value of {@code sizeOfData}
   */
  @Override
  public long getSystemMemoryUsage() {
    return this.sizeOfData;
  }

  @Override
  public void close() {
    // some hive input formats are broken and bad things can happen if you close them multiple times
    if (closed) {
      return;
    }
    closed = true;
    try {
      if (vectorReader != null) {
        vectorReader.close();
      }
      if (rowReader != null) {
        rowReader.close();
      }
      nanoEnd = System.nanoTime();
    } catch (Exception e) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



