def runQueries()

in fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala [31:133]


  def runQueries(
      spark: SparkSession,
      numFiles: Int,
      filename: String,
      showFailedSparkQueries: Boolean = false): Unit = {

    val outputFilename = s"results-${System.currentTimeMillis()}.md"
    // scalastyle:off println
    println(s"Writing results to $outputFilename")
    // scalastyle:on println

    val w = new BufferedWriter(new FileWriter(outputFilename))

    // register each generated Parquet input file as a temp view
    for (i <- 0 until numFiles) {
      val table = spark.read.parquet(s"test$i.parquet")
      val tableName = s"test$i"
      table.createTempView(tableName)
      w.write(
        s"Created table $tableName with schema:\n\t" +
          s"${table.schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString("\n\t")}\n\n")
    }

    val querySource = Source.fromFile(filename)
    try {
      querySource
        .getLines()
        .foreach(sql => {

          try {
            // execute with Spark (Comet disabled) to produce the baseline result
            spark.conf.set("spark.comet.enabled", "false")
            val df = spark.sql(sql)
            val sparkRows = df.collect()
            val sparkPlan = df.queryExecution.executedPlan.toString

            // execute with Comet
            try {
              spark.conf.set("spark.comet.enabled", "true")
              // enable Spark-to-columnar conversion for complex type support until
              // Comet supports complex types natively
              spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")
              spark.conf.set("spark.comet.convert.parquet.enabled", "true")
              val df = spark.sql(sql)
              val cometRows = df.collect()
              val cometPlan = df.queryExecution.executedPlan.toString

              // compare results row by row and report the first difference
              if (sparkRows.length == cometRows.length) {
                var i = 0
                while (i < sparkRows.length) {
                  val l = sparkRows(i)
                  val r = cometRows(i)
                  assert(l.length == r.length)
                  for (j <- l.indices) {
                    if (!same(l(j), r(j))) {
                      showSQL(w, sql)
                      showPlans(w, sparkPlan, cometPlan)
                      w.write(s"First difference at row $i:\n")
                      w.write("Spark: `" + formatRow(l) + "`\n")
                      w.write("Comet: `" + formatRow(r) + "`\n")
                      // stop comparing further rows once a difference has been reported
                      i = sparkRows.length
                    }
                  }
                  i += 1
                }
              } else {
                showSQL(w, sql)
                showPlans(w, sparkPlan, cometPlan)
                w.write(
                  s"[ERROR] Spark produced ${sparkRows.length} rows and " +
                    s"Comet produced ${cometRows.length} rows.\n")
              }
            } catch {
              case e: Exception =>
                // the query worked in Spark but failed in Comet, so this is likely a bug in Comet
                showSQL(w, sql)
                w.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n")
                w.write("```\n")
                val sw = new StringWriter()
                val p = new PrintWriter(sw)
                e.printStackTrace(p)
                p.close()
                w.write(s"${sw.toString}\n")
                w.write("```\n")
            }

            // flush after every query so that results are saved in the event of the driver crashing
            w.flush()

          } catch {
            case e: Exception =>
              // we expect many generated queries to be invalid
              if (showFailedSparkQueries) {
                showSQL(w, sql)
                w.write(s"Query failed in Spark: ${e.getMessage}\n")
              }
          }
        })

    } finally {
      w.close()
      querySource.close()
    }
  }
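
A minimal usage sketch, assuming the method lives on the QueryRunner object (as the file name
suggests) and that test0.parquet .. test{numFiles-1}.parquet plus a newline-separated query file
already exist in the working directory; the driver object, master URL, extension config, and file
names below are illustrative assumptions rather than part of the fuzzer's actual CLI:

  import org.apache.spark.sql.SparkSession

  object QueryRunnerExample {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("comet-fuzz-example")
        // assumed: register Comet's session extensions so queries can actually run on Comet
        .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
        .getOrCreate()

      // compares Spark and Comet results for every query in queries.sql and
      // writes a results-<timestamp>.md report to the working directory
      QueryRunner.runQueries(spark, numFiles = 2, filename = "queries.sql", showFailedSparkQueries = true)
    }
  }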