in wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala [44:96]
override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {
// Deserialize each field of MultiContext separately
val node: JsonNode = jp.getCodec.readTree(jp)
val configurationParser: JsonParser = node.get("configuration").traverse(jp.getCodec)
val configuration: Configuration = mapper.readValue(configurationParser, classOf[Configuration])
val sinkParser: JsonParser = node.get("sink").traverse(jp.getCodec)
val sink: Option[MultiContext.UnarySink] = mapper.readValue(sinkParser, new TypeReference[Option[MultiContext.UnarySink]]() {})
val pluginsParser: JsonParser = node.get("plugins").traverse(jp.getCodec)
val plugins: List[String] = mapper.readValue(pluginsParser, new TypeReference[List[String]]() {})
//
// Create the whole deserialized multi context
//
// 1. Add configuration
val multiContext = new MultiContext(configuration)
// 2. Add sink
sink match {
case Some(MultiContext.TextFileSink(url)) =>
println(s"It's a TextFileSink with url: $url")
multiContext.withTextFileSink(url)
case Some(MultiContext.ObjectFileSink(url)) =>
println(s"It's an ObjectFileSink with url: $url")
multiContext.withObjectFileSink(url)
case None =>
println("No sink defined")
case _ =>
println("Unknown sink type")
}
// TODO: Add all plugins
// 3. Add plugins
val javaPluginName = Java.basicPlugin.getClass.getName
val sparkPluginName = Spark.basicPlugin.getClass.getName
val postgresPluginName = Postgres.plugin().getClass.getName
// val flinkPluginName = Flink.basicPlugin().getClass.getName
val sqlite3PluginName = Sqlite3.plugin().getClass.getName
plugins.foreach {
case pluginName if pluginName == javaPluginName => multiContext.register(Java.basicPlugin())
case pluginName if pluginName == sparkPluginName => multiContext.register(Spark.basicPlugin())
case pluginName if pluginName == postgresPluginName => multiContext.register(Postgres.plugin())
// case pluginName if pluginName == flinkPluginName => multiContext.register(Flink.basicPlugin())
case pluginName if pluginName == sqlite3PluginName => multiContext.register(Sqlite3.plugin())
case _ => println("Unknown plugin detected")
}
multiContext
}