in flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java [173:338]
/**
 * Converts the cell at {@code rowIndex} of a single Arrow column vector into a Java value
 * and appends it to the corresponding row through {@code addValueToRow}.
 *
 * <p>{@code currentType} selects the conversion. Each conversion first verifies that the
 * Arrow {@code minorType} is the one it knows how to read; on a mismatch nothing is appended
 * and {@code false} is returned so the caller can react. Null cells are appended as
 * {@code null}.
 *
 * <p>Textual values are decoded as UTF-8 explicitly rather than with the JVM default charset,
 * so the result does not depend on the platform the job runs on.
 *
 * @param col         index of the column in {@code schema}; only used to build the error
 *                    message for unsupported types
 * @param rowIndex    index of the row inside the current batch
 * @param minorType   Arrow minor type of {@code fieldVector}
 * @param currentType column type name, e.g. {@code "INT"} or {@code "DATETIMEV2"}
 * @param fieldVector Arrow vector holding the column's values
 * @return {@code true} if the type was handled (for {@code "NULL_TYPE"} this means nothing
 *         was appended), {@code false} if {@code minorType} does not match the expected
 *         Arrow type for {@code currentType}
 * @throws DorisException if {@code currentType} is not one of the supported type names
 */
private boolean doConvert(int col,
                          int rowIndex,
                          Types.MinorType minorType,
                          String currentType,
                          FieldVector fieldVector) throws DorisException {
    // Fully qualified so that this method does not depend on an additional import.
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    switch (currentType) {
        case "NULL_TYPE":
            // Nothing is appended for this type; the call still reports success.
            // NOTE(review): confirm callers do not expect a null placeholder in the row here.
            break;
        case "BOOLEAN": {
            if (!minorType.equals(Types.MinorType.BIT)) {
                return false;
            }
            BitVector bitVector = (BitVector) fieldVector;
            Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "TINYINT": {
            if (!minorType.equals(Types.MinorType.TINYINT)) {
                return false;
            }
            TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
            Object fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "SMALLINT": {
            if (!minorType.equals(Types.MinorType.SMALLINT)) {
                return false;
            }
            SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
            Object fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "INT": {
            if (!minorType.equals(Types.MinorType.INT)) {
                return false;
            }
            IntVector intVector = (IntVector) fieldVector;
            Object fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "BIGINT": {
            if (!minorType.equals(Types.MinorType.BIGINT)) {
                return false;
            }
            BigIntVector bigIntVector = (BigIntVector) fieldVector;
            Object fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "FLOAT": {
            if (!minorType.equals(Types.MinorType.FLOAT4)) {
                return false;
            }
            Float4Vector float4Vector = (Float4Vector) fieldVector;
            Object fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        // TIME is read from a FLOAT8 vector exactly like DOUBLE.
        case "TIME":
        case "DOUBLE": {
            if (!minorType.equals(Types.MinorType.FLOAT8)) {
                return false;
            }
            Float8Vector float8Vector = (Float8Vector) fieldVector;
            Object fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "BINARY": {
            if (!minorType.equals(Types.MinorType.VARBINARY)) {
                return false;
            }
            VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
            Object fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        }
        case "DECIMAL":
        case "DECIMALV2":
        case "DECIMAL32":
        case "DECIMAL64":
        case "DECIMAL128I": {
            if (!minorType.equals(Types.MinorType.DECIMAL)) {
                return false;
            }
            DecimalVector decimalVector = (DecimalVector) fieldVector;
            if (decimalVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            // Trailing zeros are dropped, so the resulting scale may be smaller than the column's.
            BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
            addValueToRow(rowIndex, value);
            break;
        }
        case "DATE":
        case "DATEV2": {
            // Dates arrive as text and are parsed with dateFormatter.
            if (!minorType.equals(Types.MinorType.VARCHAR)) {
                return false;
            }
            VarCharVector dateVector = (VarCharVector) fieldVector;
            if (dateVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            String dateText = new String(dateVector.get(rowIndex), utf8);
            LocalDate localDate = LocalDate.parse(dateText, dateFormatter);
            addValueToRow(rowIndex, localDate);
            break;
        }
        case "DATETIME": {
            if (!minorType.equals(Types.MinorType.VARCHAR)) {
                return false;
            }
            VarCharVector dateTimeVector = (VarCharVector) fieldVector;
            if (dateTimeVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            String dateTimeText = new String(dateTimeVector.get(rowIndex), utf8);
            LocalDateTime dateTime = LocalDateTime.parse(dateTimeText, dateTimeFormatter);
            addValueToRow(rowIndex, dateTime);
            break;
        }
        case "DATETIMEV2": {
            if (!minorType.equals(Types.MinorType.VARCHAR)) {
                return false;
            }
            VarCharVector dateTimeV2Vector = (VarCharVector) fieldVector;
            if (dateTimeV2Vector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            String dateTimeText = new String(dateTimeV2Vector.get(rowIndex), utf8);
            // The text is passed through completeMilliseconds before being parsed with
            // dateTimeV2Formatter.
            dateTimeText = completeMilliseconds(dateTimeText);
            LocalDateTime dateTime = LocalDateTime.parse(dateTimeText, dateTimeV2Formatter);
            addValueToRow(rowIndex, dateTime);
            break;
        }
        case "LARGEINT": {
            // LARGEINT is accepted either as fixed-size binary or as decimal text.
            if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY)
                    && !minorType.equals(Types.MinorType.VARCHAR)) {
                return false;
            }
            if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
                FixedSizeBinaryVector largeIntVector = (FixedSizeBinaryVector) fieldVector;
                if (largeIntVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                byte[] bytes = largeIntVector.get(rowIndex);
                // BigInteger(byte[]) expects big-endian two's complement, so the byte order
                // is reversed first (presumably the server sends little-endian -- TODO confirm).
                int left = 0;
                int right = bytes.length - 1;
                while (left < right) {
                    byte temp = bytes[left];
                    bytes[left] = bytes[right];
                    bytes[right] = temp;
                    left++;
                    right--;
                }
                BigInteger largeInt = new BigInteger(bytes);
                addValueToRow(rowIndex, largeInt);
            } else {
                VarCharVector largeIntVector = (VarCharVector) fieldVector;
                if (largeIntVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                String largeIntText = new String(largeIntVector.get(rowIndex), utf8);
                BigInteger largeInt = new BigInteger(largeIntText);
                addValueToRow(rowIndex, largeInt);
            }
            break;
        }
        case "CHAR":
        case "VARCHAR":
        case "STRING":
        case "JSONB": {
            if (!minorType.equals(Types.MinorType.VARCHAR)) {
                return false;
            }
            VarCharVector varCharVector = (VarCharVector) fieldVector;
            if (varCharVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            String text = new String(varCharVector.get(rowIndex), utf8);
            addValueToRow(rowIndex, text);
            break;
        }
        case "ARRAY": {
            if (!minorType.equals(Types.MinorType.LIST)) {
                return false;
            }
            ListVector listVector = (ListVector) fieldVector;
            Object listValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
            // TODO: when the element type of the array is a date, a conversion is required.
            addValueToRow(rowIndex, listValue);
            break;
        }
        default: {
            String errMsg = "Unsupported type " + schema.get(col).getType();
            logger.error(errMsg);
            throw new DorisException(errMsg);
        }
    }
    return true;
}