in fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala [31:133]
def runQueries(
    spark: SparkSession,
    numFiles: Int,
    filename: String,
    showFailedSparkQueries: Boolean = false): Unit = {
  val outputFilename = s"results-${System.currentTimeMillis()}.md"
  // scalastyle:off println
  println(s"Writing results to $outputFilename")
  // scalastyle:on println
  val w = new BufferedWriter(new FileWriter(outputFilename))
  // register each input Parquet file as a temp view and record its schema
  for (i <- 0 until numFiles) {
    val table = spark.read.parquet(s"test$i.parquet")
    val tableName = s"test$i"
    table.createTempView(tableName)
    w.write(
      s"Created table $tableName with schema:\n\t" +
        s"${table.schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString("\n\t")}\n\n")
  }
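  // the generated queries are expected to reference these views by name
  // (test0, test1, ...), so the schemas recorded above give readers of the
  // results file the context needed to interpret each query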
  val querySource = Source.fromFile(filename)
  try {
    querySource
      .getLines()
      .foreach(sql => {
        try {
          // execute with vanilla Spark first to establish the expected rows and plan
          spark.conf.set("spark.comet.enabled", "false")
          val df = spark.sql(sql)
          val sparkRows = df.collect()
          val sparkPlan = df.queryExecution.executedPlan.toString
          // execute the same query with Comet enabled
          try {
            spark.conf.set("spark.comet.enabled", "true")
            // work around incomplete complex type support by letting Spark
            // read the data and converting it to columnar format, until we
            // support it natively
            spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")
            spark.conf.set("spark.comet.convert.parquet.enabled", "true")
            val df = spark.sql(sql)
            val cometRows = df.collect()
            val cometPlan = df.queryExecution.executedPlan.toString
            if (sparkRows.length == cometRows.length) {
              var i = 0
              while (i < sparkRows.length) {
                val l = sparkRows(i)
                val r = cometRows(i)
                assert(l.length == r.length)
                var j = 0
                while (j < l.length) {
                  if (!same(l(j), r(j))) {
                    showSQL(w, sql)
                    showPlans(w, sparkPlan, cometPlan)
                    w.write(s"First difference at row $i:\n")
                    w.write("Spark: `" + formatRow(l) + "`\n")
                    w.write("Comet: `" + formatRow(r) + "`\n")
                    // report only the first differing value, then exit both loops
                    i = sparkRows.length
                    j = l.length
                  } else {
                    j += 1
                  }
                }
                i += 1
              }
            } else {
              showSQL(w, sql)
              showPlans(w, sparkPlan, cometPlan)
              w.write(
                s"[ERROR] Spark produced ${sparkRows.length} rows and " +
                  s"Comet produced ${cometRows.length} rows.\n")
            }
          } catch {
            case e: Exception =>
              // the query worked in Spark but failed in Comet, so this is
              // likely a bug in Comet
              showSQL(w, sql)
              w.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n")
              w.write("```\n")
              val sw = new StringWriter()
              val p = new PrintWriter(sw)
              e.printStackTrace(p)
              p.close()
              w.write(s"${sw.toString}\n")
              w.write("```\n")
          }
          // flush after every query so that results survive a driver crash
          w.flush()
        } catch {
          case e: Exception =>
            // many generated queries are expected to be invalid, so failures
            // in Spark itself are only reported on request
            if (showFailedSparkQueries) {
              showSQL(w, sql)
              w.write(s"Query failed in Spark: ${e.getMessage}\n")
            }
        }
      })
  } finally {
    w.close()
    querySource.close()
  }
}
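
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the excerpt above: `same`, `formatRow`,
// `showSQL`, and `showPlans` are helpers defined elsewhere in
// QueryRunner.scala. A minimal `same` might treat NaNs as equal and allow a
// small tolerance on floating-point columns; the name `sameSketch` and the
// epsilon below are assumptions, not the real implementation:
def sameSketch(l: Any, r: Any): Boolean = (l, r) match {
  case (null, null) => true
  // NaN != NaN under ==, so handle it explicitly, then compare within epsilon
  case (a: Float, b: Float) => (a.isNaN && b.isNaN) || math.abs(a - b) <= 1e-6f
  case (a: Double, b: Double) => (a.isNaN && b.isNaN) || math.abs(a - b) <= 1e-6
  case (a, b) => a == b
}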
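
// Illustrative usage, assuming `test0.parquet`/`test1.parquet` and a
// generated `queries.sql` already exist in the working directory; the
// session settings here are an assumption, not what the fuzzer's real
// entry point does:
def runLocalSketch(): Unit = {
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("comet-fuzz")
    .getOrCreate()
  runQueries(spark, numFiles = 2, filename = "queries.sql")
}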