in clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java [66:306]
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) {
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
try {
if (column.getRawData() == null) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
return preparedStatement;
}
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.CLOB:
case Types.NCLOB:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
preparedStatement.setString(columnIndex + 1, column
.asString());
break;
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
String strValue = column.asString();
if (emptyAsNull && "".equals(strValue)) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
} else {
switch (columnSqltype) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
case Types.BIGINT:
preparedStatement.setLong(columnIndex + 1, column.asLong());
break;
case Types.DECIMAL:
preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal());
break;
case Types.REAL:
case Types.FLOAT:
preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue());
break;
case Types.DOUBLE:
preparedStatement.setDouble(columnIndex + 1, column.asDouble());
break;
}
}
break;
case Types.DATE:
if (this.resultSetMetaData.getRight().get(columnIndex)
.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
}
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
Timestamp sqlTimestamp = null;
if (column instanceof StringColumn && column.asString() != null) {
String timeStampStr = column.asString();
// JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式
String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+";
boolean isMatch = Pattern.matches(pattern, timeStampStr);
if (isMatch) {
sqlTimestamp = Timestamp.valueOf(timeStampStr);
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
}
}
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BOOLEAN:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
if (asBoolean != null) {
preparedStatement.setBoolean(columnIndex + 1, asBoolean);
} else {
preparedStatement.setNull(columnIndex + 1, Types.BIT);
}
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement,
columnIndex, columnSqltype, column);
if (isHandled) {
break;
}
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
}
return preparedStatement;
} catch (DataXException e) {
// fix类型转换或者溢出失败时,将具体哪一列打印出来
if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT ||
e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
throw DataXException
.asDataXException(
e.getErrorCode(),
String.format(
"类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
} else {
throw e;
}
}
}
private Object toJavaArray(Object val) {
if (null == val) {
return null;
} else if (val instanceof JSONArray) {
Object[] valArray = ((JSONArray) val).toArray();
for (int i = 0; i < valArray.length; i++) {
valArray[i] = this.toJavaArray(valArray[i]);
}
return valArray;
} else {
return val;
}
}
boolean fillPreparedStatementColumnType4CustomType(PreparedStatement ps,
int columnIndex, int columnSqltype,
Column column) throws SQLException {
switch (columnSqltype) {
case Types.OTHER:
if (this.resultSetMetaData.getRight().get(columnIndex).startsWith("Tuple")) {
throw DataXException
.asDataXException(ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR, ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR.getDescription());
} else {
ps.setString(columnIndex + 1, column.asString());
}
return true;
case Types.ARRAY:
Connection conn = ps.getConnection();
List<Object> values = JSON.parseArray(column.asString(), Object.class);
for (int i = 0; i < values.size(); i++) {
values.set(i, this.toJavaArray(values.get(i)));
}
Array array = conn.createArrayOf("String", values.toArray());
ps.setArray(columnIndex + 1, array);
return true;
default:
break;
}
return false;
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}