public async filter()

in source/packages/services/events-processor/src/filter/filter.service.ts [36:154]


    /**
     * Evaluates a batch of events against the enabled subscriptions configured for
     * each event, then persists any resulting alerts and `alerted` state changes.
     *
     * For every event / enabled-subscription pair:
     *  - a `Rule` is built from the subscription's event conditions (cached per
     *    `sub.event.id` for the duration of this call);
     *  - every root-level event field except `attributes` is registered as a fact
     *    named `__<field>`, and every non-null attribute is registered as a fact
     *    under its own key;
     *  - the engine is run. A failing run is treated as "no result" and ignored;
     *  - if the rule emitted at least one engine event and the subscription is not
     *    yet alerted (or `disableAlertThreshold` is set), an alert is queued;
     *  - if the rule emitted no engine events and the subscription was alerted, its
     *    alerted flag is reset.
     *
     * All facts registered for an evaluation are removed again before the next one,
     * so no state from one event can influence the evaluation of another.
     *
     * @param events - the events to evaluate; must be a non-empty array (enforced via `ow`).
     * @returns a promise that resolves once queued alerts and changed subscription
     *          states have been handed to the alert DAO.
     */
    public async filter(events: CommonEvent[]): Promise<void> {
        logger.debug(`filter.service filter: in: model:${JSON.stringify(events)}`);

        ow(events, ow.array.nonEmpty);

        // for performance, cache for the duration of the method call...
        const subscriptionMap: { [key: string]: SubscriptionItem[] } = {};
        const ruleMap: { [key: string]: Rule } = {};

        const engine = new Engine();

        const alerts: AlertItem[] = [];
        const changedSubAlerts: { [key: string]: SubscriptionItem } = {};

        // builds the minimal subscription record used to persist a change of `alerted` state
        const alertedStateChange = (
            sub: SubscriptionItem,
            alerted: boolean
        ): SubscriptionItem => ({
            id: sub.id,
            eventSource: {
                id: sub.eventSource.id,
                principal: sub.eventSource.principal,
            },
            principalValue: sub.principalValue,
            alerted,
        });

        for (const ev of events) {
            // perform lookup to see if any subscriptions are configured for the event
            // source/principal/principalValue (cached for the duration of the method call).
            // The result is always an array, so no further undefined check is required.
            const subscriptions = (
                (await this.listSubscriptionsForEvent(ev, subscriptionMap)) || []
            ).filter((o) => o.enabled);

            // tolerate events that carry no attributes instead of failing the whole batch
            const attributeKeys = Object.keys(ev.attributes ?? {});

            for (const sub of subscriptions) {
                // initializing an empty cache
                const templateCache: TemplateCache = {};

                // initialize the rule (cached for the duration of the method call)
                let rule = ruleMap[sub.event.id];
                if (rule === undefined) {
                    rule = new Rule({
                        conditions: sub.event.conditions as TopLevelCondition,
                        event: {
                            type: sub.event.name,
                        },
                    });
                    ruleMap[sub.event.id] = rule;
                }

                engine.addRule(rule);

                // track every fact registered for this evaluation so that all of them
                // (not just the attribute facts) can be removed afterwards
                const registeredFactIds: string[] = [];

                // add all root elements with '__' prefix, except attributes
                for (const key of Object.keys(ev)) {
                    if (key === 'attributes') {
                        continue;
                    }
                    const factId = '__' + key;
                    engine.addFact(factId, ev[key]);
                    registeredFactIds.push(factId);
                }

                // add all the known facts (attributes without a value are skipped)
                for (const key of attributeKeys) {
                    const value = ev.attributes[key];
                    if (value === undefined || value === null) {
                        continue;
                    }
                    engine.addFact(key, value);
                    registeredFactIds.push(key);
                }

                // evaluate the rules
                let results: EngineResult;
                try {
                    results = await engine.run();
                } catch (err) {
                    // deliberately non-fatal, as an incoming message may not contain the
                    // facts we're interested in; `results` stays undefined so neither
                    // branch below fires
                    logger.debug(`filter.service filter: rule evaluation ignored: ${err}`);
                }
                logger.debug(`filter.service filter: results:${JSON.stringify(results)}`);

                const emittedCount = results?.events?.length;
                if (emittedCount > 0) {
                    if (!sub.alerted || sub.event.disableAlertThreshold) {
                        // a new alert...
                        const templateAttributes = await this.getTemplatePropertiesData(
                            sub,
                            ev,
                            templateCache
                        );
                        alerts.push(this.buildAlert(sub, templateAttributes));
                        if (!sub.alerted) {
                            changedSubAlerts[sub.id] = alertedStateChange(sub, true);
                        }
                        sub.alerted = true;
                    }
                } else if (emittedCount === 0 && sub.alerted) {
                    // an alert that needs resetting...
                    sub.alerted = false;
                    changedSubAlerts[sub.id] = alertedStateChange(sub, false);
                }

                // clear the engine state ready for the next run: remove every fact that was
                // registered above, including the '__' prefixed root facts, so that nothing
                // leaks into the evaluation of a later event that lacks the same fields
                for (const factId of registeredFactIds) {
                    engine.removeFact(factId);
                }
                engine.removeRule(rule);
            }

            // save the subscription in case its state changed
            const mapKey = this.subscriptionMapKey(ev);
            subscriptionMap[mapKey] = subscriptions;
        }

        logger.debug(`filter.service filter: alerts:${JSON.stringify(alerts)}`);
        if (alerts.length > 0) {
            await this.alertDao.create(alerts);
        }
        if (Object.keys(changedSubAlerts).length > 0) {
            await this.alertDao.updateChangedSubAlertStatus(changedSubAlerts);
        }

        logger.debug(`filter.service filter: exit:`);
    }