in src/main/java/com/uber/uberscriptquery/execution/QueryEngine.java [129:217]
/**
 * Parses the given script and executes it end-to-end on Spark:
 * 1. loads each declared file input and registers it as a temp table,
 * 2. runs each JSON input statement and registers the result,
 * 3. runs each regular statement assignment (Spark SQL / external SQL / JSON / datagen)
 *    and registers the result,
 * 4. executes all action statements via the action engine.
 *
 * @param query          script text to parse and execute
 * @param queryOverwrite optional overwrite text applied during parsing
 * @param spark          SparkSession used for query execution and temp table registration
 * @param debug          whether temp-table registration runs in debug mode
 * @throws RuntimeException if a statement uses an unsupported query engine or query type
 */
public void executeScript(String query, String queryOverwrite, SparkSession spark, boolean debug) {
    QuerySqlParser parser = new QuerySqlParser();
    logger.info("Parsing query statement: " + query);
    RootStatement rootStatement = parser.parse(query, queryOverwrite).getRootStatement();
    executeFileAssignments(rootStatement, spark, debug);
    executeJsonInputStatements(rootStatement, spark, debug);
    executeStatementAssignments(rootStatement, spark, debug);
    executeActionStatements(rootStatement, spark);
}

/** Loads each file input (format + location) and registers it as a temp table. */
private void executeFileAssignments(RootStatement rootStatement, SparkSession spark, boolean debug) {
    if (rootStatement.getFileAssignments() == null) {
        return;
    }
    for (FileAssignment fileAssignment : rootStatement.getFileAssignments()) {
        logger.info("Processing file input: " + fileAssignment);
        String inputFormat = fileAssignment.getFileType();
        String inputLocation = fileAssignment.getFileLocation();
        Dataset<Row> df = SparkUtils.loadFile(inputFormat, inputLocation, spark);
        logger.info("Finished loading file input: " + fileAssignment);
        processAndRegisterTempTable(df, rootStatement, fileAssignment.getTableAlias(), fileAssignment.toString(), debug);
    }
}

/** Runs each JSON input statement on its engine and registers the result as a temp table. */
private void executeJsonInputStatements(RootStatement rootStatement, SparkSession spark, boolean debug) {
    if (rootStatement.getJsonQueryStatementAssignments() == null) {
        return;
    }
    for (StatementAssignment statementAssignment : rootStatement.getJsonQueryStatementAssignments()) {
        logger.info("Processing query statement: " + statementAssignment);
        Dataset<Row> df = getJsonInputStatementExecutor(statementAssignment)
                .execute(spark, statementAssignment, this.credentialManager);
        logger.info("Finished query statement: " + statementAssignment);
        processAndRegisterTempTable(df, rootStatement, statementAssignment.getTableAlias(), statementAssignment.toString(), debug);
    }
}

/** Runs each regular statement assignment and registers its result as a temp table. */
private void executeStatementAssignments(RootStatement rootStatement, SparkSession spark, boolean debug) {
    if (rootStatement.getStatementAssignments() == null) {
        return;
    }
    for (StatementAssignment statementAssignment : rootStatement.getStatementAssignments()) {
        logger.info("Processing query statement: " + statementAssignment);
        Dataset<Row> df = runStatementAssignment(statementAssignment, spark);
        logger.info("Finished query statement: " + statementAssignment);
        processAndRegisterTempTable(df, rootStatement, statementAssignment.getTableAlias(), statementAssignment.toString(), debug);
    }
}

/**
 * Dispatches a single statement assignment to the appropriate engine and returns the dataset.
 *
 * @throws RuntimeException if the query type or query engine is not supported
 */
private Dataset<Row> runStatementAssignment(StatementAssignment statementAssignment, SparkSession spark) {
    String queryType = statementAssignment.getQueryType();
    if (queryType == null) {
        // No explicit query type means plain Spark SQL.
        logger.info("Running query by spark sql: " + statementAssignment.getQueryText());
        return spark.sql(statementAssignment.getQueryText());
    }
    if (queryType.equalsIgnoreCase("SQL")) {
        logger.info("Running query by SQL: " + statementAssignment);
        SqlInputStatementExecutor sqlInputStatementExecutor = sqlInputStatementExecutors.get(statementAssignment.getQueryEngine().toLowerCase());
        if (sqlInputStatementExecutor == null) {
            throw new RuntimeException("Not supported sql query engine: " + statementAssignment.getQueryEngine());
        }
        return sqlInputStatementExecutor.execute(spark, statementAssignment, this.credentialManager);
    }
    if (queryType.equalsIgnoreCase("JSON")) {
        logger.info("Running query by JSON: " + statementAssignment);
        return getJsonInputStatementExecutor(statementAssignment)
                .execute(spark, statementAssignment, this.credentialManager);
    }
    if (queryType.equalsIgnoreCase("datagen")) {
        logger.info("Running datagen: " + statementAssignment);
        if (statementAssignment.getQueryEngine().equalsIgnoreCase(DATAGEN_week_timepoints_by_10_minutes)) {
            return generateData_week_timepoints_by_10_minutes(spark);
        }
        if (statementAssignment.getQueryEngine().equalsIgnoreCase(DATAGEN_numbers_1k)) {
            return generateData_numbers_1k(spark);
        }
        throw new RuntimeException("Query statement not supported for datagen: " + statementAssignment);
    }
    throw new RuntimeException("Query statement not supported: " + statementAssignment);
}

/**
 * Looks up the JSON input executor registered for the statement's engine.
 * Centralizes the lookup previously duplicated for JSON input statements and
 * JSON-typed statement assignments.
 *
 * @throws RuntimeException if no executor is registered for the engine
 */
private JsonInputStatementExecutor getJsonInputStatementExecutor(StatementAssignment statementAssignment) {
    JsonInputStatementExecutor jsonInputStatementExecutor = jsonInputStatementExecutors.get(statementAssignment.getQueryEngine().toLowerCase());
    if (jsonInputStatementExecutor == null) {
        throw new RuntimeException("Not supported json query engine: " + statementAssignment.getQueryEngine());
    }
    return jsonInputStatementExecutor;
}

/** Builds the action engine with all registered executors and runs every action statement. */
private void executeActionStatements(RootStatement rootStatement, SparkSession spark) {
    QueryActionEngine actionExecutor = new QueryActionEngine();
    actionExecutor.setCredentialProvider(credentialManager);
    for (Map.Entry<String, ActionStatementExecutor> entry : actionStatementExecutors.entrySet()) {
        actionExecutor.addActionStatementExecutor(entry.getKey(), entry.getValue());
    }
    // Null-guard for consistency with the other statement groups, which all
    // tolerate a null list from the parsed root statement.
    if (rootStatement.getActionStatements() == null) {
        return;
    }
    for (ActionStatement actionStatement : rootStatement.getActionStatements()) {
        logger.info("Running action statement: " + actionStatement);
        actionExecutor.execute(actionStatement, spark);
        logger.info("Finished action statement: " + actionStatement);
    }
}