in mantis-publish/mantis-publish-core/src/main/java/io/mantisrx/publish/EventProcessor.java [70:127]
public Event process(String stream, Event event) {
LOG.debug("Entering EventProcessor#onNext: {}", event);
boolean isEnabled = config.isMREClientEnabled();
if (!isEnabled) {
LOG.debug("Mantis Realtime Events Publisher is disabled."
+ "Set the property defined in your MrePublishConfiguration object to true to enable.");
return null;
}
// make a deep copy before proceeding to avoid altering the user provided map.
if (config.isDeepCopyEventMapEnabled()) {
event = new Event(event.getMap(), true);
}
maskSensitiveFields(event);
if (config.isTeeEnabled()) {
tee.tee(config.teeStreamName(), event);
}
List<Subscription> matchingSubscriptions = new ArrayList<>();
if (streamManager.hasSubscriptions(stream)) {
final Set<Subscription> streamSubscriptions = streamManager.getStreamSubscriptions(stream);
for (Subscription s : streamSubscriptions) {
try {
if (s.matches(event)) {
matchingSubscriptions.add(s);
}
} catch (Exception e) {
streamManager.getStreamMetrics(stream)
.ifPresent(m -> m.getMantisQueryFailedCounter().increment());
// Send errors only for a sample of events.
int rndNo = randomGenerator.nextInt(1_000_000);
if (rndNo < 10) {
sendError(s, e.getMessage());
}
}
}
}
Event projectedEvent = null;
if (!matchingSubscriptions.isEmpty()) {
projectedEvent = projectSupersetEvent(stream, matchingSubscriptions, event);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("no matching subscriptions");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Exit EventProcessor#onNext: {}", event);
}
return projectedEvent;
}