private[this] def formatSql()

in streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala [100:175]


  private[this] def formatSql(sql: String): String = {

    val LENGTH_REGEXP = "(.*?)\\s*\\([^\\\\)|^\\n]+,$".r

    val COMMENT_REGEXP =
      Pattern.compile("(comment)\\s+(['\"])", Pattern.CASE_INSENSITIVE)

    @tailrec def commentJoin(
        map: util.Map[Integer, String],
        index: Integer,
        segment: String): (Integer, String) = {
      val matcher = COMMENT_REGEXP.matcher(segment)
      matcher.find() match {
        case b if !b => index -> segment
        case _ =>
          val s = matcher.group(2)
          val regexp = s"\\$s(,|)$$".r
          val cleaned = segment
            .replaceFirst("(?i)(comment)\\s+(['\"])", "")
            .replace(s"\\$s", "")
          regexp.findFirstIn(cleaned) match {
            case Some(_) => index -> segment
            case _ =>
              val nextLine = map(index + 1)
              commentJoin(map, index + 1, s"$segment$nextLine")
          }
      }
    }

    @tailrec def lengthJoin(
        map: util.Map[Integer, String],
        index: Integer,
        segment: String): (Integer, String) = {
      LENGTH_REGEXP.findFirstIn(segment) match {
        case None => commentJoin(map, index, segment)
        case Some(_) =>
          val nextLine = map(index + 1)
          lengthJoin(map, index + 1, s"${segment.trim}$nextLine")
      }
    }

    val body = sql
      .substring(sql.indexOf("("), sql.lastIndexOf(")") + 1)
      .replaceAll("\r|\n|\r\n", "")
      .replaceFirst("\\(", "(\n")
      .replaceFirst("\\)$", "\n)")
      .replaceAll(",", ",\n")

    val scanner = new Scanner(body)
    val map = new util.HashMap[Integer, String]()
    while (scanner.hasNextLine) {
      map.put(map.size(), scanner.nextLine().trim)
    }
    val sqlBuffer = new StringBuffer(sql.substring(0, sql.indexOf("(")))
    var skipNo: Int = -1
    map.foreach(a => {
      if (a._1 > skipNo) {
        val length = lengthJoin(map, a._1, a._2)
        if (length._1 > a._1) {
          sqlBuffer.append(length._2).append("\n")
          skipNo = length._1
        } else {
          val comment = commentJoin(map, a._1, a._2)
          if (comment._1 > a._1) {
            sqlBuffer.append(comment._2).append("\n")
            skipNo = comment._1
          } else {
            sqlBuffer.append(a._2).append("\n")
          }
        }
      }
    })
    scanner.close()
    sqlBuffer.toString.trim.concat(sql.substring(sql.lastIndexOf(")") + 1))

  }