in source/packages/services/events-processor/src/filter/filter.service.ts [36:154]
public async filter(events: CommonEvent[]): Promise<void> {
logger.debug(`filter.service filter: in: model:${JSON.stringify(events)}`);
ow(events, ow.array.nonEmpty);
// for performance, cache for the duration of the method call...
const subscriptionMap: { [key: string]: SubscriptionItem[] } = {};
const ruleMap: { [key: string]: Rule } = {};
const engine = new Engine();
const alerts: AlertItem[] = [];
const changedSubAlerts: { [key: string]: SubscriptionItem } = {};
for (const ev of events) {
// perform lookup to see if any subscriptions are configured for the event source/principal/principalValue (cached for the duration of the method call)
const subscriptions = (
(await this.listSubscriptionsForEvent(ev, subscriptionMap)) || []
).filter((o) => o.enabled);
// if we have subscriptions, lets evaluate them against the datasource
if (subscriptions !== undefined) {
for (const sub of subscriptions) {
// initializing an empty cache
const templateCache: TemplateCache = {};
// initialize the rule (cached for the duration of the method call)
let rule = ruleMap[sub.event.id];
if (rule === undefined) {
rule = new Rule({
conditions: sub.event.conditions as TopLevelCondition,
event: {
type: sub.event.name,
},
});
ruleMap[sub.event.id] = rule;
}
engine.addRule(rule);
// add all root elements with '__' prefix, except attributes
Object.keys(ev)
.filter((key) => key !== 'attributes')
.forEach((key) => engine.addFact('__' + key, ev[key]));
// add all the known facts
Object.keys(ev.attributes)
.filter(
(key) =>
ev.attributes[key] !== undefined && ev.attributes[key] !== null
)
.forEach((key) => engine.addFact(key, ev.attributes[key]));
// evaluate the rules
let results: EngineResult;
try {
results = await engine.run();
} catch (err) {
// silently ignore, as an incoming message may not contain the facts we're interested in
}
logger.debug(`filter.service filter: results:${JSON.stringify(results)}`);
if (results?.events?.length > 0) {
if (!sub.alerted || sub.event.disableAlertThreshold) {
// a new alert...
const attributes = await this.getTemplatePropertiesData(
sub,
ev,
templateCache
);
alerts.push(this.buildAlert(sub, attributes));
if (!sub.alerted) {
changedSubAlerts[sub.id] = {
id: sub.id,
eventSource: {
id: sub.eventSource.id,
principal: sub.eventSource.principal,
},
principalValue: sub.principalValue,
alerted: true,
};
}
sub.alerted = true;
}
} else if (results?.events?.length === 0 && sub.alerted) {
// an alert that needs resetting...
sub.alerted = false;
changedSubAlerts[sub.id] = {
id: sub.id,
eventSource: {
id: sub.eventSource.id,
principal: sub.eventSource.principal,
},
principalValue: sub.principalValue,
alerted: false,
};
}
// clear the engine state ready for the next run
Object.keys(ev.attributes).forEach((key) => engine.removeFact(key));
engine.removeRule(rule);
}
// save the subscription in case its state changed
const mapKey = this.subscriptionMapKey(ev);
subscriptionMap[mapKey] = subscriptions;
}
}
logger.debug(`filter.service filter: alerts:${JSON.stringify(alerts)}`);
if (alerts.length > 0) {
await this.alertDao.create(alerts);
}
if (Object.keys(changedSubAlerts).length > 0) {
await this.alertDao.updateChangedSubAlertStatus(changedSubAlerts);
}
logger.debug(`filter.service filter: exit:`);
}