in src/main/scala/deequ/deequ-profile-runner.scala [38:80]
/**
 * Glue job entry point: runs Deequ's column profiler over each requested
 * catalog table and hands the per-column results to `writeDStoS3`.
 *
 * Job arguments resolved through `GlueArgParser`:
 *  - `JOB_NAME`: name passed to `Job.init`
 *  - `glueDatabase`: catalog database that holds the tables
 *  - `glueTables`: comma-separated list of table names to profile
 *  - `targetBucketName`: forwarded as-is to `writeDStoS3`
 *
 * @param sysArgs raw arguments supplied to the job
 */
def main(sysArgs: Array[String]): Unit = {
  val args = GlueArgParser.getResolvedOptions(
    sysArgs,
    Seq("JOB_NAME", "glueDatabase", "glueTables", "targetBucketName").toArray)
  Job.init(args("JOB_NAME"), glueContext, args.asJava)

  val dbName = args("glueDatabase")
  // Drop blank entries so an empty argument or a stray comma ("a,,b", "a, ")
  // does not become a catalog lookup for a table named "".
  val tabNames = args("glueTables").split(",").map(_.trim).filter(_.nonEmpty)

  for (tabName <- tabNames) {
    // NOTE(review): every table is read with the same transformation context
    // ("datasource0"). If job bookmarks are enabled this key is presumably
    // shared across tables -- confirm whether it should be unique per table.
    val profilerDf = glueContext.getCatalogSource(
      database = dbName,
      tableName = tabName,
      redshiftTmpDir = "",
      transformationContext = "datasource0").getDynamicFrame().toDF()

    val profileResult = ColumnProfilerRunner()
      .onData(profilerDf)
      .run()

    // One tuple per profiled column: (name, completeness, data type, approx. distinct count).
    val profileResultDataset = profileResult.profiles.map {
      case (columnName, profile) => (
        columnName,
        profile.completeness,
        profile.dataType.toString,
        profile.approximateNumDistinctValues)
    }.toSeq.toDS

    // Replace the positional tuple names (_1.._4) with descriptive column names
    // and add a "timestamp" column populated by current_timestamp().
    val finalDataset = profileResultDataset
      .withColumnRenamed("_1", "column")
      .withColumnRenamed("_2", "completeness")
      .withColumnRenamed("_3", "inferred_datatype")
      .withColumnRenamed("_4", "approx_distinct_values")
      .withColumn("timestamp", current_timestamp())

    writeDStoS3(finalDataset, args("targetBucketName"), "profile-results", dbName, tabName, getYear, getMonth, getDay, getTimestamp)
  }

  Job.commit()
}