mantis-source-jobs/mantis-source-job-kafka/src/main/java/io/mantisrx/sourcejob/kafka/sink/TaggedDataSourceSink.java [52:67]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Creates the sink by building a {@code ServerSentEventsSink} for {@code TaggedData} events.
     *
     * <p>Each event is encoded by serializing {@code data.getPayload()} with {@code OBJECT_MAPPER}.
     * If serialization fails, the stack trace is printed and the encoder returns a JSON object
     * with a single {@code "error"} field holding the exception message, rather than propagating
     * the exception.
     *
     * @param preProcessor  handed to the builder as the request preprocessor
     * @param postProcessor handed to the builder as the request postprocessor
     */
    public TaggedDataSourceSink(Func2<Map<String, List<String>>, Context, Void> preProcessor,
                                Func2<Map<String, List<String>>, Context, Void> postProcessor) {
        this.sink = new ServerSentEventsSink.Builder<TaggedData>()
            .withEncoder((data) -> {
                try {
                    return OBJECT_MAPPER.writeValueAsString(data.getPayload());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    // Build the fallback through the mapper so the message is quoted and escaped.
                    // Concatenating the raw message yields invalid JSON, because the message is
                    // unquoted and may itself contain quotes or newlines.
                    return OBJECT_MAPPER.createObjectNode()
                        .put("error", e.getMessage())
                        .toString();
                }
            })
            .withPredicate(new Predicate<TaggedData>("description", new TaggedEventFilter()))
            .withRequestPreprocessor(preProcessor)
            .withRequestPostprocessor(postProcessor)
            .build();
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-examples/mantis-examples-synthetic-sourcejob/src/main/java/io/mantisrx/sourcejob/synthetic/sink/TaggedDataSourceSink.java [58:73]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Creates the sink by building a {@code ServerSentEventsSink} for {@code TaggedData} events.
     *
     * <p>Each event is encoded by serializing {@code data.getPayload()} with {@code OBJECT_MAPPER}.
     * If serialization fails, the stack trace is printed and the encoder returns a JSON object
     * with a single {@code "error"} field holding the exception message, rather than propagating
     * the exception.
     *
     * @param preProcessor  handed to the builder as the request preprocessor
     * @param postProcessor handed to the builder as the request postprocessor
     */
    public TaggedDataSourceSink(Func2<Map<String, List<String>>, Context, Void> preProcessor,
                                Func2<Map<String, List<String>>, Context, Void> postProcessor) {
        this.sink = new ServerSentEventsSink.Builder<TaggedData>()
            .withEncoder((data) -> {
                try {
                    return OBJECT_MAPPER.writeValueAsString(data.getPayload());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    // Build the fallback through the mapper so the message is quoted and escaped.
                    // Concatenating the raw message yields invalid JSON, because the message is
                    // unquoted and may itself contain quotes or newlines.
                    return OBJECT_MAPPER.createObjectNode()
                        .put("error", e.getMessage())
                        .toString();
                }
            })
            .withPredicate(new Predicate<>("description", new TaggedEventFilter()))
            .withRequestPreprocessor(preProcessor)
            .withRequestPostprocessor(postProcessor)
            .build();
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



