in GlueCustomConnectors/development/Spark/glue-3.0/tpcds-custom-connector-for-glue3.0/jobvalidation/scala/GlueJobValidationDataPartitioningTest.scala [19:78]
/** Validates DynamicFrame partition counts produced by the TPC-DS custom Spark
  * connector on Glue 3.0 across three data-generation scenarios.
  *
  * @param sysArgs Glue job arguments (unused by this validation).
  */
def main(sysArgs: Array[String]): Unit = {
  val spark: SparkContext = new SparkContext()
  // minPartitions = 1, targetPartitions = 20: prevents GlueContext's default
  // config from repartitioning and masking the connector's own partitioning.
  val glueContext: GlueContext = new GlueContext(spark, 1, 20)
  val connectionName = "GlueTPCDSConnection"
  val connectionType = "marketplace.spark"

  // Reads one TPC-DS table through the custom connector and returns the
  // resulting DynamicFrame. Factors out the getSource boilerplate shared by
  // all three scenarios below.
  def readTable(table: String, scale: String, numPartitions: String) =
    glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(Map(
        "table" -> table,
        "scale" -> scale,
        "numPartitions" -> numPartitions,
        "connectionName" -> connectionName
      )),
      transformationContext = "dyf"
    ).getDynamicFrame()

  // 1. Partition count - generated single chunk data and row count is more than numPartitions
  val dyf1 = readTable("customer", scale = "1", numPartitions = "5")
  assert(dyf1.getNumPartitions == 5, s"scenario 1: expected 5 partitions, got ${dyf1.getNumPartitions}")
  assert(dyf1.count == 100000, s"scenario 1: expected 100000 rows, got ${dyf1.count}")

  // 2. Partition count - generated single chunk data and row count is less than numPartitions
  val dyf2 = readTable("call_center", scale = "1", numPartitions = "100")
  assert(dyf2.getNumPartitions == 1, s"scenario 2: expected 1 partition, got ${dyf2.getNumPartitions}")
  assert(dyf2.count == 6, s"scenario 2: expected 6 rows, got ${dyf2.count}")

  // 3. Partition count - generated multiple chunk data - in parallel
  val dyf3 = readTable("customer", scale = "100", numPartitions = "100")
  assert(dyf3.getNumPartitions == 100, s"scenario 3: expected 100 partitions, got ${dyf3.getNumPartitions}")
  assert(dyf3.count == 2000000, s"scenario 3: expected 2000000 rows, got ${dyf3.count}")
}