in flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java [241:557]
/**
 * Converts the value stored at {@code rowIndex} of an Arrow vector into a Java object and appends
 * it to the row via {@code addValueToRow}.
 *
 * <p>The conversion is selected by the Doris type name in {@code currentType}; each branch first
 * verifies that {@code minorType} is one of the Arrow types it knows how to read. A null cell is
 * always appended as {@code null}. Note that {@code NULL_TYPE} appends nothing at all and still
 * reports success, and that {@code TIME} is read exactly like {@code DOUBLE}.
 *
 * @param col index of the column in {@code schema}; only used to build the error message for an
 *     unsupported type
 * @param rowIndex index of the cell inside {@code fieldVector}, also passed to {@code
 *     addValueToRow}
 * @param minorType Arrow minor type that the caller reports for {@code fieldVector}
 * @param currentType Doris type name that selects the conversion
 * @param fieldVector Arrow vector holding the column values
 * @return {@code true} if the cell was handled, {@code false} if {@code minorType} is not
 *     accepted for {@code currentType}
 * @throws DorisException if {@code currentType} is not a supported type name
 */
public boolean doConvert(
        int col, int rowIndex, MinorType minorType, String currentType, FieldVector fieldVector)
        throws DorisException {
    switch (currentType) {
        case "NULL_TYPE":
            // Nothing is appended to the row for this type.
            break;
        case "BOOLEAN":
            if (!minorType.equals(MinorType.BIT)) {
                return false;
            }
            BitVector bitVector = (BitVector) fieldVector;
            // The bit is stored as an int; any non-zero value is treated as true.
            Object fieldValue =
                    bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
            addValueToRow(rowIndex, fieldValue);
            break;
        case "TINYINT":
            if (!minorType.equals(MinorType.TINYINT)) {
                return false;
            }
            TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
            fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "SMALLINT":
            if (!minorType.equals(MinorType.SMALLINT)) {
                return false;
            }
            SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
            fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "INT":
            if (!minorType.equals(MinorType.INT)) {
                return false;
            }
            IntVector intVector = (IntVector) fieldVector;
            fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "IPV4":
            // IPV4 may arrive as a 32-bit integer (signed or unsigned) or as text.
            if (!minorType.equals(MinorType.UINT4)
                    && !minorType.equals(MinorType.INT)
                    && !minorType.equals(MinorType.VARCHAR)) {
                return false;
            }
            if (fieldVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            if (minorType.equals(MinorType.VARCHAR)) {
                VarCharVector ipv4VarcharVector = (VarCharVector) fieldVector;
                String ipv4Str =
                        new String(ipv4VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
                addValueToRow(rowIndex, ipv4Str);
            } else {
                BaseIntVector ipv4Vector;
                if (minorType.equals(MinorType.INT)) {
                    ipv4Vector = (IntVector) fieldVector;
                } else {
                    ipv4Vector = (UInt4Vector) fieldVector;
                }
                // Null was already handled above, so the cell can be read directly.
                // A signed IntVector sign-extends addresses >= 128.0.0.0 to a negative long;
                // masking keeps only the unsigned 32-bit address (no-op for UInt4Vector).
                long ipv4Address = ipv4Vector.getValueAsLong(rowIndex) & 0xFFFFFFFFL;
                addValueToRow(rowIndex, convertLongToIPv4Address(ipv4Address));
            }
            break;
        case "BIGINT":
            if (!minorType.equals(MinorType.BIGINT)) {
                return false;
            }
            BigIntVector bigIntVector = (BigIntVector) fieldVector;
            fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "FLOAT":
            if (!minorType.equals(MinorType.FLOAT4)) {
                return false;
            }
            Float4Vector float4Vector = (Float4Vector) fieldVector;
            fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "TIME":
        case "DOUBLE":
            // TIME values are read from the same 8-byte float vector as DOUBLE.
            if (!minorType.equals(MinorType.FLOAT8)) {
                return false;
            }
            Float8Vector float8Vector = (Float8Vector) fieldVector;
            fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "BINARY":
            if (!minorType.equals(MinorType.VARBINARY)) {
                return false;
            }
            VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
            fieldValue =
                    varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
            addValueToRow(rowIndex, fieldValue);
            break;
        case "DECIMAL":
        case "DECIMALV2":
        case "DECIMAL32":
        case "DECIMAL64":
        case "DECIMAL128I":
        case "DECIMAL128":
            if (!minorType.equals(MinorType.DECIMAL)) {
                return false;
            }
            DecimalVector decimalVector = (DecimalVector) fieldVector;
            if (decimalVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            // Trailing zeros are dropped, so the resulting scale may differ from the vector's.
            BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
            addValueToRow(rowIndex, value);
            break;
        case "DATE":
        case "DATEV2":
            // Dates arrive either as text or as a day count since the epoch.
            if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) {
                return false;
            }
            if (fieldVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            if (minorType.equals(MinorType.VARCHAR)) {
                VarCharVector dateText = (VarCharVector) fieldVector;
                String dateString = new String(dateText.get(rowIndex), StandardCharsets.UTF_8);
                addValueToRow(rowIndex, FastDateUtil.fastParseDate(dateString, DATE_PATTERN));
            } else {
                DateDayVector dateDays = (DateDayVector) fieldVector;
                addValueToRow(rowIndex, LocalDate.ofEpochDay(dateDays.get(rowIndex)));
            }
            break;
        case "DATETIME":
            if (minorType.equals(MinorType.VARCHAR)) {
                VarCharVector dateTimeText = (VarCharVector) fieldVector;
                if (dateTimeText.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                String dateTimeString =
                        new String(dateTimeText.get(rowIndex), StandardCharsets.UTF_8);
                // The text is passed through completeMilliseconds before parsing.
                dateTimeString = completeMilliseconds(dateTimeString);
                LocalDateTime parsed =
                        FastDateUtil.fastParseDateTime(dateTimeString, DATETIME_PATTERN);
                addValueToRow(rowIndex, parsed);
            } else if (fieldVector instanceof TimeStampVector) {
                // Null handling for timestamp vectors is left to getDateTime.
                addValueToRow(rowIndex, getDateTime(rowIndex, fieldVector));
            } else {
                logger.error(
                        "Unsupported type for DATETIME, minorType {}, class is {}",
                        minorType.name(),
                        fieldVector == null ? null : fieldVector.getClass());
                return false;
            }
            break;
        case "DATETIMEV2":
            if (minorType.equals(MinorType.VARCHAR)) {
                VarCharVector dateTimeV2Text = (VarCharVector) fieldVector;
                if (dateTimeV2Text.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                String dateTimeV2String =
                        new String(dateTimeV2Text.get(rowIndex), StandardCharsets.UTF_8);
                // The text is passed through completeMilliseconds before parsing.
                dateTimeV2String = completeMilliseconds(dateTimeV2String);
                LocalDateTime parsedV2 =
                        FastDateUtil.fastParseDateTimeV2(dateTimeV2String, DATETIMEV2_PATTERN);
                addValueToRow(rowIndex, parsedV2);
            } else if (fieldVector instanceof TimeStampVector) {
                // Null handling for timestamp vectors is left to getDateTime.
                addValueToRow(rowIndex, getDateTime(rowIndex, fieldVector));
            } else {
                logger.error(
                        "Unsupported type for DATETIMEV2, minorType {}, class is {}",
                        minorType.name(),
                        fieldVector == null ? null : fieldVector.getClass());
                return false;
            }
            break;
        case "LARGEINT":
            // LARGEINT arrives either as fixed-size binary or as decimal text.
            if (!minorType.equals(MinorType.FIXEDSIZEBINARY)
                    && !minorType.equals(MinorType.VARCHAR)) {
                return false;
            }
            BigInteger largeInt;
            if (minorType.equals(MinorType.FIXEDSIZEBINARY)) {
                FixedSizeBinaryVector largeIntBinary = (FixedSizeBinaryVector) fieldVector;
                if (largeIntBinary.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                byte[] bytes = largeIntBinary.get(rowIndex);
                // BigInteger(byte[]) expects big-endian two's complement, so the byte order is
                // reversed first (the vector content is presumably little-endian).
                for (int left = 0, right = bytes.length - 1; left < right; left++, right--) {
                    byte swapped = bytes[left];
                    bytes[left] = bytes[right];
                    bytes[right] = swapped;
                }
                largeInt = new BigInteger(bytes);
            } else {
                VarCharVector largeIntText = (VarCharVector) fieldVector;
                if (largeIntText.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                largeInt =
                        new BigInteger(
                                new String(
                                        largeIntText.get(rowIndex), StandardCharsets.UTF_8));
            }
            addValueToRow(rowIndex, largeInt);
            break;
        case "CHAR":
        case "VARCHAR":
        case "STRING":
        case "JSONB":
        case "JSON":
        case "VARIANT":
            // All text-like types are surfaced as a UTF-8 decoded String.
            if (!minorType.equals(MinorType.VARCHAR)) {
                return false;
            }
            VarCharVector varCharVector = (VarCharVector) fieldVector;
            if (varCharVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            String stringValue =
                    new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
            addValueToRow(rowIndex, stringValue);
            break;
        case "IPV6":
            if (!minorType.equals(MinorType.VARCHAR)) {
                return false;
            }
            if (fieldVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
            String ipv6Str =
                    new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
            if (ipv6Str.contains(":")) {
                // Already in textual address form.
                addValueToRow(rowIndex, ipv6Str);
            } else {
                // Text without ':' is treated as the decimal integer form of the address.
                String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str));
                addValueToRow(rowIndex, ipv6Address);
            }
            break;
        case "ARRAY":
            if (!minorType.equals(MinorType.LIST)) {
                return false;
            }
            ListVector listVector = (ListVector) fieldVector;
            Object listValue =
                    listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
            // todo: when the subtype of array is date, conversion is required
            addValueToRow(rowIndex, listValue);
            break;
        case "MAP":
            if (!minorType.equals(MinorType.MAP)) {
                return false;
            }
            MapVector mapVector = (MapVector) fieldVector;
            if (mapVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            // The reader is only needed for non-null cells.
            UnionMapReader reader = mapVector.getReader();
            reader.setPosition(rowIndex);
            Map<String, Object> mapValue = new HashMap<>();
            while (reader.next()) {
                FieldReader keyReader = reader.key();
                FieldReader valueReader = reader.value();
                Object mapKeyObj = handleMapFieldReader(keyReader);
                Object mapValueObj = handleMapFieldReader(valueReader);
                // Keys are stringified. NOTE(review): a null key would throw a
                // NullPointerException here - confirm map keys can never be null.
                mapValue.put(mapKeyObj.toString(), mapValueObj);
            }
            addValueToRow(rowIndex, mapValue);
            break;
        case "STRUCT":
            if (!minorType.equals(MinorType.STRUCT)) {
                return false;
            }
            StructVector structVector = (StructVector) fieldVector;
            if (structVector.isNull(rowIndex)) {
                addValueToRow(rowIndex, null);
                break;
            }
            Map<String, ?> structValue = structVector.getObject(rowIndex);
            addValueToRow(rowIndex, structValue);
            break;
        default:
            String errMsg = "Unsupported type " + schema.get(col).getType();
            logger.error(errMsg);
            throw new DorisException(errMsg);
    }
    return true;
}