def execute()

in measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala [40:87]


  def execute(context: DQContext): Try[Boolean] = Try {
    val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)

    val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
    writeMode match {
      case SimpleMode =>
        // batch records
        val recordsOpt = getBatchRecords(context)
        // write records
        recordsOpt match {
          case Some(records) =>
            context.getSinks(timestamp).foreach { sink =>
              try {
                sink.sinkBatchRecords(records, Option(name))
              } catch {
                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
              }
            }
          case _ =>
        }
      case TimestampMode =>
        // streaming records
        val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
        // write records
        recordsOpt.foreach { records =>
          records.foreach { pair =>
            val (t, strRecords) = pair
            context.getSinks(t).foreach { sink =>
              try {
                sink.sinkRecords(strRecords, name)
              } catch {
                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
              }
            }
          }
        }
        emptyTimestamps.foreach { t =>
          context.getSinks(t).foreach { sink =>
            try {
              sink.sinkRecords(Nil, name)
            } catch {
              case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
            }
          }
        }
    }
    true
  }