in measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala [58:75]
private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
val config = sinkParam.getConfig
val sinkType = sinkParam.getType
val sinkTry = sinkType match {
case Console => Try(ConsoleSink(config, jobName, timeStamp))
case Hdfs => Try(HdfsSink(config, jobName, timeStamp))
case ElasticSearch => Try(ElasticSearchSink(config, jobName, timeStamp, block))
case MongoDB => Try(MongoSink(config, jobName, timeStamp, block))
case Custom => Try(getCustomSink(config, timeStamp, block))
case _ => throw new Exception(s"sink type $sinkType is not supported!")
}
sinkTry match {
case Success(sink) if sink.validate() => Some(sink)
case Failure(ex) =>
error("Failed to get sink", ex)
None
}
}