in tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala [103:250]
/**
 * Runs the selected queries against every prepared coordinate and prints a report.
 *
 * Steps, in order:
 *  1. register one session per coordinate, using a copy of the suite's test conf with the
 *     coordinate's own settings applied on top;
 *  2. for each coordinate, switch to its session, create the tables, then for each query run
 *     `warmupIterations` warm-up executions followed by `iterations` measured executions;
 *  3. regroup the collected marks into one result line per (iteration, query);
 *  4. print RAM statistics, a summary, the succeeded lines (plus an aggregated "all" line when
 *     at least one line succeeded) and, if any, the failed lines.
 *
 * @param suite the suite supplying query resources, data path, sessions and table creator
 * @return `true` if every result line reports success, `false` otherwise
 */
override def execute(suite: Suite): Boolean = {
  val runner: QueryRunner =
    new QueryRunner(suite.queryResource(), suite.dataWritePath(scale, genPartitionedData))
  val sessionSwitcher = suite.sessionSwitcher
  val testConf = suite.getTestConf()

  // Coordinate keys materialized once, in the map's own traversal order, so that every
  // listing below (headers, per-coordinate cells, printed configurations) uses one ordering.
  val coordinateKeys = coordinates.map(_._1).toSeq
  val coordinateIds = coordinateKeys.map(_.id)

  // When session reuse is disabled, replace the current session and re-create the tables
  // in the fresh one. Shared by the warm-up and the measured runs.
  def renewSessionIfRequired(): Unit = {
    if (noSessionReuse) {
      sessionSwitcher.renewSession()
      runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
    }
  }

  println("Prepared coordinates: ")
  coordinateKeys.foreach { c =>
    println(s" ${c.id}: $c")
  }

  // Register one session per coordinate.
  coordinates.foreach {
    case (coordinate, coordinateConf) =>
      val conf = testConf.clone()
      conf.setAllWarningOnOverriding(coordinateConf)
      sessionSwitcher.registerSession(coordinate.toString, conf)
  }

  val runQueryIds = queries.select(suite).map(TestResultLine.QueryId(_))

  val marks: Seq[TestResultLine.CoordMark] = coordinateKeys.flatMap { coordinate =>
    sessionSwitcher.useSession(coordinate.toString, "Parameterized %s".format(coordinate))
    runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
    runQueryIds.flatMap { queryId =>
      // Warm-up executions: their results are not collected.
      (0 until warmupIterations).foreach { iteration =>
        println(s"Warming up: Running query $queryId (iteration $iteration)...")
        try {
          Parameterized.warmUp(
            runner,
            sessionSwitcher.spark(),
            queryId.id,
            coordinate,
            suite.desc())
        } finally {
          // In `finally` so the session is renewed even if the warm-up throws.
          renewSessionIfRequired()
        }
      }
      // Measured executions: one mark per iteration.
      (0 until iterations).map { iteration =>
        println(s"Running query $queryId with coordinate $coordinate (iteration $iteration)...")
        val queryResult =
          try {
            Parameterized.runQuery(
              runner,
              sessionSwitcher.spark(),
              queryId.id,
              coordinate,
              suite.desc(),
              explain,
              metrics)
          } finally {
            renewSessionIfRequired()
          }
        TestResultLine.CoordMark(iteration, queryId, queryResult)
      }
    }
  }

  // One result line per (iteration, query), holding that query's outcome on each coordinate.
  val results: Seq[TestResultLine] = marks
    .groupBy(m => (m.iteration, m.queryId))
    .toSeq
    .sortBy(_._1)
    .map {
      case ((_, queryId), marksOfQuery) =>
        TestResultLine(queryId, marksOfQuery.map(_.coord).toList)
    }

  // Split once instead of filtering the same sequence repeatedly.
  val (succeeded, failed) = results.partition(_.succeeded())
  val succeededCount = succeeded.size
  val totalCount = results.size

  // RAM stats
  println("Performing GC to collect RAM statistics... ")
  System.gc()
  System.gc()
  printf(
    "RAM statistics: JVM Heap size: %d KiB (total %d KiB), Process RSS: %d KiB\n",
    RamStat.getJvmHeapUsed(),
    RamStat.getJvmHeapTotal(),
    RamStat.getProcessRamUsed())
  println("")
  println("Test report: ")
  println("")
  printf(
    "Summary: %d out of %d queries successfully run on all config combinations. \n",
    succeededCount,
    totalCount)
  println("")
  println("Configurations:")
  coordinateKeys.foreach(c => println(s"${c.id}. $c"))
  println("")

  // Aggregated "all" line, built only when at least one line succeeded.
  val all =
    if (succeeded.isEmpty) {
      None
    } else {
      Some(
        TestResultLine(
          TestResultLine.QueryId("all"),
          coordinateKeys.map { c =>
            TestResultLine.Coord(
              c,
              succeeded
                .map(_.coord(c.id))
                .map(_.queryResult)
                .asSuccesses()
                .agg(s"coordinate $c")
                // NOTE(review): `.get` assumes `agg` yields a value for a non-empty list of
                // succeeded results - confirm against the aggregator's contract.
                .get)
          }))
    }

  TestResultLines(coordinateIds, configDimensions, metrics, succeeded ++ all)
    .print()
  println("")

  if (failed.isEmpty) {
    println("No failed queries. ")
    println("")
  } else {
    println("Failed queries: ")
    println("")
    TestResultLines(coordinateIds, configDimensions, metrics, failed)
      .print()
    println("")
  }

  succeededCount == totalCount
}