in odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriterProxy.java [250:436]
/**
 * Converts a DataX record into an ODPS record created from {@code slaveUpload}.
 *
 * <p>Each source column is written to the ODPS column index given by
 * {@code columnPositions}; positions mapped to {@code -1} are skipped. Columns whose
 * value object is {@code null}, and (when {@code emptyAsNull} is set) empty
 * {@code StringColumn}s, are left unset on the ODPS record.
 *
 * <p>If the source record has fewer columns than configured, a warning is logged while
 * {@code printColumnLess} is still {@code true}, and the flag is then cleared.
 *
 * @param dataXRecord the DataX record to convert
 * @return the populated ODPS record, or {@code null} if a column could not be converted;
 *         in that case the record has been handed to {@code taskPluginCollector} as a
 *         dirty record
 * @throws Exception a {@code DataXException} with {@code ILLEGAL_VALUE} is thrown when
 *         the source record has more columns than were configured; failures raised while
 *         converting individual columns are caught and reported as dirty records instead
 */
public Record dataxRecordToOdpsRecord(com.alibaba.datax.common.element.Record dataXRecord) throws Exception {
    int sourceColumnCount = dataXRecord.getColumnNumber();
    ArrayRecord odpsRecord = (ArrayRecord) slaveUpload.newRecord();
    int userConfiguredColumnNumber = this.columnPositions.size();
    if (sourceColumnCount > userConfiguredColumnNumber) {
        throw DataXException.asDataXException(OdpsWriterErrorCode.ILLEGAL_VALUE,
                MESSAGE_SOURCE.message("odpswriterproxy.1", sourceColumnCount, userConfiguredColumnNumber));
    } else if (sourceColumnCount < userConfiguredColumnNumber) {
        if (printColumnLess) {
            LOG.warn(MESSAGE_SOURCE.message("odpswriterproxy.2", sourceColumnCount, userConfiguredColumnNumber));
        }
        // Only warn about the column shortfall once.
        printColumnLess = false;
    }

    // Both indexes are declared outside the try block so the catch block can report
    // which column was being processed when the failure happened.
    int currentIndex = 0;
    int sourceIndex = 0;
    try {
        com.alibaba.datax.common.element.Column columnValue;
        for (; sourceIndex < sourceColumnCount; sourceIndex++) {
            // skip partition columns
            if (this.columnPositions.get(sourceIndex) == -1) {
                continue;
            }
            currentIndex = columnPositions.get(sourceIndex);
            TypeInfo typeInfo = this.tableOriginalColumnTypeList.get(currentIndex);
            OdpsType type = typeInfo.getOdpsType();
            columnValue = dataXRecord.getColumn(sourceIndex);
            if (columnValue == null) {
                continue;
            }
            // for compatible dt lib, "" as null
            if (this.emptyAsNull && columnValue instanceof StringColumn && "".equals(columnValue.asString())) {
                continue;
            }
            switch (type) {
                case STRING:
                    String newValue = (String) OdpsUtil.processOverLengthData(columnValue.asString(),
                            OdpsType.STRING, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    odpsRecord.setString(currentIndex, newValue);
                    break;
                case BIGINT:
                    odpsRecord.setBigint(currentIndex, columnValue.asLong());
                    break;
                case BOOLEAN:
                    odpsRecord.setBoolean(currentIndex, columnValue.asBoolean());
                    break;
                case DATETIME:
                    odpsRecord.setDatetime(currentIndex, columnValue.asDate());
                    break;
                case DATE:
                    // NOTE(review): the null and non-calendar paths call setDatetime on a DATE
                    // column; kept unchanged here - confirm this is intended for the SDK in use.
                    Date dateData = columnValue.asDate();
                    if (null == dateData) {
                        odpsRecord.setDatetime(currentIndex, null);
                    } else {
                        if (this.useDateWithCalendar) {
                            odpsRecord.setDate(currentIndex, new java.sql.Date(dateData.getTime()), this.calendarForDate);
                        } else {
                            odpsRecord.setDatetime(currentIndex, new java.sql.Date(dateData.getTime()));
                        }
                    }
                    break;
                case DOUBLE:
                    odpsRecord.setDouble(currentIndex, columnValue.asDouble());
                    break;
                case FLOAT:
                    Double floatValue = columnValue.asDouble();
                    if (null == floatValue) {
                        odpsRecord.setFloat(currentIndex, null);
                    } else {
                        odpsRecord.setFloat(currentIndex, floatValue.floatValue());
                    }
                    break;
                case DECIMAL:
                    odpsRecord.setDecimal(currentIndex, columnValue.asBigDecimal());
                    // Reject values whose decimal point sits at index 36 or later in the string form.
                    String columnStr = columnValue.asString();
                    if (columnStr != null && columnStr.indexOf(".") >= 36) {
                        throw new Exception(MESSAGE_SOURCE.message("odpswriterproxy.3"));
                    }
                    break;
                case TINYINT:
                    Long tinyintValueStr = columnValue.asLong();
                    if (null == tinyintValueStr) {
                        odpsRecord.setTinyint(currentIndex, null);
                    } else {
                        // Byte.valueOf(String) throws NumberFormatException when out of range.
                        odpsRecord.setTinyint(currentIndex,
                                Byte.valueOf(String.valueOf(tinyintValueStr)));
                    }
                    break;
                case SMALLINT:
                    Long smallIntValue = columnValue.asLong();
                    if (null == smallIntValue) {
                        odpsRecord.setSmallint(currentIndex, null);
                    } else {
                        // Long.shortValue() would silently wrap; fail instead so the record is
                        // reported as dirty, consistent with the TINYINT branch.
                        if (smallIntValue < Short.MIN_VALUE || smallIntValue > Short.MAX_VALUE) {
                            throw new NumberFormatException("Value out of range for SMALLINT: " + smallIntValue);
                        }
                        odpsRecord.setSmallint(currentIndex, smallIntValue.shortValue());
                    }
                    break;
                case INT:
                    Long intValue = columnValue.asLong();
                    if (null == intValue) {
                        odpsRecord.setInt(currentIndex, null);
                    } else {
                        // Long.intValue() would silently wrap; fail instead so the record is
                        // reported as dirty, consistent with the TINYINT branch.
                        if (intValue < Integer.MIN_VALUE || intValue > Integer.MAX_VALUE) {
                            throw new NumberFormatException("Value out of range for INT: " + intValue);
                        }
                        odpsRecord.setInt(currentIndex, intValue.intValue());
                    }
                    break;
                case VARCHAR:
                    // warn: the ODPS SDK has a bug when columnValue.asString() is null;
                    // do not use Varchar's default constructor, otherwise an NPE occurs
                    String varcharValueStr = columnValue.asString();
                    Varchar varcharData = null;
                    if (varcharValueStr != null) {
                        varcharData = new Varchar(varcharValueStr);
                    }
                    odpsRecord.setVarchar(currentIndex, varcharData);
                    break;
                case CHAR:
                    String charValueStr = columnValue.asString();
                    Char charData = null;
                    if (charValueStr != null) {
                        charData = new Char(charValueStr);
                    }
                    odpsRecord.setChar(currentIndex, charData);
                    break;
                case TIMESTAMP:
                    Date timestampData = columnValue.asDate();
                    if (null == timestampData) {
                        odpsRecord.setTimestamp(currentIndex, null);
                    } else {
                        Timestamp timestampDataForOdps = new Timestamp(timestampData.getTime());
                        if (timestampData instanceof java.sql.Timestamp) {
                            // carry over the nanoseconds
                            timestampDataForOdps.setNanos(((java.sql.Timestamp) timestampData).getNanos());
                        }
                        // possible optimization: if the source value is already a Timestamp it
                        // could be used directly, avoiding one object creation
                        odpsRecord.setTimestamp(currentIndex, timestampDataForOdps);
                    }
                    break;
                case BINARY:
                    // Read the bytes once and check for null before wrapping them, so a null
                    // value is written as null rather than passed through over-length handling.
                    byte[] rawBytes = columnValue.asBytes();
                    Binary binaryData = null;
                    if (rawBytes != null) {
                        binaryData = (Binary) OdpsUtil.processOverLengthData(new Binary(rawBytes),
                                OdpsType.BINARY, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    }
                    odpsRecord.setBinary(currentIndex, binaryData);
                    break;
                case ARRAY:
                    JSONArray arrayJson = JSON.parseArray(columnValue.asString());
                    odpsRecord.setArray(currentIndex, parseArray(arrayJson, (ArrayTypeInfo) typeInfo));
                    break;
                case MAP:
                    JSONObject mapJson = JSON.parseObject(columnValue.asString());
                    odpsRecord.setMap(currentIndex, parseMap(mapJson, (MapTypeInfo) typeInfo));
                    break;
                case STRUCT:
                    JSONObject structJson = JSON.parseObject(columnValue.asString());
                    odpsRecord.setStruct(currentIndex,
                            parseStruct(structJson, (StructTypeInfo) typeInfo));
                    break;
                default:
                    // Other ODPS types are left unset.
                    break;
            }
        }
        return odpsRecord;
    } catch (Exception e) {
        // Best-effort lookup of the offending column name for the dirty-record message.
        String dirtyColumnName = "";
        try {
            dirtyColumnName = this.allColumns.get(currentIndex);
        } catch (Exception ignoreEx) {
            // ignore: the name is only used to enrich the message
        }
        String message = MESSAGE_SOURCE.message("odpswriterproxy.4", sourceIndex, dirtyColumnName);
        this.taskPluginCollector.collectDirtyRecord(dataXRecord, e, message);
        return null;
    }
}