public Event process()

in mantis-publish/mantis-publish-core/src/main/java/io/mantisrx/publish/EventProcessor.java [70:127]


    public Event process(String stream, Event event) {
        LOG.debug("Entering EventProcessor#onNext: {}", event);

        boolean isEnabled = config.isMREClientEnabled();
        if (!isEnabled) {
            LOG.debug("Mantis Realtime Events Publisher is disabled."
                    + "Set the property defined in your MrePublishConfiguration object to true to enable.");
            return null;
        }

        // make a deep copy before proceeding to avoid altering the user provided map.
        if (config.isDeepCopyEventMapEnabled()) {
            event = new Event(event.getMap(), true);
        }

        maskSensitiveFields(event);

        if (config.isTeeEnabled()) {
            tee.tee(config.teeStreamName(), event);
        }

        List<Subscription> matchingSubscriptions = new ArrayList<>();
        if (streamManager.hasSubscriptions(stream)) {
            final Set<Subscription> streamSubscriptions = streamManager.getStreamSubscriptions(stream);

            for (Subscription s : streamSubscriptions) {
                try {
                    if (s.matches(event)) {
                        matchingSubscriptions.add(s);
                    }
                } catch (Exception e) {
                    streamManager.getStreamMetrics(stream)
                            .ifPresent(m -> m.getMantisQueryFailedCounter().increment());

                    // Send errors only for a sample of events.
                    int rndNo = randomGenerator.nextInt(1_000_000);
                    if (rndNo < 10) {
                        sendError(s, e.getMessage());
                    }
                }
            }
        }

        Event projectedEvent = null;
        if (!matchingSubscriptions.isEmpty()) {
            projectedEvent = projectSupersetEvent(stream, matchingSubscriptions, event);
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("no matching subscriptions");
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Exit EventProcessor#onNext: {}", event);
        }

        return projectedEvent;
    }