in src/main/scala/deequ/deequ-analysis-runner.scala [71:117]
/**
 * Entry point of the Glue analysis job.
 *
 * Resolves the job options, initialises the Glue job, loads the analyzer definitions and the
 * target Glue table, runs the analysis and writes every resulting DataFrame to S3 before
 * committing the job.
 *
 * @param sysArgs raw job arguments; the options resolved by name are JOB_NAME,
 *                dynoInputAnalysisTable, glueDatabase, glueTable, targetBucketName and
 *                targetBucketPrefix
 */
def main(sysArgs: Array[String]): Unit = {
  //***********************************************************************//
  // Step1: Resolve the job arguments and initialise the Glue job
  //***********************************************************************//
  val args = GlueArgParser.getResolvedOptions(
    sysArgs,
    Seq(
      "JOB_NAME",
      "dynoInputAnalysisTable",
      "glueDatabase",
      "glueTable",
      "targetBucketName",
      "targetBucketPrefix"
    ).toArray
  )
  Job.init(args("JOB_NAME"), glueContext, args.asJava)

  // Read the options that are needed by several steps once, instead of repeating the lookup.
  val glueDatabase: String = args("glueDatabase")
  val glueTable: String = args("glueTable")

  //***********************************************************************//
  // Step2: Extract the analysis constraints from DynamoDB using the input table name
  //***********************************************************************//
  val suggestionConstraintFullDF: DataFrame =
    extractInputAnalyzersFromDynamo(args("dynoInputAnalysisTable"))

  //***********************************************************************//
  // Step3: Create a DataFrame from the Glue table the analysis runs against
  //***********************************************************************//
  val glueTableDF: DataFrame = readGlueTablesToDF(glueDatabase, glueTable)

  //***********************************************************************//
  // Step4: Build the validation code DataFrame for this database/table
  //***********************************************************************//
  val checkDF: DataFrame =
    getAnalyzerConstraints(suggestionConstraintFullDF, glueDatabase, glueTable)

  //***********************************************************************//
  // Step5: Create the Analyzer class with the Scala constraints from Dynamo
  // Step6: Execute the Analysis Runner
  //***********************************************************************//
  val resultDataFrame: Seq[DataFrame] = createAnalyzersClass(checkDF, glueTableDF)

  //***********************************************************************//
  // Step7: Write each result DataFrame to the S3 bucket
  //***********************************************************************//
  // The target location is the same for every result, so look it up outside the loop.
  val targetBucketName: String = args("targetBucketName")
  val targetBucketPrefix: String = args("targetBucketPrefix")
  resultDataFrame.foreach { resultDF =>
    writeDStoS3(resultDF, glueDatabase, glueTable, targetBucketName, targetBucketPrefix)
  }

  // Commit only after every result has been written.
  Job.commit()
}