public CompletableFuture apply()

in java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java [62:114]


  public CompletableFuture<Void> apply(Context context, Message message) {
    // initialize a new vertex
    if (message.is(Types.VERTEX_INIT_TYPE)) {
      final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);

      int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);

      if (currentComponentId > vertex.getVertexId()) {
        updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
        currentComponentId = vertex.getVertexId();
      }

      final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
      neighbourDiff.removeAll(currentNeighbours);

      broadcastVertexConnectedComponentChange(
          context, vertex.getVertexId(), neighbourDiff, currentComponentId);

      // update the neighbours
      neighbourDiff.addAll(currentNeighbours);
      context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
    }
    // a neighbours component id has changed
    else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
      final VertexComponentChange vertexComponentChange =
          message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);

      // only process the message if we can reach the source --> connected components with directed
      // edges
      if (currentNeighbours.contains(vertexComponentChange.getSource())) {
        final int componentIdCandidate = vertexComponentChange.getComponentId();
        final int currentComponentId =
            context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);

        if (currentComponentId < componentIdCandidate) {
          sendVertexConnectedComponentChange(
              context,
              vertexComponentChange.getTarget(),
              vertexComponentChange.getSource(),
              currentComponentId);
        } else if (currentComponentId > componentIdCandidate) {
          updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
          currentNeighbours.remove(vertexComponentChange.getSource());
          broadcastVertexConnectedComponentChange(
              context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
        }
      }
    }

    return context.done();
  }