in odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriterProxy.java [615:942]
/**
 * Converts a JSON object into a {@link Map} matching the ODPS MAP type described by
 * {@code typeInfo}.
 *
 * <p>The conversion runs in two passes. First every JSON key string is parsed into a Java
 * object according to the map's key type. Then, for each parsed key, the corresponding JSON
 * value is read and converted according to the map's value type. STRUCT, MAP and ARRAY values
 * are delegated to {@code parseStruct}, this method (recursively) and {@code parseArray}.
 *
 * @param json     the JSON object holding the map entries; may be {@code null}
 * @param typeInfo the ODPS map type giving the key and value types
 * @return the converted map, or {@code null} when {@code json} is {@code null}
 * @throws ParseException           propagated from {@code dateFormat.parse} when a DATETIME key
 *                                  or value cannot be parsed
 * @throws IllegalArgumentException when the key type or the value type is not handled by this
 *                                  method (this also covers {@link NumberFormatException}
 *                                  raised while parsing numeric keys)
 */
private Map parseMap(JSONObject json, MapTypeInfo typeInfo) throws ParseException {
    if (json == null) {
        return null;
    }
    // Maps each typed key to the original JSON key string; the string is still needed in the
    // second pass to look the value up in json.
    Map<Object, String> keyMap = new HashMap<Object, String>();
    Set<String> keys = json.keySet();
    // process map key
    switch (typeInfo.getKeyTypeInfo().getOdpsType()) {
    // 8-byte signed integer
    case BIGINT:
        for (String item : keys) {
            keyMap.put(Long.parseLong(item), item);
        }
        break;
    // double-precision floating point
    case DOUBLE:
        for (String item : keys) {
            keyMap.put(Double.parseDouble(item), item);
        }
        break;
    // boolean
    case BOOLEAN:
        for (String item : keys) {
            keyMap.put(Boolean.parseBoolean(item), item);
        }
        break;
    // datetime
    case DATETIME:
        // TODO precision
        for (String item : keys) {
            keyMap.put(dateFormat.parse(item), item);
        }
        break;
    // string
    case STRING:
        for (String item : keys) {
            keyMap.put(item, item);
        }
        break;
    // exact decimal
    case DECIMAL:
        for (String item : keys) {
            keyMap.put(new BigDecimal(item), item);
        }
        break;
    // 1-byte signed integer
    case TINYINT:
        for (String item : keys) {
            keyMap.put(Byte.parseByte(item), item);
        }
        break;
    // 2-byte signed integer
    case SMALLINT:
        for (String item : keys) {
            keyMap.put(Short.parseShort(item), item);
        }
        break;
    // 4-byte signed integer
    case INT:
        for (String item : keys) {
            keyMap.put(Integer.parseInt(item), item);
        }
        break;
    // single-precision floating point
    case FLOAT:
        for (String item : keys) {
            keyMap.put(Float.parseFloat(item), item);
        }
        break;
    // fixed-length string
    case CHAR:
        for (String item : keys) {
            keyMap.put(new Char(item, ((CharTypeInfo) typeInfo.getKeyTypeInfo()).getLength()), item);
        }
        break;
    // variable-length string
    case VARCHAR:
        for (String item : keys) {
            keyMap.put(new Varchar(item, ((VarcharTypeInfo) typeInfo.getKeyTypeInfo()).getLength()), item);
        }
        break;
    // date
    case DATE:
        // TODO string -> date need timezone
        // TODO how to use odps Record
        for (String item : keys) {
            keyMap.put(java.sql.Date.valueOf(item), item);
        }
        break;
    // timestamp
    case TIMESTAMP:
        for (String item : keys) {
            keyMap.put(Timestamp.valueOf(item), item);
        }
        break;
    // byte array, carried as a Base64 string
    case BINARY:
        for (String item : keys) {
            keyMap.put(new Binary(Base64.decodeBase64(item)), item);
        }
        break;
    // day-time interval; the key string is itself a JSON document
    case INTERVAL_DAY_TIME:
        for (String item : keys) {
            JSONObject jsonObject = JSON.parseObject(item);
            // NOTE(review): totalSeconds is read as an Integer, so values outside the int
            // range cannot be carried here - confirm against IntervalDayTime's constructor.
            keyMap.put(new IntervalDayTime(jsonObject.getInteger("totalSeconds"), jsonObject.getInteger("nanos")),
                    item);
        }
        break;
    // year-month interval; the key string is itself a JSON document
    case INTERVAL_YEAR_MONTH:
        for (String item : keys) {
            JSONObject jsonObject = JSON.parseObject(item);
            keyMap.put(new IntervalYearMonth(jsonObject.getInteger("years"), jsonObject.getInteger("months")),
                    item);
        }
        break;
    default:
        // Fail loudly instead of returning an empty map: silently skipping an unsupported key
        // type would drop every entry of the source map. This mirrors the value-type switch.
        throw new IllegalArgumentException("decode record failed. unsupported map key type: "
                + typeInfo.getKeyTypeInfo().getTypeName() + ", column type: " + typeInfo.getTypeName());
    }
    Map<Object, Object> result = new HashMap<Object, Object>();
    // process map value
    switch (typeInfo.getValueTypeInfo().getOdpsType()) {
    // 8-byte signed integer
    case BIGINT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getLong(item.getValue()));
        }
        return result;
    // double-precision floating point
    case DOUBLE:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getDouble(item.getValue()));
        }
        return result;
    // boolean
    case BOOLEAN:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getBoolean(item.getValue()));
        }
        return result;
    // datetime
    case DATETIME:
        // TODO precision
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), dateFormat.parse(json.getString(item.getValue())));
        }
        return result;
    // string
    case STRING:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getString(item.getValue()));
        }
        return result;
    // exact decimal
    case DECIMAL:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getBigDecimal(item.getValue()));
        }
        return result;
    // 1-byte signed integer
    case TINYINT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getByte(item.getValue()));
        }
        return result;
    // 2-byte signed integer
    case SMALLINT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getShort(item.getValue()));
        }
        return result;
    // 4-byte signed integer
    case INT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getInteger(item.getValue()));
        }
        return result;
    // single-precision floating point
    case FLOAT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), json.getFloat(item.getValue()));
        }
        return result;
    // fixed-length string
    case CHAR:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), new Char(json.getString(item.getValue()),
                    ((CharTypeInfo) typeInfo.getValueTypeInfo()).getLength()));
        }
        return result;
    // variable-length string
    case VARCHAR:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), new Varchar(json.getString(item.getValue()),
                    ((VarcharTypeInfo) typeInfo.getValueTypeInfo()).getLength()));
        }
        return result;
    // date
    case DATE:
        // TODO string -> date need timezone
        // TODO how to use odps Record
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), java.sql.Date.valueOf(json.getString(item.getValue())));
        }
        return result;
    // timestamp
    case TIMESTAMP:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), Timestamp.valueOf(json.getString(item.getValue())));
        }
        return result;
    // byte array, carried as a Base64 string
    case BINARY:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(), new Binary(Base64.decodeBase64(json.getString(item.getValue()))));
        }
        return result;
    // day-time interval, carried as a nested JSON object
    case INTERVAL_DAY_TIME:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            JSONObject jsonObject = json.getJSONObject(item.getValue());
            // NOTE(review): same int-range caveat for totalSeconds as on the key side.
            result.put(item.getKey(),
                    new IntervalDayTime(jsonObject.getInteger("totalSeconds"), jsonObject.getInteger("nanos")));
        }
        return result;
    // year-month interval, carried as a nested JSON object
    case INTERVAL_YEAR_MONTH:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            JSONObject jsonObject = json.getJSONObject(item.getValue());
            result.put(item.getKey(),
                    new IntervalYearMonth(jsonObject.getInteger("years"), jsonObject.getInteger("months")));
        }
        return result;
    // struct
    case STRUCT:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(),
                    parseStruct(json.getJSONObject(item.getValue()), (StructTypeInfo) typeInfo.getValueTypeInfo()));
        }
        return result;
    // nested map, handled recursively
    case MAP:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(),
                    parseMap(json.getJSONObject(item.getValue()), (MapTypeInfo) typeInfo.getValueTypeInfo()));
        }
        return result;
    // array
    case ARRAY:
        for (Map.Entry<Object, String> item : keyMap.entrySet()) {
            result.put(item.getKey(),
                    parseArray(json.getJSONArray(item.getValue()), (ArrayTypeInfo) typeInfo.getValueTypeInfo()));
        }
        return result;
    default:
        throw new IllegalArgumentException("decode record failed. column type: " + typeInfo.getTypeName());
    }
}