def main(sysArgs: Array[String])

in src/main/scala/deequ/deequ-analysis-verification-runner.scala [80:154]


  /**
   * Glue job entry point.
   *
   * For every table named in the comma-separated `glueTables` argument, this:
   *  1. extracts the stored verification constraints (from `dynamodbSuggestionTableName`)
   *     and analysis constraints (from `dynamodbAnalysisTableName`) for that table from DynamoDB,
   *  2. reads the table from the Glue database `glueDatabase` into a DataFrame,
   *  3. builds and runs the verification checks and/or analyzers that were found, and
   *  4. writes every resulting DataFrame to the `targetBucketName` S3 bucket.
   *
   * All constraint and result state is scoped to a single table. A table without stored
   * constraints is therefore skipped, rather than being evaluated against (and having results
   * written for) the constraints of a previously processed table.
   *
   * @param sysArgs raw job arguments supplied by AWS Glue; must resolve `JOB_NAME`,
   *                `dynamodbSuggestionTableName`, `dynamodbAnalysisTableName`,
   *                `glueDatabase`, `glueTables` and `targetBucketName`
   */
  def main(sysArgs: Array[String]): Unit = {

    //***********************************************************************//
    // Step1: Create Glue Context and extract Args
    //***********************************************************************//
    val args = GlueArgParser.getResolvedOptions(sysArgs,
      Seq("JOB_NAME",
        "dynamodbSuggestionTableName",
        "dynamodbAnalysisTableName",
        "glueDatabase",
        "glueTables",
        "targetBucketName").toArray)

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val dynamodbSuggestionTableName = args("dynamodbSuggestionTableName")
    val dynamodbAnalysisTableName = args("dynamodbAnalysisTableName")
    val dbName = args("glueDatabase")
    // Whitespace around each table name is tolerated ("a, b" == "a,b").
    val tabNames = args("glueTables").split(",").map(_.trim)
    val targetBucketName = args("targetBucketName")

    for (tabName <- tabNames) {
      //***********************************************************************//
      // Step2: Extracting suggestions from DynamoDB using input GLUE table
      //***********************************************************************//
      val suggestionConstraintFullDF: DataFrame =
        extractSuggestionsFromDynamo(dynamodbSuggestionTableName, dbName, tabName)
      val analysisConstraintFullDF: DataFrame =
        extractSuggestionsFromDynamo(dynamodbAnalysisTableName, dbName, tabName)

      //***********************************************************************//
      // Step3: Create Dataframe from GLUE tables to run the Verification result
      //***********************************************************************//
      val glueTableDF: DataFrame = readGlueTablesToDF(dbName, tabName)

      //***********************************************************************//
      // Step4: Build validation code dataframe
      //***********************************************************************//
      // None == "nothing to run for this table". These are per-table vals, so a table
      // with no stored constraints can never inherit the previous table's checks.
      // Each DataFrame.isEmpty (a Spark action) is evaluated only once.
      val suggestionCheckDF: Option[DataFrame] =
        Some(suggestionConstraintFullDF)
          .filterNot(_.isEmpty)
          .map(buildSuggestionCheckConstraints(_, dbName, tabName))
          .filterNot(_.isEmpty)
      val analysisCheckDF: Option[DataFrame] =
        Some(analysisConstraintFullDF)
          .filterNot(_.isEmpty)
          .map(buildAnalysisCheckConstraints(_, dbName, tabName))
          .filterNot(_.isEmpty)

      //***********************************************************************//
      // Step5: Create Check class with scala constraints from Dynamo
      // Step6: Execute Verification Runner and Analysis
      //***********************************************************************//
      val verificationDataFrames: Seq[DataFrame] =
        suggestionCheckDF.fold(Seq.empty[DataFrame])(createCheckClass(_, glueTableDF))
      val analysisDataFrames: Seq[DataFrame] =
        analysisCheckDF.fold(Seq.empty[DataFrame])(createAnalyzersClass(_, glueTableDF))

      //***********************************************************************//
      // Step7: Write result dataframe to S3 bucket
      //***********************************************************************//
      verificationDataFrames.foreach { resultDF =>
        writeDStoS3(resultDF, targetBucketName, "constraints-verification-results",
          dbName, tabName, getYear, getMonth, getDay, getTimestamp)
      }
      analysisDataFrames.foreach { resultDF =>
        writeDStoS3(resultDF, targetBucketName, "constraints-analysis-results",
          dbName, tabName, getYear, getMonth, getDay, getTimestamp)
      }
    }

    Job.commit()
  }