public Table approxSimilarityJoin()

in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java [211:273]


    public Table approxSimilarityJoin(
            Table datasetA, Table datasetB, double threshold, String idCol, String distCol) {
        // Overview of the pipeline built below:
        //   1. Preprocess both tables with preprocessData(..., idCol).
        //   2. Join the two preprocessed streams on IndexHashValueKeySelector, over one
        //      EndOfStreamWindows window, producing (field0 of A, field0 of B, field1 of A,
        //      field1 of B) rows.
        //   3. Keep a single row per (field0 of A, field0 of B) key.
        //   4. Run FilterByDistanceFunction(threshold) with the model data broadcast under
        //      MODEL_DATA_BC_KEY, and expose the result as a table with the columns
        //      "datasetA.id", "datasetB.id" and distCol.
        // NOTE(review): field 0 of a preprocessed row is presumably the id and field 1 the
        // feature vector -- confirm against preprocessData / getOutputType.

        // The table environment is taken from datasetA; datasetB is presumably bound to the
        // same environment -- TODO confirm with callers.
        TableImpl tableImplA = (TableImpl) datasetA;
        StreamTableEnvironment tEnv = (StreamTableEnvironment) tableImplA.getTableEnvironment();

        DataStream<Row> preprocessedA = preprocessData(datasetA, idCol);
        DataStream<Row> preprocessedB = preprocessData(datasetB, idCol);

        // The joined row repeats the first two field types of a preprocessed row, once per side.
        // Only datasetA's type info is consulted, so datasetB is assumed to match it.
        RowTypeInfo preprocessedTypeInfo = getOutputType(datasetA, idCol);
        TypeInformation<?> firstFieldType = preprocessedTypeInfo.getTypeAt(0);
        TypeInformation<?> secondFieldType = preprocessedTypeInfo.getTypeAt(1);
        RowTypeInfo pairTypeInfo =
                new RowTypeInfo(firstFieldType, firstFieldType, secondFieldType, secondFieldType);

        DataStream<? extends LSHModelData> modelData =
                tEnv.toDataStream(modelDataTable, modelDataClass);

        // Candidate pairs: rows from A and B that agree on the IndexHashValueKeySelector key.
        DataStream<Row> candidatePairs =
                preprocessedA
                        .join(preprocessedB)
                        .where(new IndexHashValueKeySelector())
                        .equalTo(new IndexHashValueKeySelector())
                        .window(EndOfStreamWindows.get())
                        .apply(
                                (left, right) ->
                                        Row.of(
                                                left.getField(0),
                                                right.getField(0),
                                                left.getField(1),
                                                right.getField(1)),
                                pairTypeInfo);

        // Kept as an anonymous class (not a lambda) so the declared key type stays visible.
        // NOTE(review): the key is declared as Tuple2<Integer, Integer> while the output id
        // columns use the declared type of idCol -- confirm ids are always integers here.
        KeySelector<Row, Tuple2<Integer, Integer>> idPairSelector =
                new KeySelector<Row, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> getKey(Row r) {
                        return Tuple2.of(r.getFieldAs(0), r.getFieldAs(1));
                    }
                };

        // The same pair can be produced more than once by the join; the reduce keeps one row
        // per key by always returning its first argument.
        DataStream<Row> uniquePairs =
                DataStreamUtils.reduce(
                        candidatePairs.keyBy(idPairSelector),
                        (kept, ignored) -> kept,
                        pairTypeInfo);

        // Result schema: both id columns carry idCol's type from datasetA's resolved schema,
        // followed by a DOUBLE column named by distCol.
        TypeInformation<?> idColType =
                TableUtils.getRowTypeInfo(datasetA.getResolvedSchema()).getTypeAt(idCol);
        RowTypeInfo resultTypeInfo =
                new RowTypeInfo(
                        new TypeInformation[] {idColType, idColType, Types.DOUBLE},
                        new String[] {"datasetA.id", "datasetB.id", distCol});

        // FilterByDistanceFunction presumably computes each pair's distance from the broadcast
        // model data and drops pairs beyond the threshold -- see its definition.
        DataStream<Row> pairsWithDists =
                BroadcastUtils.withBroadcastStream(
                        Collections.singletonList(uniquePairs),
                        Collections.singletonMap(MODEL_DATA_BC_KEY, modelData),
                        inputs -> {
                            DataStream<Row> pairs = (DataStream<Row>) inputs.get(0);
                            return pairs.flatMap(
                                    new FilterByDistanceFunction(threshold), resultTypeInfo);
                        });

        return tEnv.fromDataStream(pairsWithDists);
    }