public async filter()

in source/packages/services/events-processor/src/filter/filter.service.ts [37:150]


    /**
     * Evaluates each incoming event against the subscriptions configured for it, then
     * hands any resulting alerts and alert status changes to the alert DAO.
     *
     * For every event the matching subscriptions are looked up (cached per call), and each
     * subscription's conditions are run through a single shared rules engine. The event's root
     * properties are exposed as facts prefixed with `__`; its non-null attributes are exposed
     * as facts under their own names. Rules and facts are registered immediately before, and
     * removed immediately after, each evaluation so nothing leaks into the next one.
     *
     * Outcome per subscription:
     *  - rule matched and the subscription is not yet alerted (or its event has
     *    `disableAlertThreshold` set): an alert is built, and the subscription is flagged alerted;
     *  - rule ran without matching and the subscription was alerted: the flag is reset;
     *  - rule evaluation threw: nothing changes for that subscription.
     *
     * @param events the events to evaluate; validated with `ow` as a non-empty array.
     * @returns resolves once `alertDao.create` / `alertDao.updateChangedSubAlertStatus` have completed
     *          (each is only called when there is something to write).
     */
    public async filter(events:CommonEvent[]): Promise<void> {
        logger.debug(`filter.service filter: in: model:${JSON.stringify(events)}`);

        ow(events, ow.array.nonEmpty);

        // for performance, cache for the duration of the method call...
        const subscriptionMap: { [key:string]:SubscriptionItem[]} = {};
        const ruleMap: { [key:string]:Rule} = {};

        const engine = new Engine();

        const alerts:AlertItem[]=[];
        const changedSubAlerts:{[key:string]:SubscriptionItem}= {};

        // flips a subscription's alerted flag and queues the status change (keyed by
        // subscription id, so a later change to the same subscription replaces an earlier one)
        const recordAlertedChange = (sub:SubscriptionItem, alerted:boolean): void => {
            sub.alerted = alerted;
            changedSubAlerts[sub.id]= {
                id: sub.id,
                eventSource: {
                    id: sub.eventSource.id,
                    principal: sub.eventSource.principal
                },
                principalValue: sub.principalValue,
                alerted
            };
        };

        for(const ev of events) {

            // perform lookup to see if any subscriptions are configured for the event source/principal/principalValue (cached for the duration of the method call)
            const subscriptions = await this.listSubscriptionsForEvent(ev, subscriptionMap);

            // if we have subscriptions, lets evaluate them against the datasource
            if (subscriptions!==undefined) {

                for(const sub of subscriptions) {

                    // initializing an empty cache
                    const templateCache:TemplateCache = {};

                    // initialize the rule (cached for the duration of the method call)
                    let rule = ruleMap[sub.event.id];
                    if (rule===undefined) {
                        rule = new Rule({
                            conditions: sub.event.conditions as TopLevelCondition,
                            event: {
                                type: sub.event.name
                            }
                        });
                        ruleMap[sub.event.id] = rule;
                    }

                    engine.addRule(rule);

                    // capture the keys once, so that exactly the facts registered here are the ones removed below
                    const rootKeys = Object.keys(ev).filter(key=> key!=='attributes');
                    // an event without attributes simply contributes no attribute facts
                    const attributeKeys = ev.attributes ? Object.keys(ev.attributes) : [];

                    // add all root elements with '__' prefix, except attributes
                    rootKeys.forEach(key=> engine.addFact( '__' + key, ev[key]));

                    // add all the known facts
                    attributeKeys
                        .filter(key=> ev.attributes[key]!==undefined && ev.attributes[key]!==null)
                        .forEach(key=> engine.addFact(key, ev.attributes[key]));

                    // evaluate the rules
                    let results:EngineResult;
                    try {
                        results = await engine.run();
                    } catch (err) {
                        // deliberately not rethrown, as an incoming message may not contain the facts we're interested in
                        logger.debug(`filter.service filter: rule evaluation skipped: ${err}`);
                    }
                    logger.debug(`filter.service filter: results:${JSON.stringify(results)}`);

                    if (results?.events?.length>0 ) {
                        if (!sub.alerted || sub.event.disableAlertThreshold) {
                            // a new alert...
                            const attributes = await this.getTemplatePropertiesData(sub, ev, templateCache);
                            alerts.push(this.buildAlert(sub, attributes));
                            // only a genuine false -> true transition is recorded as a status change
                            if (!sub.alerted) {
                                recordAlertedChange(sub, true);
                            }
                        }
                    } else if (results?.events?.length===0 && sub.alerted) {
                        // an alert that needs resetting...
                        recordAlertedChange(sub, false);
                    }

                    // clear the engine state ready for the next run. The '__' root facts must be removed
                    // too, otherwise an event lacking an optional root property would be evaluated
                    // against the value left behind by a previous event.
                    rootKeys.forEach(key=> engine.removeFact('__' + key));
                    attributeKeys.forEach(key=> engine.removeFact(key));
                    engine.removeRule(rule);
                }

                // save the subscription in case its state changed
                const mapKey = this.subscriptionMapKey(ev);
                subscriptionMap[mapKey]=subscriptions;
            }
        }

        logger.debug(`filter.service filter: alerts:${JSON.stringify(alerts)}`);
        if (alerts.length>0) {
            await this.alertDao.create(alerts);
        }
        if (Object.keys(changedSubAlerts).length>0) {
            await this.alertDao.updateChangedSubAlertStatus(changedSubAlerts);
        }

        logger.debug(`filter.service filter: exit:`);

    }