in src/main/scala/deequ/deequ-analysis-runner.scala [124:150]
def extractInputAnalyzersFromDynamo(dynoTable: String): DataFrame = {
  // Configure the Hadoop job to scan the given DynamoDB table via the
  // emr-dynamodb-hadoop connector. The output format is set for symmetry with
  // the write path but is not strictly needed for this read-only job.
  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.input.tableName", dynoTable)
  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  val hadoopRdd = spark.sparkContext.hadoopRDD(
    jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

  // Pull the six expected attributes out of each item. toString() yields the raw
  // stringified AttributeValue, which col_extractValue unwraps below.
  val rdd: RDD[(String, String, String, String, String, String)] = hadoopRdd.map {
    case (_, dbwritable) => (
      dbwritable.getItem().get("id").toString(),
      dbwritable.getItem().get("database").toString(),
      dbwritable.getItem().get("tablename").toString(),
      dbwritable.getItem().get("column").toString(),
      dbwritable.getItem().get("analyzerCode").toString(),
      dbwritable.getItem().get("enable").toString())
  }

  // Convert to a DataFrame (requires import spark.implicits._ to be in scope)
  // and replace the positional tuple columns with named, unwrapped values.
  rdd.toDF()
    .withColumn("id", col_extractValue($"_1"))
    .withColumn("database", col_extractValue($"_2"))
    .withColumn("tablename", col_extractValue($"_3"))
    .withColumn("column", col_extractValue($"_4"))
    .withColumn("analyzerCode", col_extractValue($"_5"))
    .withColumn("enable", col_extractValue($"_6"))
    .select("id", "database", "tablename", "column", "analyzerCode", "enable")
}
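
The col_extractValue UDF applied above is defined elsewhere in the file and not shown here. For context, a minimal sketch of what such a helper could look like, assuming AttributeValue.toString() renders a single attribute roughly as "{S: value,}" (the rendering, the regex, and this whole implementation are assumptions, not the file's actual definition):

import org.apache.spark.sql.functions.udf

// Hypothetical helper: strip the stringified DynamoDB AttributeValue wrapper,
// e.g. "{S: Completeness,}" -> "Completeness". The pattern below is an
// assumption about how AttributeValue.toString() formats string attributes.
val extractPattern = """\{S: (.*),\}""".r
val col_extractValue = udf((raw: String) => raw match {
  case extractPattern(value) => value
  case _                     => null
})

With a helper like this in scope, a call such as extractInputAnalyzersFromDynamo("my-analyzer-config-table") (table name hypothetical) would return a DataFrame of plain string columns (id, database, tablename, column, analyzerCode, enable) ready to drive analyzer selection downstream.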