public Object execute()

in src/main/java/com/uber/uberscriptquery/execution/WriteJdbcActionStatementExecutor.java [38:77]


    /**
     * Selects every row of a Spark table/view and writes the result to a JDBC table
     * through {@code SparkUtils.writeJdbc}.
     *
     * <p>Arguments are read positionally from {@code actionStatement.getParamValues()}:
     * <ul>
     *   <li>0 - JDBC connection string</li>
     *   <li>1 - output (database) table name</li>
     *   <li>2 - comma-separated primary key columns (may be empty)</li>
     *   <li>3 - comma-separated index columns (may be empty)</li>
     *   <li>4 - comma-separated text columns (may be empty)</li>
     *   <li>5 - name of a {@code SaveMode} constant</li>
     *   <li>6 - name of the Spark table/view used in {@code select * from <name>}</li>
     *   <li>7 - optional: partition count passed to {@code coalesce}; defaults to 1</li>
     *   <li>8 - optional: writes per second; defaults to 100000</li>
     *   <li>9 - optional: SQL handed to {@code SparkUtils.writeJdbc} as the post-write statement;
     *       defaults to {@code null}</li>
     * </ul>
     *
     * <p>Column lists are split on commas; each entry is trimmed and blank entries are dropped,
     * so {@code "id, name"} yields {@code [id, name]} rather than a column named {@code " name"}.
     *
     * @param sparkSession      session used to run the select statement
     * @param actionStatement   statement carrying the positional parameters described above
     * @param credentialManager not used by this executor
     * @return always {@code null}
     * @throws NumberFormatException    if parameter 7 or 8 is present but not numeric
     * @throws IllegalArgumentException if parameter 5 does not name a {@code SaveMode} constant
     */
    public Object execute(SparkSession sparkSession, ActionStatement actionStatement, CredentialProvider credentialManager) {
        // Required parameters (0-6). Fewer than seven parameters fails on the first missing index.
        String connectionString = paramAsString(actionStatement, 0);
        String outputTableName = paramAsString(actionStatement, 1);
        String primaryKeysStr = paramAsString(actionStatement, 2);
        String indexColumnsStr = paramAsString(actionStatement, 3);
        String textColumnsStr = paramAsString(actionStatement, 4);
        String saveModeStr = paramAsString(actionStatement, 5);
        String dfTableName = paramAsString(actionStatement, 6);

        int paramCount = actionStatement.getParamValues().size();

        int dfPartitionCount = 1;
        if (paramCount > 7) {
            // Integer.parseInt rejects surrounding whitespace, unlike Double.parseDouble below.
            dfPartitionCount = Integer.parseInt(paramAsString(actionStatement, 7).trim());
        }

        double writesPerSecond = 100000;
        if (paramCount > 8) {
            writesPerSecond = Double.parseDouble(paramAsString(actionStatement, 8));
            logger.info(String.format("Use writesPerSecond: %s for db table %s", writesPerSecond, outputTableName));
        }

        String postWriteSql = null;
        if (paramCount > 9) {
            postWriteSql = paramAsString(actionStatement, 9);
        }

        List<String> primaryKeys = splitColumnList(primaryKeysStr);
        List<String> indexColumns = splitColumnList(indexColumnsStr);
        List<String> textColumns = splitColumnList(textColumnsStr);

        SaveMode saveMode = SaveMode.valueOf(saveModeStr.trim());

        String sql = String.format("select * from %s", dfTableName);
        logger.info(String.format("Running sql [%s] to get data and then save it", sql));
        Dataset<Row> df = sparkSession.sql(sql).coalesce(dfPartitionCount);
        SparkUtils.writeJdbc(df, connectionString, outputTableName, primaryKeys, indexColumns, textColumns, saveMode, postWriteSql, writesPerSecond);
        logger.info(String.format("Saved data [%s] to %s, %s", sql, outputTableName, saveMode));

        return null;
    }

    /** Returns the string form of the positional parameter at {@code index}. */
    private static String paramAsString(ActionStatement actionStatement, int index) {
        return actionStatement.getParamValues().get(index).getValue().toString();
    }

    /** Splits a comma-separated column list, trimming each entry and skipping blank ones. */
    private static List<String> splitColumnList(String commaSeparated) {
        List<String> columns = new ArrayList<>();
        for (String token : commaSeparated.split(",")) {
            String column = token.trim();
            if (!column.isEmpty()) {
                columns.add(column);
            }
        }
        return columns;
    }