public void executeScript()

in src/main/java/com/uber/uberscriptquery/execution/QueryEngine.java [129:217]


    public void executeScript(String query, String queryOverwrite, SparkSession spark, boolean debug) {
        QuerySqlParser parser = new QuerySqlParser();

        logger.info("Parsing query statement: " + query);
        RootStatement rootStatement = parser.parse(query, queryOverwrite).getRootStatement();

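        // Load each declared file input and register the resulting Dataset as a temp table under its alias.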
        if (rootStatement.getFileAssignments() != null) {
            for (FileAssignment fileAssignment : rootStatement.getFileAssignments()) {
                logger.info("Processing file input: " + fileAssignment);
                String inputFormat = fileAssignment.getFileType();
                String inputLocation = fileAssignment.getFileLocation();
                Dataset<Row> df = SparkUtils.loadFile(inputFormat, inputLocation, spark);
                logger.info("Finished loading file input: " + fileAssignment);

                processAndRegisterTempTable(df, rootStatement, fileAssignment.getTableAlias(), fileAssignment.toString(), debug);
            }
        }

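        // Run JSON input statements through their configured query engines and register the results as temp tables.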
        if (rootStatement.getJsonQueryStatementAssignments() != null) {
            for (StatementAssignment statementAssignment : rootStatement.getJsonQueryStatementAssignments()) {
                logger.info("Processing query statement: " + statementAssignment);
                JsonInputStatementExecutor jsonInputStatementExecutor = jsonInputStatementExecutors.get(statementAssignment.getQueryEngine().toLowerCase());
                if (jsonInputStatementExecutor == null) {
                    throw new RuntimeException("Not supported json query engine: " + statementAssignment.getQueryEngine());
                }

                Dataset<Row> df = jsonInputStatementExecutor.execute(spark, statementAssignment, this.credentialManager);
                logger.info("Finished query statement: " + statementAssignment);

                processAndRegisterTempTable(df, rootStatement, statementAssignment.getTableAlias(), statementAssignment.toString(), debug);
            }
        }

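        // Run the remaining statement assignments, dispatching on query type: plain Spark SQL when no
        // type is given, SQL or JSON via the pluggable executors, or the built-in datagen sources.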
        if (rootStatement.getStatementAssignments() != null) {
            for (StatementAssignment statementAssignment : rootStatement.getStatementAssignments()) {
                logger.info("Processing query statement: " + statementAssignment);
                Dataset<Row> df;
                if (statementAssignment.getQueryType() == null) {
                    logger.info("Running query by spark sql: " + statementAssignment.getQueryText());
                    df = spark.sql(statementAssignment.getQueryText());
                } else if (statementAssignment.getQueryType().equalsIgnoreCase("SQL")) {
                    logger.info("Running query by SQL: " + statementAssignment);

                    SqlInputStatementExecutor sqlInputStatementExecutor = sqlInputStatementExecutors.get(statementAssignment.getQueryEngine().toLowerCase());
                    if (sqlInputStatementExecutor == null) {
                        throw new RuntimeException("Not supported sql query engine: " + statementAssignment.getQueryEngine());
                    }

                    df = sqlInputStatementExecutor.execute(spark, statementAssignment, this.credentialManager);
                } else if (statementAssignment.getQueryType().equalsIgnoreCase("JSON")) {
                    logger.info("Running query by JSON: " + statementAssignment);

                    JsonInputStatementExecutor jsonInputStatementExecutor = jsonInputStatementExecutors.get(statementAssignment.getQueryEngine().toLowerCase());
                    if (jsonInputStatementExecutor == null) {
                        throw new RuntimeException("Not supported json query engine: " + statementAssignment.getQueryEngine());
                    }

                    df = jsonInputStatementExecutor.execute(spark, statementAssignment, this.credentialManager);
                } else if (statementAssignment.getQueryType().equalsIgnoreCase("datagen")) {
                    logger.info("Running datagen: " + statementAssignment);
                    if (statementAssignment.getQueryEngine().equalsIgnoreCase(DATAGEN_week_timepoints_by_10_minutes)) {
                        df = generateData_week_timepoints_by_10_minutes(spark);
                    } else if (statementAssignment.getQueryEngine().equalsIgnoreCase(DATAGEN_numbers_1k)) {
                        df = generateData_numbers_1k(spark);
                    } else {
                        throw new RuntimeException("Query statement not supported for datagen: " + statementAssignment);
                    }
                } else {
                    throw new RuntimeException("Query statement not supported: " + statementAssignment);
                }
                logger.info("Finished query statement: " + statementAssignment);

                processAndRegisterTempTable(df, rootStatement, statementAssignment.getTableAlias(), statementAssignment.toString(), debug);
            }
        }

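        // Finally, execute all action statements with the registered action executors and the credential provider.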
        QueryActionEngine actionExecutor = new QueryActionEngine();
        actionExecutor.setCredentialProvider(credentialManager);

        for (Map.Entry<String, ActionStatementExecutor> entry : actionStatementExecutors.entrySet()) {
            actionExecutor.addActionStatementExecutor(entry.getKey(), entry.getValue());
        }

        for (ActionStatement actionStatement : rootStatement.getActionStatements()) {
            logger.info("Running action statement: " + actionStatement);
            actionExecutor.execute(actionStatement, spark);
            logger.info("Finished action statement: " + actionStatement);
        }
    }
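
For reference, a minimal sketch of how this method might be invoked is shown below. Only the executeScript signature comes from the listing above; the no-argument QueryEngine constructor, the empty queryOverwrite, debug=false, and the script contents (including the writeCsvFile action) are illustrative assumptions rather than details taken from the code.

    import com.uber.uberscriptquery.execution.QueryEngine;

    import org.apache.spark.sql.SparkSession;

    public class QueryEngineUsageSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("uberscriptquery-example")
                    .master("local[*]") // local mode purely for illustration
                    .getOrCreate();

            // Hypothetical script: one plain Spark SQL assignment (the queryType == null path above)
            // followed by one action statement; the action name is an assumption, not taken from the listing.
            String script = "result = select 1 as id;\n"
                    + "writeCsvFile(result, '/tmp/result_csv');";

            // An empty queryOverwrite and debug=false are assumed to be acceptable defaults.
            new QueryEngine().executeScript(script, "", spark, false);

            spark.stop();
        }
    }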