def extractInputAnalyzersFromDynamo(dynoTable: String)

in src/main/scala/deequ/deequ-analysis-runner.scala [124:150]
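Reads the analyzer-definition items from the given DynamoDB table and returns them as a Spark DataFrame. The method configures a Hadoop JobConf for the emr-dynamodb-hadoop connector, scans the table as an RDD of (Text, DynamoDBItemWritable) pairs, extracts six string attributes per item (id, database, tablename, column, analyzerCode, enable), and unwraps each raw attribute string with the col_extractValue UDF, which is defined elsewhere in this file.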


  // Required at the top of the file:
  //   import org.apache.hadoop.io.Text
  //   import org.apache.hadoop.mapred.JobConf
  //   import org.apache.hadoop.dynamodb.DynamoDBItemWritable
  //   import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
  //   import org.apache.spark.rdd.RDD
  //   import org.apache.spark.sql.DataFrame
  //   import spark.implicits._   // for toDF() and the $"..." column syntax

  def extractInputAnalyzersFromDynamo(dynoTable: String): DataFrame = {

    // Configure the emr-dynamodb-hadoop connector to read from the given table.
    val jobConf_add = new JobConf(spark.sparkContext.hadoopConfiguration)
    jobConf_add.set("dynamodb.input.tableName", dynoTable)
    jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

    // Scan the table as an RDD of (key, item) pairs.
    val hadooprdd_add = spark.sparkContext.hadoopRDD(
      jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

    // Pull the six expected attributes out of each item. AttributeValue.toString()
    // yields a wrapped form such as "{S: value,}", which is presumably stripped
    // below by the col_extractValue UDF (defined elsewhere in this file).
    val rdd_add: RDD[(String, String, String, String, String, String)] = hadooprdd_add.map {
      case (_, dbwritable) => (
        dbwritable.getItem().get("id").toString(),
        dbwritable.getItem().get("database").toString(),
        dbwritable.getItem().get("tablename").toString(),
        dbwritable.getItem().get("column").toString(),
        dbwritable.getItem().get("analyzerCode").toString(),
        dbwritable.getItem().get("enable").toString())
    }

    // Convert to a DataFrame, unwrap each raw attribute string, and return
    // only the named columns.
    rdd_add.toDF()
      .withColumn("id", col_extractValue($"_1"))
      .withColumn("database", col_extractValue($"_2"))
      .withColumn("tablename", col_extractValue($"_3"))
      .withColumn("column", col_extractValue($"_4"))
      .withColumn("analyzerCode", col_extractValue($"_5"))
      .withColumn("enable", col_extractValue($"_6"))
      .select("id", "database", "tablename", "column", "analyzerCode", "enable")
  }
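
The col_extractValue UDF is referenced but not shown in this excerpt. A minimal sketch of what such a UDF might look like, assuming it strips the "{S: value,}" wrapper that AttributeValue.toString() produces; this is an illustration, not the repo's actual implementation:

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical sketch: extract the bare value from a stringified DynamoDB
// AttributeValue such as "{S: myvalue,}" or "{N: 42,}". Falls back to the
// raw input if the pattern does not match.
val col_extractValue = udf { raw: String =>
  """\{\s*\w+:\s*(.*?),?\s*\}""".r
    .findFirstMatchIn(raw)
    .map(_.group(1).trim)
    .getOrElse(raw)
}
```

With a UDF of that shape in scope, a call such as extractInputAnalyzersFromDynamo("deequ-analyzers") (the table name here is an assumption for illustration) would yield a DataFrame with the six string columns selected above, ready to be filtered on enable to pick the active analyzer definitions.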