in src/main/scala/deequ/deequ-suggestion-runner.scala [184:212]
/**
 * Persists every row of `finalSuggestionsDF` into the given DynamoDB table
 * using the EMR DynamoDB Hadoop connector (`saveAsHadoopDataset`).
 *
 * Every column value is serialized as a DynamoDB String attribute ("S") via
 * `toString`; null column values are skipped, since DynamoDB rejects
 * absent/empty attribute values.
 *
 * @param finalSuggestionsDF DataFrame of Deequ constraint suggestions to write
 * @param outputDynamoTable  name of the target DynamoDB table
 */
def writeSuggestionsToDynamo(finalSuggestionsDF: DataFrame, outputDynamoTable: String): Unit = {
  // Hadoop JobConf wired for the EMR DynamoDB output format.
  val dynoWriteConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  dynoWriteConf.set("dynamodb.output.tableName", outputDynamoTable)
  // 1.5 lets the job consume up to 150% of the table's provisioned write
  // capacity — NOTE(review): confirm this burst rate is acceptable in prod.
  dynoWriteConf.set("dynamodb.throughput.write.percent", "1.5")
  dynoWriteConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  dynoWriteConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  dynoWriteConf.set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")

  // (columnName, typeString) pairs; only the names are used below because all
  // attributes are written as DynamoDB Strings regardless of source type.
  val schemaFields = finalSuggestionsDF.dtypes

  // `val` (was `var`): the RDD reference is never reassigned.
  val dynoInsertFormattedRDD = finalSuggestionsDF.rdd.map { row =>
    val ddbMap = new HashMap[String, AttributeValue]()
    for (i <- schemaFields.indices) {
      // Option(...) maps a null cell to None, so nulls are silently skipped.
      Option(row.get(i)).foreach { value =>
        val att = new AttributeValue()
        att.setS(value.toString)
        ddbMap.put(schemaFields(i)._1, att)
      }
    }
    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)
    // The connector ignores the key; an empty Text is the conventional filler.
    (new Text(""), item)
  }

  dynoInsertFormattedRDD.saveAsHadoopDataset(dynoWriteConf)
}