public void endInput()

in flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java [301:365]


        /**
         * Computes the similar-item list for every item in {@code itemAndPurchasers} and emits one
         * output row per item that has at least one similar item.
         *
         * <p>For each main item, every unordered pair of its purchasers {@code (u, v)} is visited.
         * {@code calculateCommonItems} reports how many items the two users share and (presumably,
         * since the first {@code interactionSize} slots are read back here) writes those item ids
         * into the reusable {@code interaction} buffer. Each shared item other than the main item
         * receives {@code weight(u) * weight(v) / (alpha2 + interactionSize)}, accumulated over all
         * pairs. The accumulated scores are sorted in descending order, truncated to at most
         * {@code k} entries and serialized as {@code item<commaDelimiter>score} entries joined by
         * {@code semicolonDelimiter}.
         *
         * <p>Both state objects are cleared once all items have been processed.
         *
         * @throws Exception if any of the invoked helpers or state operations fails.
         */
        public void endInput() throws Exception {

            // Pre-compute one weight per user from the number of items that user purchased.
            Map<Long, Double> userWeights = new HashMap<>(userAndPurchasedItems.size());
            userAndPurchasedItems.forEach(
                    (user, items) -> userWeights.put(user, calculateWeight(items.length)));

            // Scratch buffer reused for every user pair to avoid per-pair allocation.
            long[] interaction = new long[maxUserBehavior];
            for (long mainItem : itemAndPurchasers.keySet()) {
                List<Long> userList = itemAndPurchasers.get(mainItem);
                int userCount = userList.size();
                HashMap<Long, Double> id2swing = new HashMap<>();

                // Visit every unordered pair (i, j) with i < j. The outer loop must start at
                // index 0; starting at 1 would silently drop every pair containing the first
                // purchaser.
                for (int i = 0; i < userCount; i++) {
                    long u = userList.get(i);
                    for (int j = i + 1; j < userCount; j++) {
                        long v = userList.get(j);
                        int interactionSize =
                                calculateCommonItems(
                                        userAndPurchasedItems.get(u),
                                        userAndPurchasedItems.get(v),
                                        interaction);
                        if (interactionSize == 0) {
                            continue;
                        }
                        double similarity =
                                userWeights.get(u)
                                        * userWeights.get(v)
                                        / (alpha2 + interactionSize);
                        // Named "idx" rather than "k" so it does not shadow the top-k field.
                        for (int idx = 0; idx < interactionSize; idx++) {
                            long simItem = interaction[idx];
                            if (simItem == mainItem) {
                                // An item is never reported as similar to itself.
                                continue;
                            }
                            id2swing.merge(simItem, similarity, Double::sum);
                        }
                    }
                }

                if (id2swing.isEmpty()) {
                    // Nothing to report for this item; emit no row.
                    continue;
                }

                ArrayList<Tuple2<Long, Double>> itemAndScore = new ArrayList<>(id2swing.size());
                id2swing.forEach((key, value) -> itemAndScore.add(Tuple2.of(key, value)));

                // Highest score first.
                itemAndScore.sort((o1, o2) -> Double.compare(o2.f1, o1.f1));

                int itemNums = Math.min(k, itemAndScore.size());
                String itemList =
                        itemAndScore.stream()
                                .limit(itemNums)
                                .map(tuple2 -> "" + tuple2.f0 + commaDelimiter + tuple2.f1)
                                .collect(Collectors.joining("" + semicolonDelimiter));
                output.collect(new StreamRecord<>(Row.of(mainItem, itemList)));
            }

            userAndPurchasedItemsState.clear();
            itemAndPurchasersState.clear();
        }