def main()

in src/main/scala/deequ/deequ-profile-runner.scala [38:80]


  /** Glue job entry point: profiles each listed Glue Data Catalog table with Deequ's
    * `ColumnProfilerRunner` and writes the per-column profile via `writeDStoS3`.
    *
    * Job arguments read:
    *  - `JOB_NAME`         – passed to `Job.init`.
    *  - `glueDatabase`     – catalog database containing the tables.
    *  - `glueTables`       – comma-separated table names; surrounding whitespace and blank
    *                         entries are ignored.
    *  - `targetBucketName` – destination bucket handed to `writeDStoS3`.
    *
    * For every table one row per profiled column is produced with the columns `column`,
    * `completeness`, `inferred_datatype`, `approx_distinct_values` and `timestamp`, and written
    * with the `"profile-results"` prefix argument. `Job.commit()` runs only after every table
    * has been processed.
    *
    * @param sysArgs raw command-line arguments supplied by the Glue runtime
    * @throws IllegalArgumentException if `glueTables` contains no non-blank table name
    */
  def main(sysArgs: Array[String]): Unit = {
    val args = GlueArgParser.getResolvedOptions(
      sysArgs,
      Array("JOB_NAME", "glueDatabase", "glueTables", "targetBucketName"))

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val dbName = args("glueDatabase")
    val targetBucket = args("targetBucketName")

    // Tolerate stray separators such as "a,,b" or "a, b, ": a blank entry would otherwise be
    // sent to the catalog as an empty table name and fail the job with an unclear error.
    val rawTables = args("glueTables")
    val tabNames = rawTables.split(",").map(_.trim).filter(_.nonEmpty)
    require(tabNames.nonEmpty, s"glueTables must list at least one table, got '$rawTables'")

    // Evaluated once so all tables profiled in this run are written under the same partition
    // values. NOTE(review): presumably these helpers derive from the current clock; if they are
    // stable vals this hoisting is simply a no-op.
    val year = getYear
    val month = getMonth
    val day = getDay
    val runTimestamp = getTimestamp

    for (tabName <- tabNames) {
      val tableDf = glueContext.getCatalogSource(
        database = dbName,
        tableName = tabName,
        redshiftTmpDir = "",
        // Job bookmarks key their state on the transformation context, so every source needs
        // a distinct one; a single shared "datasource0" lets tables overwrite each other's state.
        transformationContext = s"datasource_$tabName"
      ).getDynamicFrame().toDF()

      val profileResult = ColumnProfilerRunner()
        .onData(tableDf)
        .run()

      // One row per profiled column; toDF names the tuple fields directly instead of
      // renaming the default _1.._4 columns one by one.
      val finalDataset = profileResult.profiles
        .map { case (columnName, profile) =>
          (columnName,
            profile.completeness,
            profile.dataType.toString,
            profile.approximateNumDistinctValues)
        }
        .toSeq
        .toDF("column", "completeness", "inferred_datatype", "approx_distinct_values")
        .withColumn("timestamp", current_timestamp())

      writeDStoS3(finalDataset, targetBucket, "profile-results", dbName, tabName,
        year, month, day, runTimestamp)
    }

    Job.commit()
  }