in wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java [220:293]
public OperatorBase createOperatorByType(OperatorProto operator){
System.out.println("Typo: " + operator.getType());
switch(operator.getType()){
case "source":
try {
String source_path = operator.getPath();
URL url = new File(source_path).toURI().toURL();
return new TextFileSource(url.toString());
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "sink":
try {
String sink_path = operator.getPath();
URL url = new File(sink_path).toURI().toURL();
return new TextFileSink<String>(
url.toString(),
String.class
);
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "reduce_by_key":
try {
/* Function to be applied in Python workers */
ByteString function = operator.getUdf();
/* Has dimension or positions that compose GroupKey */
Map<String, String> parameters = operator.getParametersMap();
PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
operator.getParametersMap(),
operator.getUdf() ,
String.class,
String.class,
false
);
String sink_path = operator.getPath();
URL url = new File(sink_path).toURI().toURL();
return new TextFileSink<String>(
url.toString(),
String.class
);
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "map_partition":
return new MapPartitionsOperator<>(
new MapPartitionsDescriptor<String, String>(
new WrappedPythonFunction<String, String>(
l -> l,
operator.getUdf()
),
String.class,
String.class
)
);
case "union":
return new UnionAllOperator<String>(
String.class
);
}
throw new WayangException("Operator Type not supported");
}