private void writeStructJson()

in hologres-connector-kafka/src/main/java/com/alibaba/hologres/kafka/sink/HoloSinkWriter.java [111:205]


    /**
     * Converts a {@link Struct}-valued sink record into a {@link Put} and passes it to the holo
     * writer.
     *
     * <p>Each field of the record's value schema is read from the struct and set on the put under
     * the same name. Fields whose schema carries a name are treated as logical types: the
     * kafka.connect.data {@code Decimal}/{@code Date}/{@code Timestamp} logical types, or the
     * string-encoded {@code "Decimal"}/{@code "Date"}/{@code "Timestamp"} variants. Fields without
     * a schema name are read according to their primitive schema type. A null field value is
     * forwarded as null instead of being converted.
     *
     * @param record the sink record; its value is cast to {@link Struct} and its value schema
     *     drives the field iteration
     * @throws KafkaHoloException if setting a field on the put fails with an {@code
     *     InvalidParameterException} that is not an ignorable "can not found column" error
     * @throws IllegalArgumentException if a field has an unsupported schema name or schema type
     */
    private void writeStructJson(SinkRecord record) throws KafkaHoloException {
        Put put = new Put(schema);
        // Cast once instead of repeating the cast in every branch below.
        Struct struct = (Struct) record.value();
        for (Field field : record.valueSchema().fields()) {
            String fieldName = field.name();
            Schema.Type fieldType = field.schema().type();
            String logicalName = field.schema().name();
            Object fieldValue;
            if (logicalName != null) {
                switch (logicalName) {
                        // The first three cases use the kafka.connect.data logical types, which
                        // are strict about the input format.
                    case Decimal.LOGICAL_NAME:
                        fieldValue = (BigDecimal) struct.get(fieldName);
                        break;
                    case Date.LOGICAL_NAME:
                        java.util.Date dateValue = (java.util.Date) struct.get(fieldName);
                        // A null value would otherwise fail on getTime().
                        fieldValue =
                                dateValue == null ? null : new java.sql.Date(dateValue.getTime());
                        break;
                    case Timestamp.LOGICAL_NAME:
                        java.util.Date timestampValue = (java.util.Date) struct.get(fieldName);
                        fieldValue =
                                timestampValue == null
                                        ? null
                                        : new java.sql.Timestamp(timestampValue.getTime());
                        break;
                        // The following three cases read the value as a string and write it to
                        // holo, which is more readable.
                    case "Decimal":
                        String decimalText = struct.getString(fieldName);
                        // new BigDecimal(null) throws, so pass null through untouched.
                        fieldValue = decimalText == null ? null : new BigDecimal(decimalText);
                        break;
                    case "Date":
                        String dateText = struct.getString(fieldName);
                        fieldValue = dateText == null ? null : java.sql.Date.valueOf(dateText);
                        break;
                    case "Timestamp":
                        String timestampText = struct.getString(fieldName);
                        fieldValue =
                                timestampText == null
                                        ? null
                                        : java.sql.Timestamp.valueOf(timestampText);
                        break;
                    default:
                        throw new IllegalArgumentException("not support type name " + logicalName);
                }
            } else {
                switch (fieldType) {
                    case INT8:
                        fieldValue = struct.getInt8(fieldName);
                        break;
                    case INT16:
                        fieldValue = struct.getInt16(fieldName);
                        break;
                    case INT32:
                        fieldValue = struct.getInt32(fieldName);
                        break;
                    case INT64:
                        fieldValue = struct.getInt64(fieldName);
                        break;
                    case FLOAT32:
                        fieldValue = struct.getFloat32(fieldName);
                        break;
                    case FLOAT64:
                        fieldValue = struct.getFloat64(fieldName);
                        break;
                    case BOOLEAN:
                        fieldValue = struct.getBoolean(fieldName);
                        break;
                    case STRING:
                        fieldValue = struct.getString(fieldName);
                        break;
                    case BYTES:
                        fieldValue = struct.getBytes(fieldName);
                        break;
                    case ARRAY:
                        fieldValue = struct.getArray(fieldName);
                        break;
                    default:
                        throw new IllegalArgumentException("not support type " + fieldType.name());
                }
            }
            try {
                put.setObject(fieldName, fieldValue);
            } catch (InvalidParameterException e) {
                // Guard against a null message so the check itself cannot throw.
                String reason = e.getMessage();
                boolean columnMissing =
                        reason != null && reason.contains("can not found column");
                if (columnMissing && !schemaForceCheck) {
                    logger.warn(
                            "field {} not exists in holo table {} but we ignore it because schemaForceCheck is false",
                            fieldName,
                            schema.getTableName());
                } else {
                    KafkaHoloException failure =
                            new KafkaHoloException(
                                    String.format(
                                            "hologres table %s not have column named %s",
                                            schema.getTableName(), fieldName));
                    // Keep the original failure reachable from the stack trace.
                    failure.initCause(e);
                    throw failure;
                }
            }
        }
        // The message info takes only the record and the put, not the current field, so it is
        // applied once after all fields are set rather than once per field.
        putMessageInfo(record, put);
        holoWriter.write(put);
    }