in hologres-connector-hive-base/src/main/java/com/alibaba/hologres/hive/HoloSerDe.java [384:522]
/**
 * Serializes one Hive row into the reusable {@link HoloRecordWritable} for writing to
 * Hologres.
 *
 * <p>The row must be an {@code Object[]} whose length equals {@code hiveColumnCount};
 * each element is converted according to the cached {@code hiveColumnTypes[i]}.
 *
 * @param row the Hive row to serialize (an {@code Object[]} of column values)
 * @param objInspector the row's object inspector (unused here; conversion is driven by
 *     the column type metadata cached at initialization)
 * @return the shared {@code dbRecordWritable} populated with the converted values
 * @throws SerDeException if column metadata is missing, the column count mismatches, or
 *     a column holds an unsupported type
 */
public HoloRecordWritable serialize(Object row, ObjectInspector objInspector)
        throws SerDeException {
    LOGGER.trace("Serializing from SerDe");
    if (row == null || hiveColumnTypes == null) {
        throw new SerDeException("Holo SerDe has no columns to serialize");
    }
    // Cast once up front instead of repeating ((Object[]) row) on every access.
    Object[] fields = (Object[]) row;
    if (fields.length != hiveColumnCount) {
        throw new SerDeException(
                String.format(
                        "Required %d columns, received %d.", hiveColumnCount, fields.length));
    }
    dbRecordWritable.clear();
    for (int i = 0; i < hiveColumnCount; i++) {
        Object rowData = fields[i];
        if (null != rowData) {
            rowData =
                    hiveColumnTypes[i].getCategory() == Category.LIST
                            ? convertListColumn(rowData, i)
                            : convertPrimitiveColumn(rowData, i);
        }
        dbRecordWritable.set(i, rowData);
    }
    return dbRecordWritable;
}

/**
 * Converts a Hive LIST column value to the plain Java array Hologres expects.
 *
 * <p>Depending on how the data was produced, the incoming value may be a
 * {@code LazyArray}, a {@code LazyBinaryArray}, or a plain {@code ArrayList}; the lazy
 * variants are first materialized via {@code getList()}.
 *
 * @param rowData the non-null column value
 * @param i the column index, used for type lookup and error messages
 * @return an array matching the list's element type
 * @throws IllegalArgumentException if the container class or element type is unsupported
 */
private Object convertListColumn(Object rowData, int i) {
    String arrayElementTypeName =
            ((ListTypeInfo) hiveColumnTypes[i]).getListElementTypeInfo().getTypeName();
    if (rowData instanceof LazyBinaryArray) {
        rowData = ((LazyBinaryArray) rowData).getList();
    } else if (rowData instanceof LazyArray) {
        rowData = ((LazyArray) rowData).getList();
    } else if (rowData instanceof ArrayList) {
        // Already a materialized list; nothing to do.
    } else {
        throw new IllegalArgumentException(
                String.format(
                        "Does not support array class %s, this array type is %s , column name is %s!",
                        rowData.getClass(), arrayElementTypeName, hiveColumnNames[i]));
    }
    switch (arrayElementTypeName) {
        case "int":
            return DataTypeUtils.castIntWritableArrayListToArray(rowData);
        case "bigint":
            return DataTypeUtils.castLongWritableArrayListToArray(rowData);
        case "float":
            return DataTypeUtils.castFloatWritableArrayListToArray(rowData);
        case "double":
            return DataTypeUtils.castDoubleWritableArrayListToArray(rowData);
        case "boolean":
            return DataTypeUtils.castBooleanWritableArrayListToArray(rowData);
        case "string":
            return DataTypeUtils.castHiveTextArrayListToArray(rowData);
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Does not support array element type %s , column name %s!",
                            arrayElementTypeName, hiveColumnNames[i]));
    }
}

/**
 * Converts a Hive primitive column value to the Java type Hologres expects.
 *
 * <p>Depending on the data source (e.g. reading a foreign table vs. a direct insert),
 * the value may arrive as LazyX, LazyBinaryX, or XWritable, so most types are normalized
 * by round-tripping through {@code toString()}. DATE and TIMESTAMP use the string route
 * because the writable classes differ between Hive 2 (DateWritable/TimestampWritable)
 * and Hive 3 (DateWritableV2/TimestampWritableV2). BINARY is handled per concrete class
 * because its byte content cannot be recovered from a string.
 *
 * @param rowData the non-null column value
 * @param i the column index, used for type lookup and error messages
 * @return the converted value
 * @throws SerDeException if the column type or the binary container class is unsupported
 */
private Object convertPrimitiveColumn(Object rowData, int i) throws SerDeException {
    PrimitiveCategory columnType =
            ((PrimitiveTypeInfo) hiveColumnTypes[i]).getPrimitiveCategory();
    switch (columnType) {
        case INT:
            return Integer.valueOf(rowData.toString());
        case SHORT:
            return Short.valueOf(rowData.toString());
        case BYTE:
            return Byte.valueOf(rowData.toString());
        case LONG:
            return Long.valueOf(rowData.toString());
        case FLOAT:
            return Float.valueOf(rowData.toString());
        case DOUBLE:
            return Double.valueOf(rowData.toString());
        case DECIMAL:
            return new HiveDecimalWritable(rowData.toString())
                    .getHiveDecimal()
                    .bigDecimalValue();
        case BOOLEAN:
            return Boolean.valueOf(rowData.toString());
        case CHAR:
        case VARCHAR:
        case STRING:
            // toString() already yields a String; the former String.valueOf wrapper was
            // redundant.
            return rowData.toString();
        case DATE:
            return java.sql.Date.valueOf(rowData.toString());
        case TIMESTAMP:
            return java.sql.Timestamp.valueOf(rowData.toString());
        case BINARY:
            if (rowData instanceof BytesWritable) {
                return ((BytesWritable) rowData).getBytes();
            } else if (rowData instanceof LazyBinary) {
                return ((LazyBinary) rowData).getWritableObject().getBytes();
            } else if (rowData instanceof LazyBinaryBinary) {
                return ((LazyBinaryBinary) rowData).getWritableObject().getBytes();
            } else {
                // Message typo fixed: "is was" -> "it was".
                throw new SerDeException(
                        String.format(
                                "hologres connector SerDe need binary field instance BytesWritable/LazyBinary/LazyBinaryBinary but it was %s",
                                rowData.getClass()));
            }
        default:
            throw new SerDeException(
                    String.format(
                            "hologres connector not support type %s, column name %s",
                            columnType.name(), hiveColumnNames[i]));
    }
}