in src/main/java/com/uber/uberscriptquery/jdbc/SingleTableJdbcWriter.java [80:148]
/**
 * Writes one row to {@code tableName} using a single
 * {@code INSERT ... ON DUPLICATE KEY UPDATE} statement.
 *
 * <p>Columns whose lower-cased name is contained in {@code ignoreColumnsLowerCase} are
 * dropped before the statement is built. Non-null values of columns whose lower-cased
 * name is contained in {@code textColumnsLowerCase} are bound as {@link Clob}s built
 * from {@code toString()}; every other value is bound with {@code setObject}.
 *
 * <p>NOTE(review): if every supplied column is ignored, the generated statement has
 * empty column and update lists; presumably the database rejects it and the failure
 * surfaces through the catch block below - confirm.
 *
 * @param columnNames  names of the columns to write; note that this parameter shadows
 *                     the field of the same name
 * @param columnValues values to write, positionally aligned with {@code columnNames}
 * @throws IllegalArgumentException if the two lists differ in size
 * @throws RuntimeException if preparing, binding or executing the statement fails; the
 *                          connection provider is closed before the exception is thrown
 */
public void writeColumns(List<String> columnNames, List<Object> columnValues) {
    if (columnNames.size() != columnValues.size()) {
        // The original format string was empty, which produced a blank error message.
        throw new IllegalArgumentException(String.format(
                "Column names size (%d) does not match column values size (%d)",
                columnNames.size(), columnValues.size()));
    }

    // The writer's own column list (the field, not the parameter) being empty triggers
    // initialize(); presumably this is a lazy one-time setup - confirm against initialize().
    if (this.columnNames.isEmpty()) {
        this.initialize();
    }

    // Drop ignored columns while keeping names and values positionally aligned.
    List<String> effectiveColumnNames = new ArrayList<>();
    List<Object> effectiveColumnValues = new ArrayList<>();
    for (int i = 0; i < columnNames.size(); i++) {
        String columnName = columnNames.get(i);
        if (ignoreColumnsLowerCase.contains(columnName.toLowerCase())) {
            continue;
        }
        effectiveColumnNames.add(columnName);
        effectiveColumnValues.add(columnValues.get(i));
    }

    List<String> updateList = new ArrayList<>();
    for (String effectiveColumnName : effectiveColumnNames) {
        updateList.add(effectiveColumnName + "=?");
    }

    String sql = String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
            tableName,
            StringUtils.join(effectiveColumnNames, ", "),
            StringUtils.join(Collections.nCopies(effectiveColumnNames.size(), "?"), ", "),
            StringUtils.join(updateList, ", "));

    LOG.info("Running sql: " + sql);

    Connection connection = connectionProvider.getConnection();
    int columnCount = effectiveColumnValues.size();
    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        // Every value is bound twice: first for the VALUES (...) placeholders (offset 0),
        // then for the "col=?" placeholders of the UPDATE clause (offset columnCount).
        for (int offset : new int[] {0, columnCount}) {
            for (int i = 0; i < columnCount; i++) {
                int parameterIndex = offset + i + 1; // JDBC parameter indexes are 1-based
                Object value = effectiveColumnValues.get(i);
                if (value == null) {
                    stmt.setObject(parameterIndex, null);
                } else if (textColumnsLowerCase.contains(effectiveColumnNames.get(i).toLowerCase())) {
                    // A fresh Clob is created per placeholder, as in the original code.
                    Clob clob = connection.createClob();
                    clob.setString(1, value.toString());
                    stmt.setClob(parameterIndex, clob);
                } else {
                    stmt.setObject(parameterIndex, value);
                }
            }
        }
        stmt.executeUpdate();
        LOG.info("Finished sql: " + sql);
    } catch (Throwable e) {
        // Close the provider on any failure, then rethrow with the statement and the
        // caller-supplied (unfiltered) data attached for diagnosis.
        connectionProvider.close();
        String dataStr = String.format("Columns: %s, Values: %s", StringUtils.join(columnNames, ','), StringUtils.join(columnValues, ','));
        throw new RuntimeException(
                String.format("Failed to run sql [%s] to insert data: %s: %s", sql, dataStr, ExceptionUtils.getStackTrace(e)),
                e);
    }
}