in wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala [76:123]
/**
 * Converts a single Drawflow node into the matching [[OperatorFromJson]] subtype.
 *
 * The node's id, category, operator name, input/output ids and execution platform are
 * obtained from `extractInfo`; operator-specific settings are read from the node's
 * `data` entries and cast to the type each operator expects.
 *
 * @param operatorFromDrawflow the Drawflow node to convert
 * @return the operator description built for the node's operator name
 * @throws IllegalArgumentException if the operator name is not one of the supported names
 * @throws ClassCastException if a `data` entry does not have the expected runtime type
 * @throws NumberFormatException if `sampleSize` or `numberOfIterations` is not a valid integer
 */
private def toOperatorFromJsonBody(operatorFromDrawflow: OperatorFromDrawflow): OperatorFromJson = {
  val (id, cat, operatorName, input, output, executionPlatform) = extractInfo(operatorFromDrawflow)
  val names = OperatorFromJson.OperatorNames

  // Reads a `data` entry as a String. A missing key fails inside `data(key)` itself
  // (presumably NoSuchElementException if `data` is a Map -- confirm against OperatorFromDrawflow).
  def field(key: String): String =
    operatorFromDrawflow.data(key).asInstanceOf[String]

  // Reads a comma-separated `data` entry as a list of trimmed items.
  def csvField(key: String): List[String] =
    field(key).split(",").map(_.trim).toList

  operatorName match {
    // input
    case "iBinary" =>
      InputCollectionFromJson(id, input, output, cat, names.InputCollection,
        InputCollectionFromJson.Data(field("collectionGeneratorFunction")), executionPlatform)
    case "iTextFile" =>
      TextFileInputFromJson(id, input, output, cat, names.TextFileInput,
        TextFileInputFromJson.Data(field("inputFileURL")), executionPlatform)
    case "iParquet" =>
      ParquetInputFromJson(id, input, output, cat, names.ParquetInput,
        ParquetInputFromJson.Data(
          field("inputFileURL"),
          operatorFromDrawflow.data("projection").asInstanceOf[Array[String]]
        ), executionPlatform)
    case "iCsvFile" =>
      TableInputFromJson(id, input, output, cat, names.Table,
        TableInputFromJson.Data(field("tableName"), csvField("columnNames")), executionPlatform)
    case "iJdbc" =>
      // The execution platform is passed explicitly, like every other operator, so the
      // platform selected for the node is not silently dropped for JDBC inputs.
      JDBCRemoteInputFromJson(id, input, output, cat, names.JDBCRemoteInput,
        JDBCRemoteInputFromJson.Data(
          field("inputJDBCconnection"),
          field("usernameJDBCconnection"),
          field("passwordJDBCconnection"),
          field("tableJDBCconnection"),
          csvField("columnNamesJDBCconnection")
        ), executionPlatform)

    // unary
    case "filter" =>
      FilterOperatorFromJson(id, input, output, cat, names.Filter,
        FilterOperatorFromJson.Data(field("booleanFunction"), None, None), executionPlatform)
    case "reduceBy" =>
      ReduceByOperatorFromJson(id, input, output, cat, names.ReduceBy,
        ReduceByOperatorFromJson.Data(field("keyFunction"), field("reduceFunction")), executionPlatform)
    case "count" =>
      CountOperatorFromJson(id, input, output, cat, names.Count, executionPlatform)
    case "groupBy" =>
      GroupByOperatorFromJson(id, input, output, cat, names.GroupBy,
        GroupByOperatorFromJson.Data(field("keyFunction")), executionPlatform)
    case "sort" =>
      SortOperatorFromJson(id, input, output, cat, names.Sort,
        SortOperatorFromJson.Data(field("keyFunction"), None, None), executionPlatform)
    case "flatMap" =>
      FlatMapOperatorFromJson(id, input, output, cat, names.FlatMap,
        FlatMapOperatorFromJson.Data(field("flatMapFunction"), None, None), executionPlatform)
    case "map" =>
      MapOperatorFromJson(id, input, output, cat, names.Map,
        MapOperatorFromJson.Data(field("mapFunction"), None, None), executionPlatform)
    case "reduce" =>
      ReduceOperatorFromJson(id, input, output, cat, names.Reduce,
        ReduceOperatorFromJson.Data(field("reduceFunction"), None, None), executionPlatform)
    case "distinct" =>
      DistinctOperatorFromJson(id, input, output, cat, names.Distinct, executionPlatform)
    case "mapPartitions" =>
      MapPartitionsOperatorFromJson(id, input, output, cat, names.MapPartitions,
        MapPartitionsOperatorFromJson.Data(field("mapPartitionsFunction"), None, None), executionPlatform)
    case "sample" =>
      SampleOperatorFromJson(id, input, output, cat, names.Sample,
        SampleOperatorFromJson.Data(field("sampleSize").toInt), executionPlatform)

    // binary
    case "union" =>
      UnionOperatorFromJson(id, input, output, cat, names.Union, executionPlatform)
    case "coGroup" =>
      CoGroupOperatorFromJson(id, input, output, cat, names.CoGroup,
        CoGroupOperatorFromJson.Data(field("groupKey1"), field("groupKey2")), executionPlatform)
    case "cartesian" =>
      CartesianOperatorFromJson(id, input, output, cat, names.Cartesian, executionPlatform)
    case "join" =>
      JoinOperatorFromJson(id, input, output, cat, names.Join,
        JoinOperatorFromJson.Data(field("joinKey1"), field("joinKey2"), None, None), executionPlatform)
    case "intersect" =>
      IntersectOperatorFromJson(id, input, output, cat, names.Intersect, executionPlatform)

    // loop
    case "foreach" =>
      ForeachOperatorFromJson(id, input, output, cat, names.Foreach,
        ForeachOperatorFromJson.Data(field("Body")), executionPlatform)
    case "while" =>
      // Tagged with the do-while operator name; it previously carried the text-file-output
      // name, a copy-paste slip that mislabelled the loop operator.
      DoWhileOperatorFromJson(id, input, output, cat, names.DoWhile,
        DoWhileOperatorFromJson.Data(field("criterionFunction"), field("Body")), executionPlatform)
    case "repeat" =>
      RepeatOperatorFromJson(id, input, output, cat, names.Repeat,
        RepeatOperatorFromJson.Data(field("numberOfIterations").toInt, field("Body")), executionPlatform)

    // output
    case "oTextFile" =>
      TextFileOutputFromJson(id, input, output, cat, names.TextFileOutput,
        TextFileOutputFromJson.Data(field("outputFileURL")), executionPlatform)

    // Fail with a descriptive message instead of a bare scala.MatchError.
    case unsupported =>
      throw new IllegalArgumentException(s"Unsupported Drawflow operator name: '$unsupported'")
  }
}