in plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java [340:489]
/**
 * Converts one parsed source line into a {@link Record} and sends it to the writer.
 *
 * <p>When {@code columnConfigs} is null or empty, every element of
 * {@code sourceLine} becomes a {@code StringColumn}. Otherwise one column is
 * produced per {@code ColumnEntry}: the raw value is taken either from
 * {@code sourceLine[index]} or from the configured constant {@code value}, and is
 * then converted to the configured type. A raw value that equals
 * {@code nullFormat} (case-sensitive) is treated as null, and so is a null
 * element of {@code sourceLine}.
 *
 * <p>In the configured-column path, conversion failures and out-of-range indexes
 * are not propagated: the record (holding the columns converted before the
 * failure) is passed to {@code taskPluginCollector.collectDirtyRecord} and is not
 * sent to the writer.
 *
 * @param recordSender        creates the record and sends it to the writer
 * @param columnConfigs       per-column configuration; may be null or empty
 * @param sourceLine          the fields of the current line
 * @param nullFormat          textual representation of null in the source; may be null
 * @param taskPluginCollector receives records that could not be fully converted
 * @return the record that was built, whether it was sent or reported as dirty
 * @throws DataXException if a column configures neither or both of index and
 *                        value, or uses a type this method does not handle
 */
public static Record transportOneRecord(RecordSender recordSender,
        List<ColumnEntry> columnConfigs, String[] sourceLine,
        String nullFormat, TaskPluginCollector taskPluginCollector) {
    Record record = recordSender.createRecord();
    Column columnGenerated = null;
    // No column configuration: build a record whose columns are all Strings.
    if (null == columnConfigs || columnConfigs.size() == 0) {
        for (String columnValue : sourceLine) {
            // Case-sensitive comparison (not equalsIgnoreCase). Safe when
            // nullFormat is null; a null field is mapped to a null column
            // instead of throwing NullPointerException.
            if (null == columnValue || columnValue.equals(nullFormat)) {
                columnGenerated = new StringColumn(null);
            } else {
                columnGenerated = new StringColumn(columnValue);
            }
            record.addColumn(columnGenerated);
        }
        recordSender.sendToWriter(record);
    } else {
        try {
            for (ColumnEntry columnConfig : columnConfigs) {
                String columnType = columnConfig.getType();
                Integer columnIndex = columnConfig.getIndex();
                String columnConst = columnConfig.getValue();
                String columnValue = null;

                // Exactly one of index / value must be configured per column.
                if (null == columnIndex && null == columnConst) {
                    throw DataXException.asDataXException(
                            UnstructuredStorageReaderErrorCode.NO_INDEX_VALUE,
                            "由于您配置了type, 则至少需要配置 index 或 value");
                }
                if (null != columnIndex && null != columnConst) {
                    throw DataXException.asDataXException(
                            UnstructuredStorageReaderErrorCode.MIXED_INDEX_VALUE,
                            "您混合配置了index, value, 每一列同时仅能选择其中一种");
                }

                if (null != columnIndex) {
                    // Reject negative indexes too, so they get the same
                    // descriptive message as indexes past the end of the line.
                    if (columnIndex < 0 || columnIndex >= sourceLine.length) {
                        String message = String.format(
                                "您尝试读取的列越界,源文件该行有 [%s] 列,您尝试读取第 [%s] 列, 数据详情[%s]",
                                sourceLine.length, columnIndex + 1,
                                StringUtils.join(sourceLine, ","));
                        LOG.warn(message);
                        throw new IndexOutOfBoundsException(message);
                    }
                    columnValue = sourceLine[columnIndex];
                } else {
                    columnValue = columnConst;
                }

                // Fixed locale: the default-locale toUpperCase() can produce
                // characters (e.g. dotted capital I under a Turkish locale)
                // that never match an enum constant name.
                Type type = Type.valueOf(
                        columnType.toUpperCase(java.util.Locale.ENGLISH));

                // Safe when nullFormat is null; a null source field stays null.
                if (null != columnValue && columnValue.equals(nullFormat)) {
                    columnValue = null;
                }

                switch (type) {
                case STRING:
                    columnGenerated = new StringColumn(columnValue);
                    break;
                case LONG:
                    try {
                        columnGenerated = new LongColumn(columnValue);
                    } catch (Exception e) {
                        throw conversionFailure(columnValue, "LONG", e);
                    }
                    break;
                case DOUBLE:
                    try {
                        columnGenerated = new DoubleColumn(columnValue);
                    } catch (Exception e) {
                        throw conversionFailure(columnValue, "DOUBLE", e);
                    }
                    break;
                case BOOLEAN:
                    try {
                        columnGenerated = new BoolColumn(columnValue);
                    } catch (Exception e) {
                        throw conversionFailure(columnValue, "BOOLEAN", e);
                    }
                    break;
                case DATE:
                    try {
                        if (columnValue == null) {
                            // Typed null selects the Date-taking constructor.
                            Date date = null;
                            columnGenerated = new DateColumn(date);
                        } else {
                            String formatString = columnConfig.getFormat();
                            if (StringUtils.isNotBlank(formatString)) {
                                // User-configured format: parse with it, so
                                // which values count as dirty data follows
                                // that format.
                                DateFormat format = columnConfig.getDateFormat();
                                columnGenerated = new DateColumn(
                                        format.parse(columnValue));
                            } else {
                                // No format configured: let the framework
                                // attempt the String -> Date conversion.
                                columnGenerated = new DateColumn(
                                        new StringColumn(columnValue).asDate());
                            }
                        }
                    } catch (Exception e) {
                        throw conversionFailure(columnValue, "DATE", e);
                    }
                    break;
                default:
                    String errorMessage = String.format(
                            "您配置的列类型暂不支持 : [%s]", columnType);
                    LOG.error(errorMessage);
                    throw DataXException.asDataXException(
                            UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE,
                            errorMessage);
                }
                record.addColumn(columnGenerated);
            }
            recordSender.sendToWriter(record);
        } catch (IllegalArgumentException iae) {
            taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
        } catch (IndexOutOfBoundsException ioe) {
            taskPluginCollector.collectDirtyRecord(record, ioe.getMessage());
        } catch (Exception e) {
            // Configuration errors must reach the caller unchanged.
            if (e instanceof DataXException) {
                throw (DataXException) e;
            }
            // Every other conversion failure (number format, date format, ...)
            // is handled as dirty data.
            taskPluginCollector.collectDirtyRecord(record, e.getMessage());
        }
    }
    return record;
}

/**
 * Builds the exception used to report a failed type conversion, keeping the
 * original failure as the cause.
 *
 * @param rawValue the value that could not be converted; may be null
 * @param typeName the name of the target type, as shown in the message
 * @param cause    the exception raised by the conversion
 * @return an {@code IllegalArgumentException} carrying the conversion message
 */
private static IllegalArgumentException conversionFailure(String rawValue,
        String typeName, Exception cause) {
    return new IllegalArgumentException(String.format(
            "类型转换错误, 无法将[%s] 转换为[%s]", rawValue, typeName), cause);
}