in ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/tcbot/issue/IssueDetector.java [151:322]
protected String sendNewNotificationsEx() {
List<INotificationChannel> channels = new ArrayList<>();
userStorage.allUsers()
.filter(TcHelperUser::hasEmail)
.filter(TcHelperUser::hasSubscriptions)
.forEach(channels::add);
channels.addAll(cfg.notifications().channels());
Map<String, Notification> toBeSent = new HashMap<>();
AtomicInteger issuesChecked = new AtomicInteger();
AtomicInteger filteredFresh = new AtomicInteger();
AtomicInteger filteredBuildTs = new AtomicInteger();
AtomicInteger filteredNotDisabled = new AtomicInteger();
AtomicInteger hasSubscriptions = new AtomicInteger();
AtomicInteger neverSentBefore = new AtomicInteger();
issuesStorage.allIssues()
.peek(issue -> issuesChecked.incrementAndGet())
.filter(issue -> {
long detected = issue.detectedTs == null ? 0 : issue.detectedTs;
long issueAgeMs = System.currentTimeMillis() - detected;
//here boundary can be not an absolute, but some ts when particular notification channel config was changed
// alternatively boundary may depend to issue notification histroy
boolean neverNotified = issue.addressNotified == null || issue.addressNotified.isEmpty();
// if issue had a prior notification, limit age by 2 hours to avoid new addresses spamming.
// otherwise check last day issues if it is notifiable
long bound = TimeUnit.HOURS.toMillis(neverNotified
? TcBotConst.NOTIFY_MAX_AGE_SINCE_DETECT_HOURS
: TcBotConst.NOTIFY_MAX_AGE_SINCE_DETECT_FOR_NOTIFIED_ISSUE_HOURS );
return issueAgeMs <= bound;
})
.peek(issue -> filteredFresh.incrementAndGet())
.filter(issue -> {
if (issue.buildStartTs == null)
return true; // exception due to bug in issue detection; field was not filled
long buildStartTs = issue.buildStartTs == null ? 0 : issue.buildStartTs;
long buildAgeMs = System.currentTimeMillis() - buildStartTs;
long maxBuildAgeToNotify = TimeUnit.DAYS.toMillis(TcBotConst.NOTIFY_MAX_AGE_SINCE_START_DAYS) / 2;
return buildAgeMs <= maxBuildAgeToNotify;
})
.peek(issue -> filteredBuildTs.incrementAndGet())
.filter(issue -> {
return cfg.getTrackedBranches()
.get(issue.trackedBranchName)
.filter(tb -> !tb.disableIssueTypes().contains(issue.type()))
.isPresent();
})
.peek(issue -> filteredNotDisabled.incrementAndGet())
.forEach(issue -> {
List<String> addrs = new ArrayList<>();
final String srvCode = issue.issueKey().server;
AtomicInteger ctnSrvAllowed = new AtomicInteger();
AtomicInteger cntSubscibed = new AtomicInteger();
AtomicInteger cntTagsFilterPassed = new AtomicInteger();
channels.stream()
.filter(ch -> ch.isServerAllowed(srvCode))
.peek(ch -> ctnSrvAllowed.incrementAndGet())
.filter(ch -> ch.isSubscribedToBranch(issue.trackedBranchName))
.peek(ch -> cntSubscibed.incrementAndGet())
.filter(ch -> {
if (ch.hasTagFilter())
return issue.buildTags().stream().anyMatch(ch::isSubscribedToTag);
return true;
})
.peek(ch -> cntTagsFilterPassed.incrementAndGet())
.forEach(channel -> {
String email = channel.email();
String slack = channel.slack();
logger.info("User/channel " + channel + " is candidate for notification " + email
+ " , " + slack + " for " + issue);
if (!Strings.isNullOrEmpty(email))
addrs.add(email);
if (!Strings.isNullOrEmpty(slack))
addrs.add(SLACK + slack);
});
if(!addrs.isEmpty())
hasSubscriptions.incrementAndGet();
boolean nonNotifedChFound = false;
for (String nextAddr : addrs) {
if (issuesStorage.getIsNewAndSetNotified(issue.issueKey, nextAddr, null)) {
nonNotifedChFound = true;
toBeSent.computeIfAbsent(nextAddr, addr -> {
Notification notification = new Notification();
notification.ts = System.currentTimeMillis();
notification.addr = addr;
return notification;
}).addIssue(issue);
}
}
if (!nonNotifedChFound) {
issuesStorage.saveIssueSubscribersStat(issue.issueKey,
ctnSrvAllowed.get(),
cntSubscibed.get(),
cntTagsFilterPassed.get());
}
else
neverSentBefore.incrementAndGet();
});
String stat = issuesChecked.get() + " issues checked, " +
filteredFresh.get() + " detected recenty, " +
filteredBuildTs.get() + " for fresh builds, " +
filteredNotDisabled.get() + " not disabled, " +
hasSubscriptions.get() + " has subscriber, " +
neverSentBefore.get() + " non sent before";
if (toBeSent.isEmpty())
return "Noting to notify, " + stat;
NotificationsConfig notifications = cfg.notifications();
Map<String, AtomicInteger> sndStat = new HashMap<>();
for (Notification next : toBeSent.values()) {
String addr = next.addr;
try {
if (addr.startsWith(SLACK)) {
String slackUser = addr.substring(SLACK.length());
List<String> messages = next.toSlackMarkup();
for (String msg : messages) {
slackSender.sendMessage(slackUser, msg, notifications);
sndStat.computeIfAbsent(addr, k -> new AtomicInteger()).incrementAndGet();
}
}
else {
String builds = next.buildIdToIssue.keySet().toString();
String subj = "[MTCGA]: " + next.countIssues() + " new failures in builds " + builds + " needs to be handled";
emailSender.sendEmail(addr, subj, next.toHtml(), next.toPlainText(), notifications.email());
sndStat.computeIfAbsent(addr, k -> new AtomicInteger()).incrementAndGet();
}
}
catch (Exception e) {
e.printStackTrace();
logger.warn("Unable to notify address [" + addr + "] about build failures", e);
next.allIssues().forEach(issue -> {
IssueKey key = issue.issueKey();
// rollback successfull notification
issuesStorage.getIsNewAndSetNotified(key, addr, e);
});
stat += " ;" + e.getClass().getSimpleName() + ": " + e.getMessage();
}
}
return "Send " + sndStat.toString() + "; Statistics: " + stat;
}