def main()

in GlueCustomConnectors/development/Spark/glue-3.0/tpcds-custom-connector-for-glue3.0/jobvalidation/scala/GlueJobValidationDataPartitioningTest.scala [19:78]


  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark, 1, 20) // Change these values for preventing from repartition by GlueContext default config.

    val connectionName = "GlueTPCDSConnection"
    val connectionType = "marketplace.spark"

    // 1. Partition count - generated single chunk data and row count is more than numPartitions
    val options_1 = Map(
      "table" -> "customer",
      "scale" -> "1",
      "numPartitions" -> "5",
      "connectionName" -> connectionName
    )

    val dyf_1 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_1),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_1.getNumPartitions == 5)
    assert(dyf_1.count == 100000)


    // 2. Partition count - generated single chunk data and row count is less than numPartitions
    val options_2 = Map(
      "table" -> "call_center",
      "scale" -> "1",
      "numPartitions" -> "100",
      "connectionName" -> connectionName
    )

    val dyf_2 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_2),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_2.getNumPartitions == 1)
    assert(dyf_2.count == 6)


    // 3. Partition count - generated multiple chunk data - in parallel
    val options_3 = Map(
      "table" -> "customer",
      "scale" -> "100",
      "numPartitions" -> "100",
      "connectionName" -> connectionName
    )

    val dyf_3 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_3),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_3.getNumPartitions == 100)
    assert(dyf_3.count == 2000000)
  }