in hologres-connector-kafka/src/main/java/com/alibaba/hologres/kafka/sink/HoloSinkWriter.java [208:284]
private void writeJson(SinkRecord record) throws KafkaHoloException {
Put put = new Put(schema);
HashMap<String, Object> jsonMap = (HashMap<String, Object>) record.value();
for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
Object value;
Column holoColumn;
try {
holoColumn = schema.getColumn(schema.getColumnIndex(entry.getKey()));
} catch (NullPointerException e) {
if (!schemaForceCheck) {
logger.warn(
"field {} not exists in holo table {} but we ignore it because schemaForceCheck is false",
entry.getKey(),
schema.getTableName());
continue;
} else {
throw new KafkaHoloException(
String.format(
"hologres table %s not have column named %s",
schema.getTableName(), entry.getKey()));
}
}
switch (holoColumn.getType()) {
case Types.CHAR:
case Types.VARCHAR:
value = entry.getValue().toString();
break;
case Types.BIT:
case Types.BOOLEAN:
value = Boolean.valueOf(entry.getValue().toString());
break;
case Types.NUMERIC:
case Types.DECIMAL:
value = new BigDecimal(entry.getValue().toString());
break;
case Types.SMALLINT:
value = (short) entry.getValue();
break;
case Types.INTEGER:
value = Integer.valueOf(entry.getValue().toString());
break;
case Types.BIGINT:
value = (long) entry.getValue();
break;
case Types.REAL:
case Types.FLOAT:
value = (float) entry.getValue();
break;
case Types.DOUBLE:
value = (double) entry.getValue();
break;
case Types.DATE:
if (entry.getValue() instanceof Long) {
value = new java.sql.Date((Long) entry.getValue());
} else {
value = java.sql.Date.valueOf(entry.getValue().toString());
}
break;
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
if (entry.getValue() instanceof Long) {
value = new java.sql.Timestamp((Long) entry.getValue());
} else {
value = java.sql.Timestamp.valueOf(entry.getValue().toString());
}
break;
default:
throw new IllegalArgumentException(
"not support hologres type " + holoColumn.getTypeName());
}
put.setObject(entry.getKey(), value);
putMessageInfo(record, put);
}
holoWriter.write(put);
}