in flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java [301:365]
public void endInput() throws Exception {
    // Invoked with the fully buffered input. For every item held in itemAndPurchasers this
    // method scores the other items that pairs of its purchasers have in common, emits the
    // k best-scoring ones as a single delimited string, and finally clears both state
    // handles.
    //
    // Per-pair contribution: weight(u) * weight(v) / (alpha2 + |common items of u and v|).

    // Pre-compute one weight per user from the number of items recorded for that user.
    Map<Long, Double> userWeights = new HashMap<>(userAndPurchasedItems.size());
    userAndPurchasedItems.forEach(
            (user, items) -> userWeights.put(user, calculateWeight(items.length)));

    // Scratch buffer reused for every user pair. calculateCommonItems is expected to write
    // the shared items into its leading slots and return how many it wrote
    // (NOTE(review): inferred from how the result is consumed below; confirm against helper).
    long[] interaction = new long[maxUserBehavior];

    for (long mainItem : itemAndPurchasers.keySet()) {
        List<Long> userList = itemAndPurchasers.get(mainItem);
        Map<Long, Double> id2swing = new HashMap<>();

        // Visit every unordered pair (u, v) of purchasers exactly once. The outer index must
        // start at 0; starting at 1 would silently drop every pair that involves the first
        // purchaser of the item.
        for (int i = 0; i < userList.size(); i++) {
            long u = userList.get(i);
            for (int j = i + 1; j < userList.size(); j++) {
                long v = userList.get(j);
                int interactionSize =
                        calculateCommonItems(
                                userAndPurchasedItems.get(u),
                                userAndPurchasedItems.get(v),
                                interaction);
                if (interactionSize == 0) {
                    continue;
                }

                double similarity =
                        userWeights.get(u) * userWeights.get(v) / (alpha2 + interactionSize);

                // Credit the pair's similarity to each shared item other than the main item.
                // The index is deliberately not named "k" so it cannot be confused with the
                // top-k field read further down.
                for (int idx = 0; idx < interactionSize; idx++) {
                    long simItem = interaction[idx];
                    if (simItem == mainItem) {
                        continue;
                    }
                    id2swing.merge(simItem, similarity, Double::sum);
                }
            }
        }

        // Nothing similar was found for this item: emit no record for it.
        if (id2swing.isEmpty()) {
            continue;
        }

        List<Tuple2<Long, Double>> itemAndScore = new ArrayList<>(id2swing.size());
        id2swing.forEach((key, value) -> itemAndScore.add(Tuple2.of(key, value)));
        // Highest score first.
        itemAndScore.sort((o1, o2) -> Double.compare(o2.f1, o1.f1));

        // "k" here is the operator-level top-k setting, not a loop index.
        int itemNums = Math.min(k, itemAndScore.size());
        String itemList =
                itemAndScore.stream()
                        .sequential()
                        .limit(itemNums)
                        .map(tuple2 -> "" + tuple2.f0 + commaDelimiter + tuple2.f1)
                        .collect(Collectors.joining("" + semicolonDelimiter));
        output.collect(new StreamRecord<>(Row.of(mainItem, itemList)));
    }

    userAndPurchasedItemsState.clear();
    itemAndPurchasersState.clear();
}