in src/main/scala/deequ/deequ-analysis-verification-runner.scala [80:154]
/**
 * Job entry point: for every Glue table listed in the `glueTables` argument,
 * loads the stored suggestion and analysis constraints from DynamoDB, runs
 * them against the table's data and writes the resulting DataFrames out
 * through `writeDStoS3`.
 *
 * Required job arguments (resolved through `GlueArgParser`): `JOB_NAME`,
 * `dynamodbSuggestionTableName`, `dynamodbAnalysisTableName`, `glueDatabase`,
 * `glueTables` (comma-separated, entries are trimmed) and `targetBucketName`.
 *
 * @param sysArgs raw command-line arguments handed to the job
 */
def main(sysArgs: Array[String]): Unit = {
  //***********************************************************************//
  // Step1: Create Glue Context and extract Args
  //***********************************************************************//
  val args = GlueArgParser.getResolvedOptions(sysArgs,
    Seq("JOB_NAME",
      "dynamodbSuggestionTableName",
      "dynamodbAnalysisTableName",
      "glueDatabase",
      "glueTables",
      "targetBucketName").toArray)
  Job.init(args("JOB_NAME"), glueContext, args.asJava)
  val dynamodbSuggestionTableName = args("dynamodbSuggestionTableName")
  val dynamodbAnalysisTableName = args("dynamodbAnalysisTableName")
  val dbName = args("glueDatabase")
  val targetBucketName = args("targetBucketName")
  val tabNames = args("glueTables").split(",").map(_.trim)

  for (tabName <- tabNames) {
    //***********************************************************************//
    // Step2: Extracting suggestions from DynamoDB using input GLUE table
    //***********************************************************************//
    val suggestionConstraintFullDF: DataFrame =
      extractSuggestionsFromDynamo(dynamodbSuggestionTableName, dbName, tabName)
    val analysisConstraintFullDF: DataFrame =
      extractSuggestionsFromDynamo(dynamodbAnalysisTableName, dbName, tabName)

    //***********************************************************************//
    // Step3: Create Dataframe from GLUE tables to run the Verification result
    //***********************************************************************//
    val glueTableDF: DataFrame = readGlueTablesToDF(dbName, tabName)

    //***********************************************************************//
    // Step4: Build validation code dataframe
    //***********************************************************************//
    // Everything from here on is scoped to the current table. Keeping these
    // values in variables shared across iterations would let a table without
    // constraints silently reuse the checks built for the previous table.
    // A check frame is kept only when it is non-empty, so `isEmpty` is
    // evaluated once per frame instead of once per later step.
    val suggestionCheckDF: Option[DataFrame] =
      if (suggestionConstraintFullDF.isEmpty) None
      else {
        val built = buildSuggestionCheckConstraints(suggestionConstraintFullDF, dbName, tabName)
        if (built.isEmpty) None else Some(built)
      }
    val analysisCheckDF: Option[DataFrame] =
      if (analysisConstraintFullDF.isEmpty) None
      else {
        val built = buildAnalysisCheckConstraints(analysisConstraintFullDF, dbName, tabName)
        if (built.isEmpty) None else Some(built)
      }

    //***********************************************************************//
    // Step5: Create Check class with scala constraints from Dynamo
    // Step6: Execute Verification Runner and Analysis
    //***********************************************************************//
    val verificationDataFrames: Seq[DataFrame] =
      suggestionCheckDF.fold(Seq.empty[DataFrame])(createCheckClass(_, glueTableDF))
    val analysisDataFrames: Seq[DataFrame] =
      analysisCheckDF.fold(Seq.empty[DataFrame])(createAnalyzersClass(_, glueTableDF))

    //***********************************************************************//
    // Step7: Write result dataframe to S3 bucket
    //***********************************************************************//
    // Both runs complete before anything is written; verification results
    // are written first, then analysis results.
    verificationDataFrames.foreach { resultDF =>
      writeDStoS3(resultDF, targetBucketName, "constraints-verification-results",
        dbName, tabName, getYear, getMonth, getDay, getTimestamp)
    }
    analysisDataFrames.foreach { resultDF =>
      writeDStoS3(resultDF, targetBucketName, "constraints-analysis-results",
        dbName, tabName, getYear, getMonth, getDay, getTimestamp)
    }
  }
  Job.commit()
}