in cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterHelper.java [247:349]
public static void setupColumn(BoundStatement ps, int pos, DataType sqlType, Column col) throws Exception {
if (col.getRawData() != null) {
switch (sqlType.getName()) {
case ASCII:
case TEXT:
case VARCHAR:
ps.setString(pos, col.asString());
break;
case BLOB:
ps.setBytes(pos, ByteBuffer.wrap(col.asBytes()));
break;
case BOOLEAN:
ps.setBool(pos, col.asBoolean());
break;
case TINYINT:
ps.setByte(pos, col.asLong().byteValue());
break;
case SMALLINT:
ps.setShort(pos, col.asLong().shortValue());
break;
case INT:
ps.setInt(pos, col.asLong().intValue());
break;
case BIGINT:
ps.setLong(pos, col.asLong());
break;
case VARINT:
ps.setVarint(pos, col.asBigInteger());
break;
case FLOAT:
ps.setFloat(pos, col.asDouble().floatValue());
break;
case DOUBLE:
ps.setDouble(pos, col.asDouble());
break;
case DECIMAL:
ps.setDecimal(pos, col.asBigDecimal());
break;
case DATE:
ps.setDate(pos, LocalDate.fromMillisSinceEpoch(col.asDate().getTime()));
break;
case TIME:
ps.setTime(pos, col.asLong());
break;
case TIMESTAMP:
ps.setTimestamp(pos, col.asDate());
break;
case UUID:
case TIMEUUID:
ps.setUUID(pos, UUID.fromString(col.asString()));
break;
case INET:
ps.setInet(pos, InetAddress.getByName(col.asString()));
break;
case DURATION:
ps.set(pos, Duration.from(col.asString()), Duration.class);
break;
case LIST:
ps.setList(pos, (List<?>) parseFromString(col.asString(), sqlType));
break;
case MAP:
ps.setMap(pos, (Map) parseFromString(col.asString(), sqlType));
break;
case SET:
ps.setSet(pos, (Set) parseFromString(col.asString(), sqlType));
break;
case TUPLE:
ps.setTupleValue(pos, (TupleValue) parseFromString(col.asString(), sqlType));
break;
case UDT:
ps.setUDTValue(pos, (UDTValue) parseFromString(col.asString(), sqlType));
break;
default:
throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
"不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 管理员.");
} // end switch
} else {
ps.setToNull(pos);
}
}