in hologres-connector-kafka/src/main/java/com/alibaba/hologres/kafka/sink/HoloSinkWriter.java [111:205]
private void writeStructJson(SinkRecord record) throws KafkaHoloException {
Put put = new Put(schema);
for (Field field : record.valueSchema().fields()) {
String fieldName = field.name();
Schema.Type fieldType = field.schema().type();
Object fieldValue;
if (field.schema().name() != null) {
switch (field.schema().name()) {
// 前三个case使用kafka.connect.data类型,对输入格式要求较为严格
case Decimal.LOGICAL_NAME:
fieldValue = (BigDecimal) ((Struct) record.value()).get(fieldName);
break;
case Date.LOGICAL_NAME:
java.util.Date dateValue =
(java.util.Date) ((Struct) record.value()).get(fieldName);
fieldValue = new java.sql.Date(dateValue.getTime());
break;
case Timestamp.LOGICAL_NAME:
java.util.Date timestampValue =
(java.util.Date) ((Struct) record.value()).get(fieldName);
fieldValue = new java.sql.Timestamp(timestampValue.getTime());
break;
// 以下三个case使用string类型读入并写入holo,可读性高
case "Decimal":
fieldValue = new BigDecimal(((Struct) record.value()).getString(fieldName));
break;
case "Date":
fieldValue =
java.sql.Date.valueOf(
((Struct) record.value()).getString(fieldName));
break;
case "Timestamp":
fieldValue =
java.sql.Timestamp.valueOf(
((Struct) record.value()).getString(fieldName));
break;
default:
throw new IllegalArgumentException(
"not support type name " + field.schema().name());
}
} else {
switch (fieldType) {
case INT8:
fieldValue = ((Struct) record.value()).getInt8(fieldName);
break;
case INT16:
fieldValue = ((Struct) record.value()).getInt16(fieldName);
break;
case INT32:
fieldValue = ((Struct) record.value()).getInt32(fieldName);
break;
case INT64:
fieldValue = ((Struct) record.value()).getInt64(fieldName);
break;
case FLOAT32:
fieldValue = ((Struct) record.value()).getFloat32(fieldName);
break;
case FLOAT64:
fieldValue = ((Struct) record.value()).getFloat64(fieldName);
break;
case BOOLEAN:
fieldValue = ((Struct) record.value()).getBoolean(fieldName);
break;
case STRING:
fieldValue = ((Struct) record.value()).getString(fieldName);
break;
case BYTES:
fieldValue = ((Struct) record.value()).getBytes(fieldName);
break;
case ARRAY:
fieldValue = ((Struct) record.value()).getArray(fieldName);
break;
default:
throw new IllegalArgumentException("not support type " + fieldType.name());
}
}
try {
put.setObject(fieldName, fieldValue);
} catch (InvalidParameterException e) {
if (e.getMessage().contains("can not found column") && !schemaForceCheck) {
logger.warn(
"field {} not exists in holo table {} but we ignore it because schemaForceCheck is false",
fieldName,
schema.getTableName());
} else {
throw new KafkaHoloException(
String.format(
"hologres table %s not have column named %s",
schema.getTableName(), fieldName));
}
}
putMessageInfo(record, put);
}
holoWriter.write(put);
}