override def deserialize()

in wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala [44:96]


  /**
   * Rebuilds a [[MultiContext]] from its JSON representation.
   *
   * The incoming JSON value is read as a tree and its `configuration`, `sink` and `plugins`
   * fields are deserialized one by one with `mapper`. A fresh context is then created from the
   * configuration, the sink (if any) is re-applied, and every plugin whose class name is
   * recognised is registered again.
   *
   * @param jp   parser supplying the JSON value to read
   * @param ctxt Jackson deserialization context; used here to report missing fields
   * @return the reconstructed multi context
   * @throws com.fasterxml.jackson.databind.JsonMappingException if one of the three expected
   *         fields is absent from the JSON value
   */
  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {

    // Deserialize each field of MultiContext separately
    val node: JsonNode = jp.getCodec.readTree(jp)

    // `JsonNode.get` yields null for an absent field. Report that as a Jackson input mismatch
    // naming the field, instead of letting a NullPointerException escape from `traverse`.
    def fieldParser(fieldName: String): JsonParser = {
      val child: JsonNode = node.get(fieldName)
      if (child == null) {
        ctxt.reportInputMismatch[JsonParser](
          classOf[MultiContext],
          "Missing required field '%s' in serialized MultiContext",
          fieldName
        )
      } else {
        child.traverse(jp.getCodec)
      }
    }

    val configuration: Configuration =
      mapper.readValue(fieldParser("configuration"), classOf[Configuration])

    val sink: Option[MultiContext.UnarySink] =
      mapper.readValue(fieldParser("sink"), new TypeReference[Option[MultiContext.UnarySink]]() {})

    val plugins: List[String] =
      mapper.readValue(fieldParser("plugins"), new TypeReference[List[String]]() {})

    //
    // Create the whole deserialized multi context
    //
    // 1. Add configuration
    val multiContext = new MultiContext(configuration)

    // 2. Add sink
    // The values returned by withTextFileSink / withObjectFileSink are discarded.
    // NOTE(review): this relies on those methods updating `multiContext` itself - confirm.
    sink match {
      case Some(MultiContext.TextFileSink(url)) =>
        println(s"It's a TextFileSink with url: $url")
        multiContext.withTextFileSink(url)
      case Some(MultiContext.ObjectFileSink(url)) =>
        println(s"It's an ObjectFileSink with url: $url")
        multiContext.withObjectFileSink(url)
      case None =>
        println("No sink defined")
      case _ =>
        // Any other value (another sink subtype, or a null result from the mapper) lands here.
        println("Unknown sink type")
    }

    // TODO: Add all plugins
    // 3. Add plugins
    // Plugins are identified by the runtime class name of the plugin object, so the names to
    // compare against are taken from the plugin instances themselves.
    val javaPluginName = Java.basicPlugin().getClass.getName
    val sparkPluginName = Spark.basicPlugin().getClass.getName
    val postgresPluginName = Postgres.plugin().getClass.getName
    // val flinkPluginName = Flink.basicPlugin().getClass.getName
    val sqlite3PluginName = Sqlite3.plugin().getClass.getName

    // Names that match none of the known plugins are only reported on stdout and otherwise skipped.
    plugins.foreach {
      case pluginName if pluginName == javaPluginName => multiContext.register(Java.basicPlugin())
      case pluginName if pluginName == sparkPluginName => multiContext.register(Spark.basicPlugin())
      case pluginName if pluginName == postgresPluginName => multiContext.register(Postgres.plugin())
      // case pluginName if pluginName == flinkPluginName => multiContext.register(Flink.basicPlugin())
      case pluginName if pluginName == sqlite3PluginName => multiContext.register(Sqlite3.plugin())
      case _ => println("Unknown plugin detected")
    }

    multiContext
  }