in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java [211:273]
/**
 * Joins two datasets and returns candidate row pairs together with a distance column.
 *
 * <p>The pipeline built here is:
 *
 * <ol>
 *   <li>Both tables are run through {@code preprocessData} with {@code idCol}.
 *   <li>The two resulting streams are joined in one end-of-stream window; rows are paired when
 *       {@code IndexHashValueKeySelector} yields equal keys on both sides. Each pair carries
 *       fields 0 and 1 of both rows (presumably the id and the feature vector - confirm
 *       against {@code preprocessData}).
 *   <li>Pairs are de-duplicated on (field 0 of A, field 0 of B), keeping one arbitrary
 *       representative per key.
 *   <li>{@code FilterByDistanceFunction(threshold)} is applied with the model data broadcast
 *       under {@code MODEL_DATA_BC_KEY}; judging by its name it computes the distance and
 *       drops pairs that do not satisfy {@code threshold} - confirm in that class.
 * </ol>
 *
 * <p>The result has three columns: {@code "datasetA.id"}, {@code "datasetB.id"} and {@code
 * distCol}. Both id columns are declared with the type of {@code idCol} in {@code datasetA},
 * so {@code datasetB} is expected to use the same id type.
 *
 * @param datasetA first table; must be a {@code TableImpl} whose environment is a {@code
 *     StreamTableEnvironment}, because that environment is used to build the result
 * @param datasetB second table
 * @param threshold value handed to {@code FilterByDistanceFunction}
 * @param idCol name of the id column; looked up in {@code datasetA}'s schema for the output
 *     type and passed to {@code preprocessData} for both tables
 * @param distCol name of the DOUBLE distance column in the returned table
 * @return a table with columns {@code "datasetA.id"}, {@code "datasetB.id"} and {@code distCol}
 */
public Table approxSimilarityJoin(
        Table datasetA, Table datasetB, double threshold, String idCol, String distCol) {
    StreamTableEnvironment tEnv =
            (StreamTableEnvironment) ((TableImpl) datasetA).getTableEnvironment();

    DataStream<Row> explodedA = preprocessData(datasetA, idCol);
    DataStream<Row> explodedB = preprocessData(datasetB, idCol);

    // Type of a joined pair: (field 0 of A, field 0 of B, field 1 of A, field 1 of B).
    // Both sides reuse datasetA's field types.
    RowTypeInfo inputTypeInfo = getOutputType(datasetA, idCol);
    RowTypeInfo outputTypeInfo =
            new RowTypeInfo(
                    inputTypeInfo.getTypeAt(0),
                    inputTypeInfo.getTypeAt(0),
                    inputTypeInfo.getTypeAt(1),
                    inputTypeInfo.getTypeAt(1));

    DataStream<? extends LSHModelData> modelData =
            tEnv.toDataStream(modelDataTable, modelDataClass);

    // Pair up rows of A and B whose IndexHashValueKeySelector keys are equal. The
    // end-of-stream window means the join is evaluated over the complete inputs.
    DataStream<Row> sameBucketPairs =
            explodedA
                    .join(explodedB)
                    .where(new IndexHashValueKeySelector())
                    .equalTo(new IndexHashValueKeySelector())
                    .window(EndOfStreamWindows.get())
                    .apply(
                            (r0, r1) ->
                                    Row.of(
                                            r0.getField(0),
                                            r1.getField(0),
                                            r0.getField(1),
                                            r1.getField(1)),
                            outputTypeInfo);

    // The same (A, B) pair can be produced more than once by the join; keep a single row
    // per (field 0 of A, field 0 of B).
    // NOTE(review): the key is declared as Tuple2<Integer, Integer> although the id type
    // comes from idCol. This looks like it assumes Integer ids - confirm behaviour for
    // other id types (e.g. Long, String).
    DataStream<Row> distinctSameBucketPairs =
            DataStreamUtils.reduce(
                    sameBucketPairs.keyBy(
                            new KeySelector<Row, Tuple2<Integer, Integer>>() {
                                @Override
                                public Tuple2<Integer, Integer> getKey(Row r) {
                                    return Tuple2.of(r.getFieldAs(0), r.getFieldAs(1));
                                }
                            }),
                    (r0, r1) -> r0,
                    outputTypeInfo);

    TypeInformation<?> idColType =
            TableUtils.getRowTypeInfo(datasetA.getResolvedSchema()).getTypeAt(idCol);

    DataStream<Row> pairsWithDists =
            BroadcastUtils.withBroadcastStream(
                    Collections.singletonList(distinctSameBucketPairs),
                    Collections.singletonMap(MODEL_DATA_BC_KEY, modelData),
                    inputList -> {
                        // Safe: the only input registered above is distinctSameBucketPairs,
                        // which is a DataStream<Row>.
                        @SuppressWarnings("unchecked")
                        DataStream<Row> data = (DataStream<Row>) inputList.get(0);
                        return data.flatMap(
                                new FilterByDistanceFunction(threshold),
                                new RowTypeInfo(
                                        new TypeInformation<?>[] {
                                            idColType, idColType, Types.DOUBLE
                                        },
                                        new String[] {"datasetA.id", "datasetB.id", distCol}));
                    });
    return tEnv.fromDataStream(pairsWithDists);
}