in hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java [494:586]
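/**
 * Writes a single primitive-typed value into the current Parquet record via
 * {@code recordConsumer}. A null value is skipped entirely (no startField/endField
 * pair is emitted), which Parquet treats as a null field. Values whose runtime
 * type does not match the declared primitive type are coerced where possible
 * and otherwise logged as dirty data rather than thrown.
 */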
private void writePrimitiveType(Type type, Object value, int index) {
if (value == null) {
return;
}
recordConsumer.startField(type.getName(), index);
PrimitiveType primitiveType = type.asPrimitiveType();
switch (primitiveType.getPrimitiveTypeName()) {
case BOOLEAN:
recordConsumer.addBoolean((Boolean) value);
break;
case FLOAT:
if (value instanceof Float) {
recordConsumer.addFloat(((Float) value).floatValue());
} else if (value instanceof Double) {
recordConsumer.addFloat(((Double) value).floatValue());
} else if (value instanceof Long) {
recordConsumer.addFloat(((Long) value).floatValue());
} else if (value instanceof Integer) {
recordConsumer.addFloat(((Integer) value).floatValue());
} else {
// Log unexpected runtime types as dirty data, mirroring the INT32/INT64 handling below.
LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
}
break;
case DOUBLE:
if (value instanceof Float) {
recordConsumer.addDouble(((Float) value).doubleValue());
} else if (value instanceof Double) {
recordConsumer.addDouble(((Double) value).doubleValue());
} else if (value instanceof Long) {
recordConsumer.addDouble(((Long) value).doubleValue());
} else if (value instanceof Integer) {
recordConsumer.addDouble(((Integer) value).doubleValue());
} else {
// Log unexpected runtime types as dirty data, mirroring the INT32/INT64 handling below.
LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
}
break;
case INT32:
if (value instanceof Integer) {
recordConsumer.addInteger((Integer) value);
} else if (value instanceof Long) {
recordConsumer.addInteger(((Long) value).intValue());
} else {
// The earlier code was buggy and dropped this column here without throwing an exception.
// Collect these cases via the rate-limited log for now; decide on a real fix once we know whether any jobs actually hit this path.
LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
}
break;
case INT64:
if (value instanceof Integer) {
recordConsumer.addLong(((Integer) value).longValue());
} else if (value instanceof Long) {
recordConsumer.addLong(((Long) value).longValue());
} else {
// The earlier code was buggy and dropped this column here without throwing an exception.
// Collect these cases via the rate-limited log for now; decide on a real fix once we know whether any jobs actually hit this path.
LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName())));
}
break;
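// INT96 is the legacy Hive/Impala timestamp encoding. timestampColToBinary
// (defined elsewhere in this class) presumably packs the Column into the
// 12-byte INT96 layout (8 bytes nanoseconds-of-day + 4 bytes Julian day).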
case INT96:
if (value instanceof Integer) {
recordConsumer.addBinary(timestampColToBinary(new LongColumn((Integer) value)));
} else if (value instanceof Long) {
recordConsumer.addBinary(timestampColToBinary(new LongColumn((Long) value)));
} else if (value instanceof Timestamp) {
recordConsumer.addBinary(timestampColToBinary(new DateColumn((Timestamp) value)));
} else if (value instanceof Date) {
recordConsumer.addBinary(timestampColToBinary(new DateColumn((Date) value)));
} else {
recordConsumer.addBinary(timestampColToBinary(new StringColumn(value.toString())));
}
break;
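// A FIXED_LEN_BYTE_ARRAY with decimal metadata carries a DECIMAL logical type:
// the unscaled value is stored as a big-endian two's-complement byte array padded
// to the declared fixed length. decimalToBinary (defined elsewhere in this class)
// presumably performs that conversion using the declared precision and scale.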
case FIXED_LEN_BYTE_ARRAY:
if (primitiveType.getDecimalMetadata() != null) {
// decimal
Column column;
if (value instanceof Integer) {
column = new LongColumn((Integer) value);
} else if (value instanceof Long) {
column = new LongColumn((Long) value);
} else if (value instanceof Double) {
column = new DoubleColumn((Double) value);
} else if (value instanceof BigDecimal) {
column = new DoubleColumn((BigDecimal) value);
} else {
column = new StringColumn(value.toString());
}
recordConsumer.addBinary(decimalToBinary(column, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale()));
break;
}
/* fall through */
case BINARY:
default:
// Use toString() rather than a String cast so non-String values reaching this
// branch (e.g. via the FIXED_LEN_BYTE_ARRAY fall-through) do not throw ClassCastException.
recordConsumer.addBinary(Binary.fromString(value.toString()));
break;
}
recordConsumer.endField(type.getName(), index);
}
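// Minimal usage sketch (illustrative only; the schema lookup and row-value
// names below are assumptions, not the actual call sites in this class):
//
//   recordConsumer.startMessage();
//   writePrimitiveType(schema.getType(0), rowValue, 0);
//   recordConsumer.endMessage();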