in measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala [40:87]
/**
 * Writes this step's records to the sinks of the given context.
 *
 * When `writeTimestampOpt` is defined, its value is used as the sink timestamp and the
 * write mode is forced to `SimpleMode`; otherwise the timestamp is taken from
 * `context.contextId.timestamp` and the mode from `context.writeMode`.
 *
 *  - `SimpleMode`: the records returned by `getBatchRecords` (if any) are passed to
 *    every sink of `timestamp` through `sinkBatchRecords`.
 *  - `TimestampMode`: every `(t, records)` pair returned by `getStreamingRecords` is
 *    passed to the sinks of `t` through `sinkRecords`, and an empty record list is
 *    written to the sinks of each entry in the returned empty-timestamp collection.
 *
 * A non-fatal failure of a single sink write is logged and does not stop the remaining
 * writes. Fatal errors (as classified by `scala.util.control.NonFatal`) are not
 * swallowed and propagate to the caller.
 *
 * @param context the data-quality context providing the sinks, the write mode and the
 *                default timestamp
 * @return `Success(true)` once all writes have been attempted; `Failure` if a non-fatal
 *         exception is thrown outside the per-sink guards (for example while the
 *         records are being obtained)
 */
def execute(context: DQContext): Try[Boolean] = Try {
  import scala.util.control.NonFatal

  // Runs one sink write; a failing sink is logged so that the other sinks still get
  // their records. Only non-fatal errors are handled here: the enclosing Try does not
  // catch fatal ones either, so they must not be hidden at this level.
  def guarded(write: => Unit): Unit =
    try write
    catch {
      case NonFatal(e) => error(s"sink records error: ${e.getMessage}", e)
    }

  val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
  // An explicit write timestamp always means a plain, single-timestamp write.
  val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)

  writeMode match {
    case SimpleMode =>
      // batch records: nothing is written when no records are available
      getBatchRecords(context).foreach { records =>
        context.getSinks(timestamp).foreach { sink =>
          guarded(sink.sinkBatchRecords(records, Option(name)))
        }
      }

    case TimestampMode =>
      // streaming records, keyed by the value used to look up their sinks
      val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
      recordsOpt.foreach { records =>
        records.foreach { case (t, strRecords) =>
          context.getSinks(t).foreach { sink =>
            guarded(sink.sinkRecords(strRecords, name))
          }
        }
      }
      // entries reported as empty still get an explicit empty write
      emptyTimestamps.foreach { t =>
        context.getSinks(t).foreach { sink =>
          guarded(sink.sinkRecords(Nil, name))
        }
      }
  }
  true
}