protected String sendNewNotificationsEx()

in ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/tcbot/issue/IssueDetector.java [151:322]


    protected String sendNewNotificationsEx() {
        List<INotificationChannel> channels = new ArrayList<>();

        userStorage.allUsers()
            .filter(TcHelperUser::hasEmail)
            .filter(TcHelperUser::hasSubscriptions)
            .forEach(channels::add);

        channels.addAll(cfg.notifications().channels());

        Map<String, Notification> toBeSent = new HashMap<>();

        AtomicInteger issuesChecked = new AtomicInteger();
        AtomicInteger filteredFresh = new AtomicInteger();
        AtomicInteger filteredBuildTs = new AtomicInteger();
        AtomicInteger filteredNotDisabled = new AtomicInteger();
        AtomicInteger hasSubscriptions = new AtomicInteger();
        AtomicInteger neverSentBefore = new AtomicInteger();

        issuesStorage.allIssues()
            .peek(issue -> issuesChecked.incrementAndGet())
            .filter(issue -> {
                long detected = issue.detectedTs == null ? 0 : issue.detectedTs;
                long issueAgeMs = System.currentTimeMillis() - detected;

                //here boundary can be not an absolute, but some ts when particular notification channel config was changed
                // alternatively boundary may depend to issue notification histroy

                boolean neverNotified = issue.addressNotified == null || issue.addressNotified.isEmpty();
                // if issue had a prior notification, limit age by 2 hours to avoid new addresses spamming.
                // otherwise check last day issues if it is notifiable
                long bound = TimeUnit.HOURS.toMillis(neverNotified
                    ? TcBotConst.NOTIFY_MAX_AGE_SINCE_DETECT_HOURS
                    : TcBotConst.NOTIFY_MAX_AGE_SINCE_DETECT_FOR_NOTIFIED_ISSUE_HOURS );

                return issueAgeMs <= bound;
            })
            .peek(issue -> filteredFresh.incrementAndGet())
            .filter(issue -> {
                if (issue.buildStartTs == null)
                    return true; // exception due to bug in issue detection; field was not filled

                long buildStartTs = issue.buildStartTs == null ? 0 : issue.buildStartTs;
                long buildAgeMs = System.currentTimeMillis() - buildStartTs;
                long maxBuildAgeToNotify = TimeUnit.DAYS.toMillis(TcBotConst.NOTIFY_MAX_AGE_SINCE_START_DAYS) / 2;

                return buildAgeMs <= maxBuildAgeToNotify;
            })
            .peek(issue -> filteredBuildTs.incrementAndGet())
            .filter(issue -> {
                return cfg.getTrackedBranches()
                    .get(issue.trackedBranchName)
                    .filter(tb -> !tb.disableIssueTypes().contains(issue.type()))
                    .isPresent();
            })
            .peek(issue -> filteredNotDisabled.incrementAndGet())
            .forEach(issue -> {
                List<String> addrs = new ArrayList<>();

                final String srvCode = issue.issueKey().server;

                AtomicInteger ctnSrvAllowed = new AtomicInteger();
                AtomicInteger cntSubscibed = new AtomicInteger();
                AtomicInteger cntTagsFilterPassed = new AtomicInteger();

                channels.stream()
                    .filter(ch -> ch.isServerAllowed(srvCode))
                    .peek(ch -> ctnSrvAllowed.incrementAndGet())
                    .filter(ch -> ch.isSubscribedToBranch(issue.trackedBranchName))
                    .peek(ch -> cntSubscibed.incrementAndGet())
                    .filter(ch -> {
                        if (ch.hasTagFilter())
                            return issue.buildTags().stream().anyMatch(ch::isSubscribedToTag);

                        return true;
                    })
                    .peek(ch -> cntTagsFilterPassed.incrementAndGet())
                    .forEach(channel -> {
                        String email = channel.email();
                        String slack = channel.slack();
                        logger.info("User/channel " + channel + " is candidate for notification " + email
                            + " , " + slack + " for " + issue);

                        if (!Strings.isNullOrEmpty(email))
                            addrs.add(email);

                        if (!Strings.isNullOrEmpty(slack))
                            addrs.add(SLACK + slack);
                    });

                if(!addrs.isEmpty())
                    hasSubscriptions.incrementAndGet();

                boolean nonNotifedChFound = false;

                for (String nextAddr : addrs) {
                    if (issuesStorage.getIsNewAndSetNotified(issue.issueKey, nextAddr, null)) {
                        nonNotifedChFound = true;

                        toBeSent.computeIfAbsent(nextAddr, addr -> {
                            Notification notification = new Notification();
                            notification.ts = System.currentTimeMillis();
                            notification.addr = addr;
                            return notification;
                        }).addIssue(issue);
                    }
                }

                if (!nonNotifedChFound) {
                    issuesStorage.saveIssueSubscribersStat(issue.issueKey,
                        ctnSrvAllowed.get(),
                        cntSubscibed.get(),
                        cntTagsFilterPassed.get());
                }
                else
                    neverSentBefore.incrementAndGet();
            });

        String stat = issuesChecked.get() + " issues checked, " +
            filteredFresh.get() + " detected recenty, " +
            filteredBuildTs.get() + " for fresh builds, " +
            filteredNotDisabled.get() + " not disabled, " +
            hasSubscriptions.get() + " has subscriber, " +
            neverSentBefore.get() + " non sent before";

        if (toBeSent.isEmpty())
            return "Noting to notify, " + stat;

        NotificationsConfig notifications = cfg.notifications();

        Map<String, AtomicInteger> sndStat = new HashMap<>();

        for (Notification next : toBeSent.values()) {
            String addr = next.addr;

            try {
                if (addr.startsWith(SLACK)) {
                    String slackUser = addr.substring(SLACK.length());

                    List<String> messages = next.toSlackMarkup();

                    for (String msg : messages) {
                        slackSender.sendMessage(slackUser, msg, notifications);

                        sndStat.computeIfAbsent(addr, k -> new AtomicInteger()).incrementAndGet();
                    }
                }
                else {
                    String builds = next.buildIdToIssue.keySet().toString();
                    String subj = "[MTCGA]: " + next.countIssues() + " new failures in builds " + builds + " needs to be handled";

                    emailSender.sendEmail(addr, subj, next.toHtml(), next.toPlainText(), notifications.email());

                    sndStat.computeIfAbsent(addr, k -> new AtomicInteger()).incrementAndGet();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                logger.warn("Unable to notify address [" + addr + "] about build failures", e);

                next.allIssues().forEach(issue -> {
                        IssueKey key = issue.issueKey();
                        // rollback successfull notification
                        issuesStorage.getIsNewAndSetNotified(key, addr, e);
                    });

                stat += " ;" + e.getClass().getSimpleName() + ": " + e.getMessage();
            }
        }

        return "Send " + sndStat.toString() + "; Statistics: " + stat;
    }