in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [436:556]
private Record transportOneRecord(List<ColumnEntry> columnConfigs, List<Object> recordFields
, RecordSender recordSender, TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat, ArrayList<Column> hiveParitionColumns) {
Record record = recordSender.createRecord();
Column columnGenerated;
try {
if (isReadAllColumns) {
// 读取所有列,创建都为String类型的column
for (Object recordField : recordFields) {
String columnValue = null;
if (recordField != null) {
columnValue = recordField.toString();
}
columnGenerated = new StringColumn(columnValue);
record.addColumn(columnGenerated);
}
} else {
for (ColumnEntry columnConfig : columnConfigs) {
String columnType = columnConfig.getType();
Integer columnIndex = columnConfig.getIndex();
String columnConst = columnConfig.getValue();
String columnValue = null;
if (null != columnIndex) {
if (null != recordFields.get(columnIndex))
columnValue = recordFields.get(columnIndex).toString();
} else {
columnValue = columnConst;
}
Type type = Type.valueOf(columnType.toUpperCase());
// it's all ok if nullFormat is null
if (StringUtils.equals(columnValue, nullFormat)) {
columnValue = null;
}
switch (type) {
case STRING:
columnGenerated = new StringColumn(columnValue);
break;
case LONG:
try {
columnGenerated = new LongColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"LONG"));
}
break;
case DOUBLE:
try {
columnGenerated = new DoubleColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"DOUBLE"));
}
break;
case BOOLEAN:
try {
columnGenerated = new BoolColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"BOOLEAN"));
}
break;
case DATE:
try {
if (columnValue == null) {
columnGenerated = new DateColumn((Date) null);
} else {
String formatString = columnConfig.getFormat();
if (StringUtils.isNotBlank(formatString)) {
// 用户自己配置的格式转换
SimpleDateFormat format = new SimpleDateFormat(
formatString);
columnGenerated = new DateColumn(
format.parse(columnValue));
} else {
// 框架尝试转换
columnGenerated = new DateColumn(
new StringColumn(columnValue)
.asDate());
}
}
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"DATE"));
}
break;
default:
String errorMessage = String.format(
"您配置的列类型暂不支持 : [%s]", columnType);
LOG.error(errorMessage);
throw DataXException
.asDataXException(
UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE,
errorMessage);
}
record.addColumn(columnGenerated);
}
}
recordSender.sendToWriter(record);
} catch (IllegalArgumentException iae) {
taskPluginCollector
.collectDirtyRecord(record, iae.getMessage());
} catch (IndexOutOfBoundsException ioe) {
taskPluginCollector
.collectDirtyRecord(record, ioe.getMessage());
} catch (Exception e) {
if (e instanceof DataXException) {
throw (DataXException) e;
}
// 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
return record;
}