in odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/ReaderProxy.java [243:456]
private void odpsColumnToDataXField(Record odpsRecord,
com.alibaba.datax.common.element.Record dataXRecord, TypeInfo typeInfo,
String columnNameValue, boolean isPartitionColumn) {
ArrayRecord record = (ArrayRecord) odpsRecord;
OdpsType type = typeInfo.getOdpsType();
switch (type) {
case BIGINT: {
if (isPartitionColumn) {
dataXRecord.addColumn(new LongColumn(columnNameValue));
} else {
dataXRecord.addColumn(new LongColumn(record
.getBigint(columnNameValue)));
}
break;
}
case BOOLEAN: {
if (isPartitionColumn) {
dataXRecord.addColumn(new BoolColumn(columnNameValue));
} else {
dataXRecord.addColumn(new BoolColumn(record
.getBoolean(columnNameValue)));
}
break;
}
case DATE:
case DATETIME: {
// odps分区列,目前支持TINYINT、SMALLINT、INT、BIGINT、VARCHAR和STRING类型
if (isPartitionColumn) {
try {
dataXRecord.addColumn(new DateColumn(ColumnCast
.string2Date(new StringColumn(columnNameValue))));
} catch (ParseException e) {
String errMessage = MESSAGE_SOURCE.message("readerproxy.4",
this.partition, columnNameValue);
LOG.error(errMessage);
throw DataXException.asDataXException(
OdpsReaderErrorCode.READ_DATA_FAIL, errMessage, e);
}
} else {
if (com.aliyun.odps.OdpsType.DATETIME == type) {
dataXRecord.addColumn(new DateColumn(record
.getDatetime(columnNameValue)));
} else {
if (this.useDateWithCalendar) {
dataXRecord.addColumn(new DateColumn(record.
getDate(columnNameValue, this.calendarForDate)));
} else {
dataXRecord.addColumn(new DateColumn(record
.getDate(columnNameValue)));
}
}
}
break;
}
case DOUBLE: {
if (isPartitionColumn) {
dataXRecord.addColumn(new DoubleColumn(columnNameValue));
} else {
dataXRecord.addColumn(new DoubleColumn(record
.getDouble(columnNameValue)));
}
break;
}
case DECIMAL: {
if(isPartitionColumn) {
dataXRecord.addColumn(new DoubleColumn(columnNameValue));
} else {
dataXRecord.addColumn(new DoubleColumn(record.getDecimal(columnNameValue)));
}
break;
}
case STRING: {
if (isPartitionColumn) {
dataXRecord.addColumn(new StringColumn(columnNameValue));
} else {
dataXRecord.addColumn(new StringColumn(record
.getString(columnNameValue)));
}
break;
}
case TINYINT:
if (isPartitionColumn) {
dataXRecord.addColumn(new LongColumn(columnNameValue));
} else {
Byte value = record.getTinyint(columnNameValue);
Integer intValue = value != null ? value.intValue() : null;
dataXRecord.addColumn(new LongColumn(intValue));
}
break;
case SMALLINT: {
if (isPartitionColumn) {
dataXRecord.addColumn(new LongColumn(columnNameValue));
} else {
Short value = record.getSmallint(columnNameValue);
Long valueInLong = null;
if (null != value) {
valueInLong = value.longValue();
}
dataXRecord.addColumn(new LongColumn(valueInLong));
}
break;
}
case INT: {
if (isPartitionColumn) {
dataXRecord.addColumn(new LongColumn(columnNameValue));
} else {
dataXRecord.addColumn(new LongColumn(record
.getInt(columnNameValue)));
}
break;
}
case FLOAT: {
if (isPartitionColumn) {
dataXRecord.addColumn(new DoubleColumn(columnNameValue));
} else {
dataXRecord.addColumn(new DoubleColumn(record
.getFloat(columnNameValue)));
}
break;
}
case VARCHAR: {
if (isPartitionColumn) {
dataXRecord.addColumn(new StringColumn(columnNameValue));
} else {
Varchar value = record.getVarchar(columnNameValue);
String columnValue = value != null ? value.getValue() : null;
dataXRecord.addColumn(new StringColumn(columnValue));
}
break;
}
case TIMESTAMP: {
if (isPartitionColumn) {
try {
dataXRecord.addColumn(new DateColumn(ColumnCast
.string2Date(new StringColumn(columnNameValue))));
} catch (ParseException e) {
String errMessage = MESSAGE_SOURCE.message("readerproxy.4",
this.partition, columnNameValue);
LOG.error(errMessage);
throw DataXException.asDataXException(
OdpsReaderErrorCode.READ_DATA_FAIL, errMessage, e);
}
} else {
dataXRecord.addColumn(new DateColumn(record
.getTimestamp(columnNameValue)));
}
break;
}
case BINARY: {
if (isPartitionColumn) {
dataXRecord.addColumn(new BytesColumn(columnNameValue.getBytes()));
} else {
// dataXRecord.addColumn(new BytesColumn(record
// .getBinary(columnNameValue).data()));
Binary binaryData = record.getBinary(columnNameValue);
if (null == binaryData) {
dataXRecord.addColumn(new BytesColumn(null));
} else {
dataXRecord.addColumn(new BytesColumn(binaryData.data()));
}
}
break;
}
case ARRAY: {
if (isPartitionColumn) {
dataXRecord.addColumn(new StringColumn(columnNameValue));
} else {
List arrayValue = record.getArray(columnNameValue);
if (arrayValue == null) {
dataXRecord.addColumn(new StringColumn(null));
} else {
dataXRecord.addColumn(new StringColumn(JSON.toJSONString(transOdpsArrayToJavaList(arrayValue, (ArrayTypeInfo)typeInfo))));
}
}
break;
}
case MAP: {
if (isPartitionColumn) {
dataXRecord.addColumn(new StringColumn(columnNameValue));
} else {
Map mapValue = record.getMap(columnNameValue);
if (mapValue == null) {
dataXRecord.addColumn(new StringColumn(null));
} else {
dataXRecord.addColumn(new StringColumn(JSON.toJSONString(transOdpsMapToJavaMap(mapValue, (MapTypeInfo)typeInfo))));
}
}
break;
}
case STRUCT: {
if (isPartitionColumn) {
dataXRecord.addColumn(new StringColumn(columnNameValue));
} else {
Struct structValue = record.getStruct(columnNameValue);
if (structValue == null) {
dataXRecord.addColumn(new StringColumn(null));
} else {
dataXRecord.addColumn(new StringColumn(JSON.toJSONString(transOdpsStructToJavaMap(structValue))));
}
}
break;
}
default:
throw DataXException.asDataXException(
OdpsReaderErrorCode.ILLEGAL_VALUE,
MESSAGE_SOURCE.message("readerproxy.5", type));
}
}