in streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala [42:137]
def write(request: SinkRequest): Unit = {
this.synchronized {
val table = request.table.split("\\.").last
failoverStorage match {
case NONE =>
case Console =>
val records = request.records.map(x => s"(${cleanUp(x)})")
logInfo(s"failover body: [ ${records.mkString(",")} ]")
case Kafka =>
if (!Lock.initialized) {
try {
Lock.lock.lock()
if (!Lock.initialized) {
Lock.initialized = true
properties.put(
"key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
properties.put(
"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
kafkaProducer = new KafkaProducer[String, String](properties)
}
} catch {
case exception: Exception => {
logError(
s"build Failover storageType:KAFKA failed exception ${exception.getStackTrace
.mkString("Array(", ", ", ")")}")
throw exception
}
} finally {
Lock.lock.unlock()
}
}
val topic = properties.getProperty(KEY_KAFKA_TOPIC)
val timestamp = System.currentTimeMillis()
val records = request.records.map(cleanUp)
val sendData =
s"""
|{
|"values":[${records.mkString(",")}],
|"timestamp":$timestamp
|}
|""".stripMargin
val record = new ProducerRecord[String, String](topic, sendData)
kafkaProducer
.send(
record,
new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
logInfo(
s"Failover successful!! storageType:Kafka,table: $table,size:${request.size}")
}
}
)
.get()
case MySQL =>
if (!Lock.initialized) {
try {
Lock.lock.lock()
if (!Lock.initialized) {
Lock.initialized = true
properties.put(KEY_ALIAS, s"failover-$table")
val mysqlConnect = JdbcUtils.getConnection(properties)
val mysqlTable =
mysqlConnect.getMetaData.getTables(null, null, table, Array("TABLE", "VIEW"))
if (!mysqlTable.next()) {
JdbcUtils.execute(
mysqlConnect,
s"create table $table (`values` text, `timestamp` bigint)")
logWarn(s"Failover storageType:MySQL,table: $table is not exist,auto created...")
}
}
} catch {
case exception: Exception =>
logError(
s"build Failover storageType:MySQL failed exception ${exception.getStackTrace}")
throw exception
} finally {
Lock.lock.unlock()
}
}
val timestamp = System.currentTimeMillis()
val records = request.records.map(
x => {
val v = cleanUp(x)
s""" ($v,$timestamp) """.stripMargin
})
val sql = s"INSERT INTO $table(`values`,`timestamp`) VALUES ${records.mkString(",")} "
JdbcUtils.update(sql)(properties)
logInfo(s"Failover successful!! storageType:MySQL,table: $table,size:${request.size}")
case _ =>
throw new UnsupportedOperationException(
s"[StreamPark] unsupported failover storageType:$failoverStorage")
}
}
}