in src/main/java/com/uber/uberscriptquery/execution/WriteJdbcActionStatementExecutor.java [38:77]
/**
 * Reads all rows of a Spark table/view and writes them to a JDBC table via
 * {@code SparkUtils.writeJdbc}.
 *
 * <p>The statement's positional parameters are:
 * <ul>
 *   <li>0: JDBC connection string</li>
 *   <li>1: output (database) table name</li>
 *   <li>2: comma-separated primary-key column names (may be empty)</li>
 *   <li>3: comma-separated index column names (may be empty)</li>
 *   <li>4: comma-separated text column names (may be empty)</li>
 *   <li>5: {@link SaveMode} name, matched case-insensitively (e.g. {@code Overwrite}, {@code append})</li>
 *   <li>6: name of the Spark table/view to read with {@code select * from <name>}</li>
 *   <li>7 (optional): number of partitions to coalesce the data into before writing; default 1, must be positive</li>
 *   <li>8 (optional): writes-per-second value passed to {@code SparkUtils.writeJdbc}; default 100000
 *       (presumably a rate limit — confirm in SparkUtils)</li>
 *   <li>9 (optional): SQL passed to {@code SparkUtils.writeJdbc} as {@code postWriteSql}; default null</li>
 * </ul>
 * Column lists are split on commas. Each name is trimmed and empty entries are dropped, so
 * {@code "id, name"} yields {@code [id, name]}.
 *
 * @param sparkSession session used to run the read query
 * @param actionStatement statement carrying the positional parameters described above
 * @param credentialManager not used by this method
 * @return always {@code null}
 * @throws IllegalArgumentException if fewer than 7 parameters are given, the partition count is
 *     not a positive integer, the writes-per-second value is not a number, or the save mode is unknown
 */
public Object execute(SparkSession sparkSession, ActionStatement actionStatement, CredentialProvider credentialManager) {
    int paramCount = actionStatement.getParamValues().size();
    if (paramCount < 7) {
        throw new IllegalArgumentException(String.format(
                "JDBC write action requires at least 7 parameters (connectionString, outputTableName, primaryKeys, "
                        + "indexColumns, textColumns, saveMode, dfTableName), but got %s", paramCount));
    }

    String connectionString = paramAsString(actionStatement, 0);
    String outputTableName = paramAsString(actionStatement, 1);
    List<String> primaryKeys = parseColumnList(paramAsString(actionStatement, 2));
    List<String> indexColumns = parseColumnList(paramAsString(actionStatement, 3));
    List<String> textColumns = parseColumnList(paramAsString(actionStatement, 4));
    String saveModeStr = paramAsString(actionStatement, 5);
    String dfTableName = paramAsString(actionStatement, 6);

    int dfPartitionCount = 1;
    if (paramCount > 7) {
        // trim() so " 4" is accepted, consistent with Double.parseDouble below which ignores whitespace.
        dfPartitionCount = Integer.parseInt(paramAsString(actionStatement, 7).trim());
        if (dfPartitionCount < 1) {
            // Fail early with context instead of deep inside Spark's coalesce().
            throw new IllegalArgumentException(String.format(
                    "Partition count must be positive, but got %s for db table %s", dfPartitionCount, outputTableName));
        }
    }

    double writesPerSecond = 100000;
    if (paramCount > 8) {
        writesPerSecond = Double.parseDouble(paramAsString(actionStatement, 8));
        logger.info(String.format("Use writesPerSecond: %s for db table %s", writesPerSecond, outputTableName));
    }

    String postWriteSql = null;
    if (paramCount > 9) {
        postWriteSql = paramAsString(actionStatement, 9);
    }

    SaveMode saveMode = parseSaveMode(saveModeStr);

    String sql = String.format("select * from %s", dfTableName);
    logger.info(String.format("Running sql [%s] to get data and then save it", sql));
    Dataset<Row> df = sparkSession.sql(sql).coalesce(dfPartitionCount);
    SparkUtils.writeJdbc(df, connectionString, outputTableName, primaryKeys, indexColumns, textColumns, saveMode, postWriteSql, writesPerSecond);
    logger.info(String.format("Saved data [%s] to %s, %s", sql, outputTableName, saveMode));
    return null;
}

/** Returns the {@code toString()} of the value of the statement parameter at {@code index}. */
private static String paramAsString(ActionStatement actionStatement, int index) {
    return actionStatement.getParamValues().get(index).getValue().toString();
}

/**
 * Splits a comma-separated list of column names into a mutable list.
 *
 * <p>Each name is trimmed and blank entries are dropped. For example, {@code ""} yields
 * {@code []}, and {@code "id, name"} and {@code "id,,name,"} both yield {@code [id, name]}.
 */
private static List<String> parseColumnList(String columnsStr) {
    List<String> columns = new ArrayList<>();
    for (String column : columnsStr.split(",")) {
        String trimmed = column.trim();
        if (!trimmed.isEmpty()) {
            columns.add(trimmed);
        }
    }
    return columns;
}

/**
 * Resolves a {@link SaveMode} by constant name, ignoring case and surrounding whitespace.
 *
 * @throws IllegalArgumentException if no constant matches
 */
private static SaveMode parseSaveMode(String saveModeStr) {
    String name = saveModeStr.trim();
    for (SaveMode mode : SaveMode.values()) {
        if (mode.name().equalsIgnoreCase(name)) {
            return mode;
        }
    }
    throw new IllegalArgumentException(String.format(
            "Invalid save mode [%s], expected one of %s", saveModeStr, Arrays.toString(SaveMode.values())));
}