def writeSuggestionsToDynamo()

in src/main/scala/deequ/deequ-suggestion-runner.scala [184:212]


  /**
   * Writes every row of `finalSuggestionsDF` into the DynamoDB table
   * `outputDynamoTable` using the emr-dynamodb-hadoop output format.
   *
   * Each row is converted to a DynamoDB item where every non-null column
   * value is stored as a String attribute (`AttributeValue.setS`) keyed by
   * the DataFrame column name. Null column values are skipped entirely, so
   * those attributes are simply absent from the written item.
   *
   * @param finalSuggestionsDF DataFrame whose rows become DynamoDB items
   * @param outputDynamoTable  name of the target DynamoDB table
   */
  def writeSuggestionsToDynamo(finalSuggestionsDF: DataFrame, outputDynamoTable: String): Unit = {

    val dynoWriteConf = new JobConf(spark.sparkContext.hadoopConfiguration)
    dynoWriteConf.set("dynamodb.output.tableName", outputDynamoTable)
    // NOTE(review): write percent > 1.0 intentionally overdrives provisioned
    // throughput; confirm the table's capacity mode tolerates this.
    dynoWriteConf.set("dynamodb.throughput.write.percent", "1.5")
    // The input format is not used on this write path but is kept for parity
    // with the original job configuration.
    dynoWriteConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    dynoWriteConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    dynoWriteConf.set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")

    // Capture only the column names in the closure below; the full dtypes
    // (name, type) pairs are not needed on the executors.
    val columnNames = finalSuggestionsDF.dtypes.map(_._1)

    // val (not var): the RDD reference is never reassigned.
    val dynoInsertFormattedRDD = finalSuggestionsDF.rdd.map { row =>
      val ddbMap = new HashMap[String, AttributeValue]()
      for (i <- columnNames.indices) {
        val value = row.get(i)
        // Skip nulls: DynamoDB rejects empty/absent attribute values.
        if (value != null) {
          val att = new AttributeValue()
          att.setS(value.toString)
          ddbMap.put(columnNames(i), att)
        }
      }
      val item = new DynamoDBItemWritable()
      item.setItem(ddbMap)
      // DynamoDBOutputFormat ignores the key; an empty Text is conventional.
      (new Text(""), item)
    }
    dynoInsertFormattedRDD.saveAsHadoopDataset(dynoWriteConf)

  }