void syncStateOnOuterVertexImmutable()

in analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java [245:523]


            void syncStateOnOuterVertexImmutable(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send a msg to the fragment where the querying outer vertex is an inner vertexin another
     * fragment.
     *
     * @param frag       ArrowProjectedFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel id.
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SyncStateOnOuterVertex")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void syncStateOnOuterVertexArrowProjected(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * SyncState on outer vertex without message, used in bfs etc.
     *
     * @param frag       fragment.
     * @param vertex     query vertex.
     * @param channel_id message channel id.
     */
    @FFINameAlias("SyncStateOnOuterVertex")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    @FFISkip UNUSED_T>
            void syncStateOnOuterVertexImmutableNoMsg(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    int channel_id,
                    @FFISkip UNUSED_T vdata);

    /**
     * SyncState on outer vertex without message, used in bfs etc.
     *
     * @param frag       fragment.
     * @param vertex     query vertex.
     * @param channel_id message channel id.
     */
    @FFINameAlias("SyncStateOnOuterVertex")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    @FFISkip UNUSED_T>
            void syncStateOnOuterVertexArrowProjectedNoMsg(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    int channel_id,
                    @FFISkip UNUSED_T vdata);

    /**
     * Send the a vertex's data to other fragment through outgoing edges.
     *
     * @param frag       ImmutableEdgeCutFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SendMsgThroughOEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughOEdgesImmutable(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send the a vertex's data to other fragment through outgoing edges.
     *
     * @param frag       ArrowProjectedFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id.
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SendMsgThroughOEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughOEdgesArrowProjected(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send the a vertex's data to other fragment through incoming and outgoing edges.
     *
     * @param frag       ImmutableEdgecutFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id.
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SendMsgThroughEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughEdgesImmutable(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send the a vertex's data to other fragment through incoming and outgoing edges.
     *
     * @param frag       ArrowProjectedFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id.
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SendMsgThroughEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughEdgesArrowProjected(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send the a vertex's data to other fragment through incoming edges.
     *
     * @param frag       ImmutableEdgecutFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id.
     * @param <MSG_T>    message type.
     */
    @FFINameAlias("SendMsgThroughIEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughIEdgesImmutable(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Send the a vertex's data to other fragment through incoming edges.
     *
     * @param <MSG_T>    message type.
     * @param frag       ArrowProjectedFragment.
     * @param vertex     querying vertex.
     * @param msg        msg to send.
     * @param channel_id channel_id.
     */
    @FFINameAlias("SendMsgThroughIEdges")
    <
                    @FFISkip OID_T,
                    @FFISkip VID_T,
                    @FFISkip VDATA_T,
                    @FFISkip EDATA_T,
                    FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
                    MSG_T,
                    @FFISkip UNUSED_T>
            void sendMsgThroughIEdgesArrowProjected(
                    @CXXReference FRAG_T frag,
                    @CXXReference Vertex<VID_T> vertex,
                    @CXXReference MSG_T msg,
                    int channel_id,
                    @FFISkip UNUSED_T unused);

    /**
     * Parallel processing the messages received from last super step. The user just need to provide
     * a lambda consumer.
     *
     * @param <MSG_T>     message type.
     * @param frag        fragment.
     * @param threadNum   number of threads to use.
     * @param executor    thread pool executor.
     * @param msgSupplier a producer function creating a msg instance.
     * @param consumer    lambda function.
     */
    default <OID_T, VID_T, VDATA_T, EDATA_T, MSG_T> void parallelProcess(
            IFragment<OID_T, VID_T, VDATA_T, EDATA_T> frag,
            int threadNum,
            ExecutorService executor,
            Supplier<MSG_T> msgSupplier,
            BiConsumer<Vertex<VID_T>, MSG_T> consumer) {
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        MessageInBuffer.Factory bufferFactory = FFITypeFactoryhelper.newMessageInBuffer();
        int chunkSize = 1024;
        for (int tid = 0; tid < threadNum; ++tid) {
            final int finalTid = tid;
            executor.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            MessageInBuffer messageInBuffer = bufferFactory.create();
                            Vertex<VID_T> vertex =
                                    FFITypeFactoryhelper.newVertex(frag.getVidClass());
                            MSG_T msg = msgSupplier.get();
                            boolean result;
                            while (true) {
                                result = getMessageInBuffer(messageInBuffer);
                                if (result) {
                                    while (messageInBuffer.getMessage(frag, vertex, msg)) {
                                        consumer.accept(vertex, msg);
                                    }
                                } else {
                                    break;
                                }
                            }
                            countDownLatch.countDown();
                            messageInBuffer.delete();
                            vertex.delete();
                        }
                    });
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
            executor.shutdown();
        }
    }