in hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java [94:234]
/**
 * Writes one DataX record as a single Parquet message.
 *
 * <p>When {@code dataxParquetMode} is {@code "fields"} the record is delegated to
 * {@code writeBaseOnFields}. Otherwise (the default path) DataX column {@code i} is written to
 * the Parquet column described by {@code columns.get(i)} / {@code schema.getFields().get(i)}.
 *
 * <p>NOTE: the conversion rules of the default path are known to be imperfect (in particular
 * the raw-data mode), but existing users depend on them, so they are intentionally unchanged.
 *
 * <p>All columns are converted <em>before</em> the Parquet message is started. A Parquet
 * {@code RecordConsumer} cannot abandon a field or message once it has been started. Converting
 * first therefore guarantees that a bad value never leaves the consumer with a dangling open
 * field. Instead, the offending record is reported once as dirty data and nothing is written
 * for it.
 *
 * @param values the record to write; in the default path a {@code null} record, or one whose
 *     column count differs from the schema's, is skipped
 */
public void write(Record values) {
    if (dataxParquetMode.equalsIgnoreCase("fields")) {
        writeBaseOnFields(values);
        return;
    }
    // NOTE(review): records whose column count does not match the schema are dropped without
    // being reported as dirty data; kept as-is for backward compatibility.
    if (values == null || columns == null || values.getColumnNumber() != columns.size()) {
        return;
    }

    // Phase 1: convert every column. A null slot means "omit the field", which is how Parquet
    // represents null (empty fields are illegal; the field must be left out completely).
    Object[] converted = new Object[columns.size()];
    for (int i = 0; i < columns.size(); i++) {
        Column value = values.getColumn(i);
        ColumnDescriptor columnDescriptor = columns.get(i);
        Type type = this.schema.getFields().get(i);
        if (value == null || value.getRawData() == null) {
            continue;
        }
        try {
            converted[i] = this.useRawDataTransf
                    ? convertByRawData(value, columnDescriptor, type)
                    : convertByColumnType(value, columnDescriptor, type);
            if (converted[i] == null) {
                // A null here would silently omit the field; treat it as dirty data instead.
                throw new IllegalStateException("converted parquet value is null for column " + i);
            }
        } catch (Exception e) {
            if (printStackTrace) {
                printStackTrace = false;
                LOGGER.warn("write to parquet error: {}", e.getMessage(), e);
            }
            // Dirty data: report the whole record once and write nothing for it.
            if (null != this.taskPluginCollector) {
                // taskPluginCollector is null in the job-level post merge.
                this.taskPluginCollector.collectDirtyRecord(values, e, e.getMessage());
            }
            return;
        }
    }

    // Phase 2: every value converted successfully, so emit the whole message.
    recordConsumer.startMessage();
    for (int i = 0; i < converted.length; i++) {
        if (converted[i] == null) {
            continue;
        }
        String fieldName = columns.get(i).getPath()[0];
        recordConsumer.startField(fieldName, i);
        emitConvertedValue(converted[i]);
        recordConsumer.endField(fieldName, i);
    }
    recordConsumer.endMessage();
}

/**
 * Converts a column with the legacy raw-data strategy: the raw data is rendered with
 * {@code toString()} and parsed according to the Parquet physical type.
 *
 * <p>NOTE: this serializes DataX's internal representation of the value and is not a correct
 * type conversion in general. It is kept behind the {@code useRawDataTransf} switch because
 * existing Parquet users rely on it.
 *
 * @return a boxed primitive or a {@code Binary}, ready for {@link #emitConvertedValue(Object)}
 */
private Object convertByRawData(Column value, ColumnDescriptor columnDescriptor, Type type) {
    String rawData = value.getRawData().toString();
    switch (columnDescriptor.getType()) {
        case BOOLEAN:
            return Boolean.parseBoolean(rawData);
        case FLOAT:
            return Float.parseFloat(rawData);
        case DOUBLE:
            return Double.parseDouble(rawData);
        case INT32:
            if (isParquetDateType(type)) {
                // DATE is stored as days since epoch; raw data is interpreted as epoch millis.
                return (int) new java.sql.Date(Long.parseLong(rawData)).toLocalDate().toEpochDay();
            }
            return Integer.parseInt(rawData);
        case INT64:
            return Long.parseLong(rawData);
        case INT96:
            return timestampColToBinary(value);
        case BINARY:
            return Binary.fromString(rawData);
        case FIXED_LEN_BYTE_ARRAY: {
            PrimitiveType primitiveType = type.asPrimitiveType();
            if (primitiveType.getDecimalMetadata() != null) {
                return decimalToBinary(value, primitiveType.getDecimalMetadata().getPrecision(),
                        primitiveType.getDecimalMetadata().getScale());
            }
            return Binary.fromString(rawData);
        }
        default:
            return Binary.fromString(rawData);
    }
}

/**
 * Converts a column using DataX's typed accessors ({@code asBoolean}, {@code asLong}, ...)
 * according to the Parquet physical type.
 *
 * @return a boxed primitive or a {@code Binary}, ready for {@link #emitConvertedValue(Object)}
 */
private Object convertByColumnType(Column value, ColumnDescriptor columnDescriptor, Type type) {
    switch (columnDescriptor.getType()) {
        case BOOLEAN:
            // Explicit unboxing: a null accessor result must fail here (as dirty data).
            return value.asBoolean().booleanValue();
        case FLOAT:
            return value.asDouble().floatValue();
        case DOUBLE:
            return value.asDouble().doubleValue();
        case INT32:
            if (isParquetDateType(type)) {
                // DATE is stored as days since epoch; asLong() is interpreted as epoch millis.
                return (int) new java.sql.Date(value.asLong()).toLocalDate().toEpochDay();
            }
            return value.asLong().intValue();
        case INT64:
            return value.asLong().longValue();
        case INT96:
            return timestampColToBinary(value);
        case BINARY: {
            String text = (Column.Type.DATE == value.getType() && null != this.dateParse)
                    ? dateParse.format(value.asDate())
                    : value.asString();
            return Binary.fromString(text);
        }
        case FIXED_LEN_BYTE_ARRAY: {
            PrimitiveType primitiveType = type.asPrimitiveType();
            if (primitiveType.getDecimalMetadata() != null) {
                return decimalToBinary(value, primitiveType.getDecimalMetadata().getPrecision(),
                        primitiveType.getDecimalMetadata().getScale());
            }
            return Binary.fromString(value.asString());
        }
        default:
            return Binary.fromString(value.asString());
    }
}

/** Returns true when the Parquet field is annotated with the DATE original type. */
private static boolean isParquetDateType(Type type) {
    OriginalType originalType = type.getOriginalType();
    return originalType != null && StringUtils.equalsIgnoreCase("DATE", originalType.name());
}

/**
 * Hands an already-converted value to the record consumer; must be called between
 * {@code startField} and {@code endField}.
 */
private void emitConvertedValue(Object converted) {
    if (converted instanceof Binary) {
        recordConsumer.addBinary((Binary) converted);
    } else if (converted instanceof Integer) {
        recordConsumer.addInteger((Integer) converted);
    } else if (converted instanceof Long) {
        recordConsumer.addLong((Long) converted);
    } else if (converted instanceof Double) {
        recordConsumer.addDouble((Double) converted);
    } else if (converted instanceof Float) {
        recordConsumer.addFloat((Float) converted);
    } else if (converted instanceof Boolean) {
        recordConsumer.addBoolean((Boolean) converted);
    } else {
        throw new IllegalStateException(
                "unsupported converted parquet value type: " + converted.getClass().getName());
    }
}