private def getSink()

in measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala [58:75]


  private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
    val config = sinkParam.getConfig
    val sinkType = sinkParam.getType
    val sinkTry = sinkType match {
      case Console => Try(ConsoleSink(config, jobName, timeStamp))
      case Hdfs => Try(HdfsSink(config, jobName, timeStamp))
      case ElasticSearch => Try(ElasticSearchSink(config, jobName, timeStamp, block))
      case MongoDB => Try(MongoSink(config, jobName, timeStamp, block))
      case Custom => Try(getCustomSink(config, timeStamp, block))
      case _ => throw new Exception(s"sink type $sinkType is not supported!")
    }
    sinkTry match {
      case Success(sink) if sink.validate() => Some(sink)
      case Failure(ex) =>
        error("Failed to get sink", ex)
        None
    }
  }