in source/packages/services/events-processor/src/filter/filter.service.ts [37:150]
/**
 * Evaluates each incoming event against the subscriptions configured for it and
 * persists the outcome.
 *
 * For every event/subscription pair the subscription's conditions are run through a
 * rules engine, with the event's root fields (prefixed with `__`) and its non-null
 * attributes supplied as facts. Depending on the result:
 *  - the rule fired: an alert is built if the subscription is not already alerted, or if
 *    `sub.event.disableAlertThreshold` is set; the subscription is flagged as alerted.
 *  - the rule ran but did not fire, and the subscription was alerted: the flag is reset.
 *  - the engine failed to run: the subscription is left untouched.
 *
 * New alerts and changed alerted flags are collected across the whole batch and written
 * once at the end via `alertDao`.
 *
 * @param events the events to evaluate; validated with `ow` as a non-empty array.
 * @returns a promise that resolves once the alerts and changed alerted flags are saved.
 */
public async filter(events:CommonEvent[]): Promise<void> {
    logger.debug(`filter.service filter: in: model:${JSON.stringify(events)}`);

    ow(events, ow.array.nonEmpty);

    // for performance, cache for the duration of the method call...
    const subscriptionMap: { [key:string]:SubscriptionItem[]} = {};
    const ruleMap: { [key:string]:Rule} = {};

    // a single engine is reused for every evaluation, so its rule and facts must be
    // fully cleared after each run (see the end of the subscription loop)
    const engine = new Engine();

    const alerts:AlertItem[]=[];
    // keyed by subscription id so that only the latest state per subscription is persisted
    const changedSubAlerts:{[key:string]:SubscriptionItem}= {};

    // builds the minimal subscription record used to persist a change of alerted status
    const toChangedSubAlert = (sub:SubscriptionItem, alerted:boolean): SubscriptionItem => ({
        id: sub.id,
        eventSource: {
            id: sub.eventSource.id,
            principal: sub.eventSource.principal
        },
        principalValue: sub.principalValue,
        alerted
    });

    for(const ev of events) {

        // perform lookup to see if any subscriptions are configured for the event source/principal/principalValue (cached for the duration of the method call)
        const subscriptions = await this.listSubscriptionsForEvent(ev, subscriptionMap);

        // nothing to evaluate if no subscriptions were found for this event
        if (subscriptions===undefined) {
            continue;
        }

        for(const sub of subscriptions) {

            // initializing an empty cache
            const templateCache:TemplateCache = {};

            // initialize the rule (cached for the duration of the method call)
            let rule = ruleMap[sub.event.id];
            if (rule===undefined) {
                rule = new Rule({
                    conditions: sub.event.conditions as TopLevelCondition,
                    event: {
                        type: sub.event.name
                    }
                });
                ruleMap[sub.event.id] = rule;
            }
            engine.addRule(rule);

            // ids of every fact registered for this run, so that exactly these can be removed afterwards
            const factIds:string[] = [];

            // add all root elements with '__' prefix, except attributes
            Object.keys(ev)
                .filter(key=> key!=='attributes')
                .forEach(key=> {
                    const factId = '__' + key;
                    engine.addFact(factId, ev[key]);
                    factIds.push(factId);
                });

            // add all the known facts (an event without attributes simply contributes none)
            if (ev.attributes!==undefined && ev.attributes!==null) {
                Object.keys(ev.attributes)
                    .filter(key=> ev.attributes[key]!==undefined && ev.attributes[key]!==null)
                    .forEach(key=> {
                        engine.addFact(key, ev.attributes[key]);
                        factIds.push(key);
                    });
            }

            // evaluate the rules
            let results:EngineResult;
            try {
                results = await engine.run();
            } catch (err) {
                // non-fatal by design, as an incoming message may not contain the facts we're
                // interested in. `results` stays undefined, so neither branch below is taken.
                logger.debug(`filter.service filter: rule evaluation skipped: ${err instanceof Error ? err.message : String(err)}`);
            }
            logger.debug(`filter.service filter: results:${JSON.stringify(results)}`);

            if (results?.events?.length>0 ) {
                if (!sub.alerted || sub.event.disableAlertThreshold) {
                    // a new alert...
                    const templateAttributes = await this.getTemplatePropertiesData(sub, ev, templateCache);
                    alerts.push(this.buildAlert(sub, templateAttributes));
                    // only record a status change when the flag actually flips
                    if (!sub.alerted) {
                        changedSubAlerts[sub.id] = toChangedSubAlert(sub, true);
                    }
                    sub.alerted=true;
                }
            } else if (results?.events?.length===0 && sub.alerted) {
                // an alert that needs resetting...
                sub.alerted=false;
                changedSubAlerts[sub.id] = toChangedSubAlert(sub, false);
            }

            // clear the engine state ready for the next run: remove every fact added above,
            // including the '__' root facts, so that no value can leak into the evaluation
            // of a later event that lacks that field
            factIds.forEach(factId=> engine.removeFact(factId));
            engine.removeRule(rule);
        }

        // save the subscription in case its state changed
        const mapKey = this.subscriptionMapKey(ev);
        subscriptionMap[mapKey]=subscriptions;
    }

    logger.debug(`filter.service filter: alerts:${JSON.stringify(alerts)}`);
    if (alerts.length>0) {
        await this.alertDao.create(alerts);
    }

    if (Object.keys(changedSubAlerts).length>0) {
        await this.alertDao.updateChangedSubAlertStatus(changedSubAlerts);
    }

    logger.debug(`filter.service filter: exit:`);
}