public void activate(ComponentContext context)

in rcomp-examples/kafka-appender/src/main/java/net/lr/reactive/components/appender/kafka/KafkaAppender.java [65:82]


    /**
     * Activates the appender: reads the component configuration and wires the
     * stream of events obtained from {@code eventAdmin} into the Kafka
     * subscriber obtained from {@code kafka}.
     *
     * <p>Two configuration properties are read: {@code topic}, which is handed to
     * {@code kafka.to(...)}, and {@link EventConstants#EVENT_TOPIC}, which is
     * handed to {@code eventAdmin.from(...)}. Both are validated before any
     * field is modified, so a failed activation leaves the component state
     * untouched.
     *
     * @param context component context whose properties carry the configuration
     * @throws IllegalArgumentException if the {@code topic} property or the
     *         {@link EventConstants#EVENT_TOPIC} property is missing
     */
    public void activate(ComponentContext context) {
        Dictionary<String, Object> config = context.getProperties();

        String kafkaTopic = (String) config.get("topic");
        if (kafkaTopic == null) {
            throw new IllegalArgumentException("Config property topic must be present.");
        }
        // Reject a missing event topic up front instead of passing null on to
        // eventAdmin.from(...), mirroring the check done for "topic".
        String eventTopics = (String) config.get(EventConstants.EVENT_TOPIC);
        if (eventTopics == null) {
            throw new IllegalArgumentException(
                "Config property " + EventConstants.EVENT_TOPIC + " must be present.");
        }

        // Only publish the validated value to the field.
        this.topic = kafkaTopic;

        Publisher<Event> fromEventAdmin = eventAdmin.from(eventTopics, Event.class);
        toKafka = kafka.to(kafkaTopic, ProducerRecord.class);

        // Flag the activating thread in the MDC before subscribing.
        // NOTE(review): presumably this key is checked elsewhere to keep the
        // appender's own log output from being fed back into it - confirm.
        org.slf4j.MDC.put("inLogAppender", "true");
        Flux.from(fromEventAdmin)
            // Re-apply the MDC flag on every signal, since signals may be
            // delivered on a thread other than the one that activated us.
            .doOnEach(signal -> org.slf4j.MDC.put("inLogAppender", "true"))
            .map(event -> toRecord(event))
            .doOnError(ex -> LOGGER.error(ex.getMessage(), ex))
            .subscribe(toKafka);
        LOGGER.info("Kafka appender started. Listening on topic {}", kafkaTopic);
    }