in java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java [62:114]
public CompletableFuture<Void> apply(Context context, Message message) {
// initialize a new vertex
if (message.is(Types.VERTEX_INIT_TYPE)) {
final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
if (currentComponentId > vertex.getVertexId()) {
updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
currentComponentId = vertex.getVertexId();
}
final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
neighbourDiff.removeAll(currentNeighbours);
broadcastVertexConnectedComponentChange(
context, vertex.getVertexId(), neighbourDiff, currentComponentId);
// update the neighbours
neighbourDiff.addAll(currentNeighbours);
context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
}
// a neighbours component id has changed
else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
final VertexComponentChange vertexComponentChange =
message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
// only process the message if we can reach the source --> connected components with directed
// edges
if (currentNeighbours.contains(vertexComponentChange.getSource())) {
final int componentIdCandidate = vertexComponentChange.getComponentId();
final int currentComponentId =
context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
if (currentComponentId < componentIdCandidate) {
sendVertexConnectedComponentChange(
context,
vertexComponentChange.getTarget(),
vertexComponentChange.getSource(),
currentComponentId);
} else if (currentComponentId > componentIdCandidate) {
updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
currentNeighbours.remove(vertexComponentChange.getSource());
broadcastVertexConnectedComponentChange(
context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
}
}
}
return context.done();
}