integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java [45:193]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
    implements PrestoVectorBlockBuilder {

  protected int batchSize;

  protected Type type;
  protected BlockBuilder builder;

  public ComplexTypeStreamReader(int batchSize, StructField field) {
    super(batchSize, field.getDataType());
    this.batchSize = batchSize;
    this.type = getType(field);
    List<CarbonColumnVector> childrenList = new ArrayList<>();
    for (StructField child : field.getChildren()) {
      childrenList.add(new ColumnarVectorWrapperDirect(Objects.requireNonNull(
          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child))));
    }
    setChildrenVector(childrenList);
    this.builder = type.createBlockBuilder(null, batchSize);
  }

  Type getType(StructField field) {
    DataType dataType = field.getDataType();
    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
      return VarcharType.VARCHAR;
    } else if (dataType == DataTypes.SHORT) {
      return SmallintType.SMALLINT;
    } else if (dataType == DataTypes.INT) {
      return IntegerType.INTEGER;
    } else if (dataType == DataTypes.LONG) {
      return BigintType.BIGINT;
    } else if (dataType == DataTypes.DOUBLE) {
      return DoubleType.DOUBLE;
    } else if (dataType == DataTypes.FLOAT) {
      return RealType.REAL;
    } else if (dataType == DataTypes.BOOLEAN) {
      return BooleanType.BOOLEAN;
    } else if (dataType == DataTypes.BINARY) {
      return VarbinaryType.VARBINARY;
    } else if (dataType == DataTypes.DATE) {
      return DateType.DATE;
    } else if (dataType == DataTypes.TIMESTAMP) {
      return TimestampType.TIMESTAMP;
    } else if (dataType == DataTypes.BYTE) {
      return TinyintType.TINYINT;
    } else if (DataTypes.isDecimal(dataType)) {
      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
    } else if (DataTypes.isArrayType(dataType)) {
      return new ArrayType(getType(field.getChildren().get(0)));
    } else if (DataTypes.isStructType(dataType)) {
      List<RowType.Field> children = new ArrayList<>();
      for (StructField child : field.getChildren()) {
        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
      }
      return RowType.from(children);
    } else {
      throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
  }

  @Override public Block buildBlock() {
    return builder.build();
  }

  @Override public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public void putComplexObject(List<Integer> offsetVector) {
    if (type instanceof ArrayType) {
      // build child block
      Block childBlock = buildChildBlock(getChildrenVector().get(0));
      // prepare an offset vector with 0 as initial offset
      int[] offsetVectorArray = new int[offsetVector.size() + 1];
      for (int i = 1; i <= offsetVector.size(); i++) {
        offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i - 1);
      }
      // prepare Array block
      Block arrayBlock = ArrayBlock
          .fromElementBlock(offsetVector.size(), Optional.empty(), offsetVectorArray,
              childBlock);
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, arrayBlock.getObject(position, Block.class));
      }
      getChildrenVector().get(0).getColumnVector().reset();
    } else {
      // build child blocks
      List<Block> childBlocks = new ArrayList<>(getChildrenVector().size());
      for (CarbonColumnVector child : getChildrenVector()) {
        childBlocks.add(buildChildBlock(child));
      }
      // prepare ROW block
      Block rowBlock = RowBlock
          .fromFieldBlocks(offsetVector.size(), Optional.empty(),
              childBlocks.toArray(new Block[0]));
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, rowBlock.getObject(position, Block.class));
      }
      for (CarbonColumnVector child : getChildrenVector()) {
        child.getColumnVector().reset();
      }
    }
  }

  private Block buildChildBlock(CarbonColumnVector carbonColumnVector) {
    DataType dataType = carbonColumnVector.getType();
    carbonColumnVector = carbonColumnVector.getColumnVector();
    if (dataType == DataTypes.STRING || dataType == DataTypes.BINARY
        || dataType == DataTypes.VARCHAR) {
      return ((SliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.SHORT) {
      return ((ShortStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
      return ((IntegerStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.LONG) {
      return ((LongStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.DOUBLE) {
      return ((DoubleStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.FLOAT) {
      return ((FloatStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.TIMESTAMP) {
      return ((TimestampStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BOOLEAN) {
      return ((BooleanStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isDecimal(dataType)) {
      return ((DecimalSliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BYTE) {
      return ((ByteStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isArrayType(dataType) || (DataTypes.isStructType(dataType))) {
      return ((ComplexTypeStreamReader) carbonColumnVector).buildBlock();
    } else {
      throw new UnsupportedOperationException("unsupported for type :" + dataType);
    }
  }

  @Override public void putNull(int rowId) {
    builder.appendNull();
  }

  @Override public void reset() {
    builder = type.createBlockBuilder(null, batchSize);
  }

  @Override public void putNulls(int rowId, int count) {
    for (int i = 0; i < count; i++) {
      builder.appendNull();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java [45:193]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
    implements PrestoVectorBlockBuilder {

  protected int batchSize;

  protected Type type;
  protected BlockBuilder builder;

  public ComplexTypeStreamReader(int batchSize, StructField field) {
    super(batchSize, field.getDataType());
    this.batchSize = batchSize;
    this.type = getType(field);
    List<CarbonColumnVector> childrenList = new ArrayList<>();
    for (StructField child : field.getChildren()) {
      childrenList.add(new ColumnarVectorWrapperDirect(Objects.requireNonNull(
          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child))));
    }
    setChildrenVector(childrenList);
    this.builder = type.createBlockBuilder(null, batchSize);
  }

  Type getType(StructField field) {
    DataType dataType = field.getDataType();
    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
      return VarcharType.VARCHAR;
    } else if (dataType == DataTypes.SHORT) {
      return SmallintType.SMALLINT;
    } else if (dataType == DataTypes.INT) {
      return IntegerType.INTEGER;
    } else if (dataType == DataTypes.LONG) {
      return BigintType.BIGINT;
    } else if (dataType == DataTypes.DOUBLE) {
      return DoubleType.DOUBLE;
    } else if (dataType == DataTypes.FLOAT) {
      return RealType.REAL;
    } else if (dataType == DataTypes.BOOLEAN) {
      return BooleanType.BOOLEAN;
    } else if (dataType == DataTypes.BINARY) {
      return VarbinaryType.VARBINARY;
    } else if (dataType == DataTypes.DATE) {
      return DateType.DATE;
    } else if (dataType == DataTypes.TIMESTAMP) {
      return TimestampType.TIMESTAMP;
    } else if (dataType == DataTypes.BYTE) {
      return TinyintType.TINYINT;
    } else if (DataTypes.isDecimal(dataType)) {
      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
    } else if (DataTypes.isArrayType(dataType)) {
      return new ArrayType(getType(field.getChildren().get(0)));
    } else if (DataTypes.isStructType(dataType)) {
      List<RowType.Field> children = new ArrayList<>();
      for (StructField child : field.getChildren()) {
        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
      }
      return RowType.from(children);
    } else {
      throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
  }

  @Override public Block buildBlock() {
    return builder.build();
  }

  @Override public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public void putComplexObject(List<Integer> offsetVector) {
    if (type instanceof ArrayType) {
      // build child block
      Block childBlock = buildChildBlock(getChildrenVector().get(0));
      // prepare an offset vector with 0 as initial offset
      int[] offsetVectorArray = new int[offsetVector.size() + 1];
      for (int i = 1; i <= offsetVector.size(); i++) {
        offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i - 1);
      }
      // prepare Array block
      Block arrayBlock = ArrayBlock
          .fromElementBlock(offsetVector.size(), Optional.empty(), offsetVectorArray,
              childBlock);
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, arrayBlock.getObject(position, Block.class));
      }
      getChildrenVector().get(0).getColumnVector().reset();
    } else {
      // build child blocks
      List<Block> childBlocks = new ArrayList<>(getChildrenVector().size());
      for (CarbonColumnVector child : getChildrenVector()) {
        childBlocks.add(buildChildBlock(child));
      }
      // prepare ROW block
      Block rowBlock = RowBlock
          .fromFieldBlocks(offsetVector.size(), Optional.empty(),
              childBlocks.toArray(new Block[0]));
      for (int position = 0; position < offsetVector.size(); position++) {
        type.writeObject(builder, rowBlock.getObject(position, Block.class));
      }
      for (CarbonColumnVector child : getChildrenVector()) {
        child.getColumnVector().reset();
      }
    }
  }

  private Block buildChildBlock(CarbonColumnVector carbonColumnVector) {
    DataType dataType = carbonColumnVector.getType();
    carbonColumnVector = carbonColumnVector.getColumnVector();
    if (dataType == DataTypes.STRING || dataType == DataTypes.BINARY
        || dataType == DataTypes.VARCHAR) {
      return ((SliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.SHORT) {
      return ((ShortStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
      return ((IntegerStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.LONG) {
      return ((LongStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.DOUBLE) {
      return ((DoubleStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.FLOAT) {
      return ((FloatStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.TIMESTAMP) {
      return ((TimestampStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BOOLEAN) {
      return ((BooleanStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isDecimal(dataType)) {
      return ((DecimalSliceStreamReader) carbonColumnVector).buildBlock();
    } else if (dataType == DataTypes.BYTE) {
      return ((ByteStreamReader) carbonColumnVector).buildBlock();
    } else if (DataTypes.isArrayType(dataType) || (DataTypes.isStructType(dataType))) {
      return ((ComplexTypeStreamReader) carbonColumnVector).buildBlock();
    } else {
      throw new UnsupportedOperationException("unsupported for type :" + dataType);
    }
  }

  @Override public void putNull(int rowId) {
    builder.appendNull();
  }

  @Override public void reset() {
    builder = type.createBlockBuilder(null, batchSize);
  }

  @Override public void putNulls(int rowId, int count) {
    for (int i = 0; i < count; i++) {
      builder.appendNull();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



