mantis-examples/mantis-examples-synthetic-sourcejob/src/main/java/io/mantisrx/sourcejob/synthetic/sink/TaggedDataSourceSink.java [40:68]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class TaggedDataSourceSink implements Sink<TaggedData> {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final ServerSentEventsSink<TaggedData> sink;
    private Subscription subscription;

    static class NoOpProcessor implements Func2<Map<String, List<String>>, Context, Void> {

        @Override
        public Void call(Map<String, List<String>> t1, Context t2) {
            return null;
        }
    }

    public TaggedDataSourceSink() {
        this(new NoOpProcessor(), new NoOpProcessor());
    }

    public TaggedDataSourceSink(Func2<Map<String, List<String>>, Context, Void> preProcessor,
                                Func2<Map<String, List<String>>, Context, Void> postProcessor) {
        this.sink = new ServerSentEventsSink.Builder<TaggedData>()
            .withEncoder((data) -> {
                try {
                    return OBJECT_MAPPER.writeValueAsString(data.getPayload());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    return "{\"error\":" + e.getMessage() + "}";
                }
            })
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-source-jobs/mantis-source-job-kafka/src/main/java/io/mantisrx/sourcejob/kafka/sink/TaggedDataSourceSink.java [34:62]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class TaggedDataSourceSink implements Sink<TaggedData> {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final ServerSentEventsSink<TaggedData> sink;
    private Subscription subscription;

    static class NoOpProcessor implements Func2<Map<String, List<String>>, Context, Void> {

        @Override
        public Void call(Map<String, List<String>> t1, Context t2) {
            return null;
        }
    }

    public TaggedDataSourceSink() {
        this(new NoOpProcessor(), new NoOpProcessor());
    }

    public TaggedDataSourceSink(Func2<Map<String, List<String>>, Context, Void> preProcessor,
                                Func2<Map<String, List<String>>, Context, Void> postProcessor) {
        this.sink = new ServerSentEventsSink.Builder<TaggedData>()
            .withEncoder((data) -> {
                try {
                    return OBJECT_MAPPER.writeValueAsString(data.getPayload());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    return "{\"error\":" + e.getMessage() + "}";
                }
            })
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



