public OperatorBase createOperatorByType()

in wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java [220:293]


    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Typo: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                try {
                    /* Function to be applied in Python workers */
                    ByteString function = operator.getUdf();

                    /* Has dimension or positions that compose GroupKey */
                    Map<String, String> parameters = operator.getParametersMap();

                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf() ,
                        String.class,
                        String.class,
                            false
                    );

                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }