in cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderHelper.java [263:414]
/**
 * Converts the first {@code columnNumber} columns of a Cassandra row into DataX columns
 * and appends them, in order, to {@code record}.
 *
 * <p>A null cell is always emitted as an empty {@link StringColumn}, whatever the CQL type.
 * Text-like types map to {@link StringColumn}, integral types to {@link LongColumn},
 * floating/decimal types to {@link DoubleColumn}, DATE/TIMESTAMP to {@link DateColumn},
 * BLOB to {@link BytesColumn}. LIST/MAP/SET/TUPLE/UDT values are rendered to a string via
 * {@code toJSonString} and emitted as a {@link StringColumn}.
 *
 * @param record              the record to append columns to
 * @param rs                  the Cassandra row being read
 * @param metaData            column definitions describing {@code rs}
 * @param columnNumber        number of leading columns to convert
 * @param taskPluginCollector collector that receives the record if conversion fails
 * @return {@code record} on success; {@code null} if a non-{@code DataXException} failure
 *         occurred (the record is reported as dirty in that case)
 * @throws DataXException if a column has a type this reader does not support; note the
 *         record is reported to {@code taskPluginCollector} before the exception propagates
 */
static Record buildRecord(Record record, Row rs, ColumnDefinitions metaData, int columnNumber,
                          TaskPluginCollector taskPluginCollector) {
    try {
        for (int i = 0; i < columnNumber; i++) {
            try {
                if (rs.isNull(i)) {
                    record.addColumn(new StringColumn());
                    continue;
                }
                switch (metaData.getType(i).getName()) {
                    case ASCII:
                    case TEXT:
                    case VARCHAR:
                        record.addColumn(new StringColumn(rs.getString(i)));
                        break;
                    case BLOB: {
                        // ByteBuffer.array() exposes the whole backing array (ignoring
                        // position/limit/arrayOffset) and fails on read-only or direct
                        // buffers, so copy exactly the readable bytes instead. Work on a
                        // duplicate so the buffer returned by the driver is not consumed.
                        java.nio.ByteBuffer blob = rs.getBytes(i).duplicate();
                        byte[] bytes = new byte[blob.remaining()];
                        blob.get(bytes);
                        record.addColumn(new BytesColumn(bytes));
                    }
                    break;
                    case BOOLEAN:
                        record.addColumn(new BoolColumn(rs.getBool(i)));
                        break;
                    case SMALLINT:
                        // The int cast selects the Integer overload of LongColumn.
                        record.addColumn(new LongColumn((int) rs.getShort(i)));
                        break;
                    case TINYINT:
                        record.addColumn(new LongColumn((int) rs.getByte(i)));
                        break;
                    case INT:
                        record.addColumn(new LongColumn(rs.getInt(i)));
                        break;
                    case COUNTER:
                    case BIGINT:
                        record.addColumn(new LongColumn(rs.getLong(i)));
                        break;
                    case VARINT:
                        record.addColumn(new LongColumn(rs.getVarint(i)));
                        break;
                    case FLOAT:
                        record.addColumn(new DoubleColumn(rs.getFloat(i)));
                        break;
                    case DOUBLE:
                        record.addColumn(new DoubleColumn(rs.getDouble(i)));
                        break;
                    case DECIMAL:
                        record.addColumn(new DoubleColumn(rs.getDecimal(i)));
                        break;
                    case DATE:
                        record.addColumn(new DateColumn(rs.getDate(i).getMillisSinceEpoch()));
                        break;
                    case TIME:
                        record.addColumn(new LongColumn(rs.getTime(i)));
                        break;
                    case TIMESTAMP:
                        record.addColumn(new DateColumn(rs.getTimestamp(i)));
                        break;
                    case UUID:
                    case TIMEUUID:
                        record.addColumn(new StringColumn(rs.getUUID(i).toString()));
                        break;
                    case INET:
                        record.addColumn(new StringColumn(rs.getInet(i).getHostAddress()));
                        break;
                    case DURATION:
                        record.addColumn(new StringColumn(rs.get(i, Duration.class).toString()));
                        break;
                    case LIST: {
                        TypeToken<?> listEltClass =
                                registry.codecFor(metaData.getType(i).getTypeArguments().get(0)).getJavaType();
                        List<?> l = rs.getList(i, listEltClass);
                        record.addColumn(new StringColumn(toJSonString(l, metaData.getType(i))));
                    }
                    break;
                    case MAP: {
                        DataType keyType = metaData.getType(i).getTypeArguments().get(0);
                        DataType valType = metaData.getType(i).getTypeArguments().get(1);
                        TypeToken<?> keyEltClass = registry.codecFor(keyType).getJavaType();
                        TypeToken<?> valEltClass = registry.codecFor(valType).getJavaType();
                        Map<?, ?> m = rs.getMap(i, keyEltClass, valEltClass);
                        record.addColumn(new StringColumn(toJSonString(m, metaData.getType(i))));
                    }
                    break;
                    case SET: {
                        TypeToken<?> setEltClass =
                                registry.codecFor(metaData.getType(i).getTypeArguments().get(0)).getJavaType();
                        Set<?> set = rs.getSet(i, setEltClass);
                        record.addColumn(new StringColumn(toJSonString(set, metaData.getType(i))));
                    }
                    break;
                    case TUPLE: {
                        TupleValue tuple = rs.getTupleValue(i);
                        record.addColumn(new StringColumn(toJSonString(tuple, metaData.getType(i))));
                    }
                    break;
                    case UDT: {
                        UDTValue udt = rs.getUDTValue(i);
                        record.addColumn(new StringColumn(toJSonString(udt, metaData.getType(i))));
                    }
                    break;
                    default:
                        throw unsupportedColumnType(metaData, i);
                }
            } catch (TypeNotSupported unsupported) {
                // Raised while rendering a nested value; reported the same way as an
                // unsupported top-level column type.
                throw unsupportedColumnType(metaData, i);
            }
        }
    } catch (Exception e) {
        // TODO: is it reliable to classify this as dirty data?
        taskPluginCollector.collectDirtyRecord(record, e);
        if (e instanceof DataXException) {
            throw (DataXException) e;
        }
        return null;
    }
    return record;
}

/** Builds the configuration error reported when column {@code index} has an unsupported type. */
private static DataXException unsupportedColumnType(ColumnDefinitions metaData, int index) {
    return DataXException
            .asDataXException(
                    CassandraReaderErrorCode.CONF_ERROR,
                    String.format(
                            "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], "
                                    + "字段类型:[%s]. ",
                            metaData.getName(index),
                            metaData.getType(index)));
}