private void writePrimitiveType()

in hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java [494:586]


    /**
     * Writes a single primitive value into the current record through {@code recordConsumer}.
     *
     * <p>A {@code null} value is skipped entirely: no field is started, so the column is simply
     * absent from the record. Otherwise the value is written between a
     * {@code startField}/{@code endField} pair, converted according to the Parquet primitive type
     * of {@code type}:
     * <ul>
     *   <li>BOOLEAN: the value is cast to {@link Boolean}.</li>
     *   <li>FLOAT / DOUBLE: accepts Float, Double, Long and Integer; any other runtime type writes
     *       nothing between startField and endField.</li>
     *   <li>INT32 / INT64: accepts Integer and Long; any other runtime type is only reported
     *       through a rate-limited warning and nothing is written.</li>
     *   <li>INT96: the value is wrapped in a column object and converted by
     *       {@code timestampColToBinary}.</li>
     *   <li>FIXED_LEN_BYTE_ARRAY with decimal metadata: the value is wrapped in a column object and
     *       converted by {@code decimalToBinary} using the declared precision and scale.</li>
     *   <li>FIXED_LEN_BYTE_ARRAY without decimal metadata, BINARY, and anything else: the value is
     *       cast to {@link String} and written as binary.</li>
     * </ul>
     *
     * @param type  Parquet field type; must be convertible via {@code asPrimitiveType()}
     * @param value value to write; {@code null} means "omit the field"
     * @param index field index handed to {@code startField}/{@code endField}
     * @throws ClassCastException if the value's runtime type does not match a branch that casts
     *                            unconditionally (BOOLEAN, BINARY/default)
     */
    private void writePrimitiveType(Type type, Object value, int index) {
        if (value == null) {
            return;
        }

        recordConsumer.startField(type.getName(), index);
        PrimitiveType primitiveType = type.asPrimitiveType();

        switch (primitiveType.getPrimitiveTypeName()) {
            case BOOLEAN:
                recordConsumer.addBoolean((Boolean) value);
                break;
            case FLOAT:
                // Unsupported runtime types fall through every branch and write nothing.
                if (value instanceof Float) {
                    recordConsumer.addFloat(((Float) value).floatValue());
                } else if (value instanceof Double) {
                    recordConsumer.addFloat(((Double) value).floatValue());
                } else if (value instanceof Long) {
                    recordConsumer.addFloat(((Long) value).floatValue());
                } else if (value instanceof Integer) {
                    recordConsumer.addFloat(((Integer) value).floatValue());
                }
                break;
            case DOUBLE:
                // Unsupported runtime types fall through every branch and write nothing.
                if (value instanceof Float) {
                    recordConsumer.addDouble(((Float) value).doubleValue());
                } else if (value instanceof Double) {
                    recordConsumer.addDouble(((Double) value).doubleValue());
                } else if (value instanceof Long) {
                    recordConsumer.addDouble(((Long) value).doubleValue());
                } else if (value instanceof Integer) {
                    recordConsumer.addDouble(((Integer) value).doubleValue());
                }
                break;
            case INT32:
                if (value instanceof Integer) {
                    recordConsumer.addInteger((Integer) value);
                } else if (value instanceof Long) {
                    // Narrowing conversion: a Long outside the int range is truncated here.
                    recordConsumer.addInteger(((Long) value).intValue());
                } else {
                    // Earlier code had a bug that dropped the column here without throwing.
                    // For now only collect occurrences; decide how to change this once we know
                    // whether any job actually hits this path.
                    LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
                }
                break;
            case INT64:
                if (value instanceof Integer) {
                    recordConsumer.addLong(((Integer) value).longValue());
                } else if (value instanceof Long) {
                    // Write the full 64-bit value; addInteger(intValue()) would truncate it and
                    // emit an int32 into an int64 column.
                    recordConsumer.addLong(((Long) value).longValue());
                } else {
                    // Earlier code had a bug that dropped the column here without throwing.
                    // For now only collect occurrences; decide how to change this once we know
                    // whether any job actually hits this path.
                    LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
                }
                break;
            case INT96:
                // INT96 is handled as a timestamp: wrap the value in the matching column type and
                // let timestampColToBinary produce the binary form.
                if (value instanceof Integer) {
                    recordConsumer.addBinary(timestampColToBinary(new LongColumn((Integer) value)));
                } else if (value instanceof Long) {
                    recordConsumer.addBinary(timestampColToBinary(new LongColumn((Long) value)));
                } else if (value instanceof Timestamp) {
                    recordConsumer.addBinary(timestampColToBinary(new DateColumn((Timestamp) value)));
                } else if (value instanceof Date) {
                    recordConsumer.addBinary(timestampColToBinary(new DateColumn((Date) value)));
                } else {
                    // Anything else is passed on via its string representation.
                    recordConsumer.addBinary(timestampColToBinary(new StringColumn(value.toString())));
                }
                break;
            case FIXED_LEN_BYTE_ARRAY:
                if (primitiveType.getDecimalMetadata() != null) {
                    // Decimal stored as a fixed-length byte array.
                    Column column;
                    if (value instanceof Integer) {
                        column = new LongColumn((Integer) value);
                    } else if (value instanceof Long) {
                        column = new LongColumn((Long) value);
                    } else if (value instanceof Double) {
                        column = new DoubleColumn((Double) value);
                    } else if (value instanceof BigDecimal) {
                        column = new DoubleColumn((BigDecimal) value);
                    } else {
                        column = new StringColumn(value.toString());
                    }
                    recordConsumer.addBinary(decimalToBinary(column, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale()));
                    break;
                }
                // No decimal metadata: treat it like a plain binary value.
                /* fall through */
            case BINARY:
            default:
                recordConsumer.addBinary(Binary.fromString((String) value));
                break;
        }
        recordConsumer.endField(type.getName(), index);
    }