def write(request: SinkRequest): Unit

in streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala [42:137]
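
Routes failed sink records to the configured failover storage. The whole method is synchronized, so concurrent callers serialize on this writer; the Kafka producer and the MySQL target table are initialized lazily, exactly once, via double-checked locking on a shared Lock holder. Supported storage types: NONE (drop silently), Console (log only), Kafka (publish a JSON envelope to a topic), MySQL (batch insert into a table); any other type throws UnsupportedOperationException.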


  def write(request: SinkRequest): Unit = {
    this.synchronized {
      val table = request.table.split("\\.").last
      failoverStorage match {
        case NONE =>
        case Console =>
          val records = request.records.map(x => s"(${cleanUp(x)})")
          logInfo(s"failover body: [ ${records.mkString(",")} ]")
        case Kafka =>
          if (!Lock.initialized) {
            Lock.lock.lock()
            try {
              if (!Lock.initialized) {
                Lock.initialized = true
                properties.put(
                  "key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer")
                properties.put(
                  "value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer")
                kafkaProducer = new KafkaProducer[String, String](properties)
              }
            } catch {
              case exception: Exception =>
                logError(
                  s"build Failover storageType:KAFKA failed, exception: ${exception.getStackTrace.mkString("\n")}")
                throw exception
            } finally {
              Lock.lock.unlock()
            }
          }
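          // Wrap the cleaned records in a small JSON envelope: {"values":[...],"timestamp":...}.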
          val topic = properties.getProperty(KEY_KAFKA_TOPIC)
          val timestamp = System.currentTimeMillis()
          val records = request.records.map(cleanUp)
          val sendData =
            s"""
               |{
               |"values":[${records.mkString(",")}],
               |"timestamp":$timestamp
               |}
               |""".stripMargin
          val record = new ProducerRecord[String, String](topic, sendData)
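          // send(...).get() blocks until the broker acknowledges, so a failed
          // failover delivery surfaces here as an exception instead of being lost.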
          kafkaProducer
            .send(
              record,
              new Callback() {
                override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                  if (e != null) {
                    logError(s"Failover failed!! storageType:Kafka,table: $table, exception: $e")
                  } else {
                    logInfo(
                      s"Failover successful!! storageType:Kafka,table: $table,size:${request.size}")
                  }
                }
              }
            )
            .get()
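        // MySQL: on first use, auto-create the target table if it is missing,
        // then insert all records of this request in a single statement.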
        case MySQL =>
          if (!Lock.initialized) {
            Lock.lock.lock()
            try {
              if (!Lock.initialized) {
                Lock.initialized = true
                properties.put(KEY_ALIAS, s"failover-$table")
                val mysqlConnect = JdbcUtils.getConnection(properties)
                try {
                  val mysqlTable =
                    mysqlConnect.getMetaData.getTables(null, null, table, Array("TABLE", "VIEW"))
                  if (!mysqlTable.next()) {
                    JdbcUtils.execute(
                      mysqlConnect,
                      s"create table $table (`values` text, `timestamp` bigint)")
                    logWarn(
                      s"Failover storageType:MySQL,table: $table does not exist, auto-created...")
                  }
                } finally {
                  mysqlConnect.close()
                }
              }
            } catch {
              case exception: Exception =>
                logError(
                  s"build Failover storageType:MySQL failed, exception: ${exception.getStackTrace.mkString("\n")}")
                throw exception
            } finally {
              Lock.lock.unlock()
            }
          }
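          // Render each record as a (values, timestamp) tuple for the batched INSERT below.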
          val timestamp = System.currentTimeMillis()
          val records = request.records.map(
            x => {
              val v = cleanUp(x)
              s"($v,$timestamp)"
            })
          val sql = s"INSERT INTO $table(`values`,`timestamp`) VALUES ${records.mkString(",")}"
          JdbcUtils.update(sql)(properties)
          logInfo(s"Failover successful!! storageType:MySQL,table: $table,size:${request.size}")
        case _ =>
          throw new UnsupportedOperationException(
            s"[StreamPark] unsupported failover storageType:$failoverStorage")
      }
    }
  }
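
For reference, Lock.initialized and Lock.lock above follow the classic double-checked locking shape. A minimal sketch of such a holder, assuming a companion object alongside FailoverWriter (the two member names come from the usages above; everything else is illustrative):

  import java.util.concurrent.locks.ReentrantLock

  object Lock {
    // Checked once without the lock, then re-checked while holding it, so the
    // producer/table initialization in write() runs exactly once per writer.
    @volatile var initialized: Boolean = false
    val lock: ReentrantLock = new ReentrantLock()
  }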