in analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java [245:523]
void syncStateOnOuterVertexImmutable(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message to the fragment in which the queried outer vertex is an inner vertex.
*
* <p>This declaration is aliased to the native name {@code SyncStateOnOuterVertex} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ArrowProjectedFragment}.
*
* @param frag ArrowProjectedFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ArrowProjectedFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SyncStateOnOuterVertex")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void syncStateOnOuterVertexArrowProjected(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Syncs state on an outer vertex without carrying a message payload, used e.g. in BFS.
*
* <p>This declaration is aliased to the native name {@code SyncStateOnOuterVertex} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ImmutableEdgecutFragment}.
*
* @param frag fragment.
* @param vertex query vertex.
* @param channel_id message channel id.
* @param vdata placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ImmutableEdgecutFragment}.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SyncStateOnOuterVertex")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
@FFISkip UNUSED_T>
void syncStateOnOuterVertexImmutableNoMsg(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
int channel_id,
@FFISkip UNUSED_T vdata);
/**
* Syncs state on an outer vertex without carrying a message payload, used e.g. in BFS.
*
* <p>This declaration is aliased to the native name {@code SyncStateOnOuterVertex} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ArrowProjectedFragment}.
*
* @param frag fragment.
* @param vertex query vertex.
* @param channel_id message channel id.
* @param vdata placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ArrowProjectedFragment}.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SyncStateOnOuterVertex")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
@FFISkip UNUSED_T>
void syncStateOnOuterVertexArrowProjectedNoMsg(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
int channel_id,
@FFISkip UNUSED_T vdata);
/**
* Sends a message for the given vertex to other fragments through the vertex's outgoing edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughOEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ImmutableEdgecutFragment}.
*
* @param frag ImmutableEdgecutFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ImmutableEdgecutFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughOEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughOEdgesImmutable(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message for the given vertex to other fragments through the vertex's outgoing edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughOEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ArrowProjectedFragment}.
*
* @param frag ArrowProjectedFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ArrowProjectedFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughOEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughOEdgesArrowProjected(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message for the given vertex to other fragments through both the vertex's incoming
* and outgoing edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ImmutableEdgecutFragment}.
*
* @param frag ImmutableEdgecutFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ImmutableEdgecutFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughEdgesImmutable(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message for the given vertex to other fragments through both the vertex's incoming
* and outgoing edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ArrowProjectedFragment}.
*
* @param frag ArrowProjectedFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ArrowProjectedFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughEdgesArrowProjected(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message for the given vertex to other fragments through the vertex's incoming edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughIEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ImmutableEdgecutFragment}.
*
* @param frag ImmutableEdgecutFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ImmutableEdgecutFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughIEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughIEdgesImmutable(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
* Sends a message for the given vertex to other fragments through the vertex's incoming edges.
*
* <p>This declaration is aliased to the native name {@code SendMsgThroughIEdges} via
* {@code @FFINameAlias}; it is the variant whose fragment type is bounded by
* {@link ArrowProjectedFragment}.
*
* @param frag ArrowProjectedFragment.
* @param vertex querying vertex.
* @param msg msg to send.
* @param channel_id channel id.
* @param unused placeholder argument annotated with {@code @FFISkip}. NOTE(review): presumably
*     only present to let Java infer the skipped type parameters and not forwarded to native
*     code -- confirm against the FFI generator.
* @param <OID_T> original vertex id type of the fragment.
* @param <VID_T> vertex id type of the fragment; also the type parameter of {@code vertex}.
* @param <VDATA_T> vertex data type of the fragment.
* @param <EDATA_T> edge data type of the fragment.
* @param <FRAG_T> fragment type, a subtype of {@link ArrowProjectedFragment}.
* @param <MSG_T> message type.
* @param <UNUSED_T> type of the placeholder argument.
*/
@FFINameAlias("SendMsgThroughIEdges")
<
@FFISkip OID_T,
@FFISkip VID_T,
@FFISkip VDATA_T,
@FFISkip EDATA_T,
FRAG_T extends ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>,
MSG_T,
@FFISkip UNUSED_T>
void sendMsgThroughIEdgesArrowProjected(
@CXXReference FRAG_T frag,
@CXXReference Vertex<VID_T> vertex,
@CXXReference MSG_T msg,
int channel_id,
@FFISkip UNUSED_T unused);
/**
 * Processes, in parallel, the messages received from the last super step. The caller only needs
 * to provide a consumer that handles one (vertex, message) pair.
 *
 * <p>{@code threadNum} tasks are submitted to {@code executor}. Each task creates its own
 * {@link MessageInBuffer}, {@link Vertex} and message instance, then repeatedly fetches a buffer
 * with {@code getMessageInBuffer} and hands every message decoded from it to {@code consumer},
 * until no further buffer is available. This method blocks until every task has finished.
 *
 * <p>Within one task the same {@code Vertex} and message objects are passed to {@code consumer}
 * on every invocation, and {@code consumer} is invoked concurrently from different tasks.
 *
 * <p>If a task fails with an exception (for example thrown by {@code consumer}), its native
 * buffer and vertex are still deleted and the task is still counted as finished, so this method
 * does not block forever; the exception itself is left to the executor's handling of failed
 * tasks.
 *
 * <p>If the calling thread is interrupted while waiting, the stack trace is printed, the
 * executor is shut down and the thread's interrupt flag is restored.
 *
 * @param <OID_T> original vertex id type of the fragment.
 * @param <VID_T> vertex id type of the fragment.
 * @param <VDATA_T> vertex data type of the fragment.
 * @param <EDATA_T> edge data type of the fragment.
 * @param <MSG_T> message type.
 * @param frag fragment.
 * @param threadNum number of threads to use.
 * @param executor thread pool executor.
 * @param msgSupplier a producer function creating a msg instance; called once per task.
 * @param consumer lambda function invoked for every received (vertex, message) pair.
 */
default <OID_T, VID_T, VDATA_T, EDATA_T, MSG_T> void parallelProcess(
        IFragment<OID_T, VID_T, VDATA_T, EDATA_T> frag,
        int threadNum,
        ExecutorService executor,
        Supplier<MSG_T> msgSupplier,
        BiConsumer<Vertex<VID_T>, MSG_T> consumer) {
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    MessageInBuffer.Factory bufferFactory = FFITypeFactoryhelper.newMessageInBuffer();
    for (int tid = 0; tid < threadNum; ++tid) {
        executor.execute(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            MessageInBuffer messageInBuffer = bufferFactory.create();
                            Vertex<VID_T> vertex =
                                    FFITypeFactoryhelper.newVertex(frag.getVidClass());
                            try {
                                MSG_T msg = msgSupplier.get();
                                // Drain buffers until the manager reports none are left.
                                while (getMessageInBuffer(messageInBuffer)) {
                                    while (messageInBuffer.getMessage(frag, vertex, msg)) {
                                        consumer.accept(vertex, msg);
                                    }
                                }
                            } finally {
                                // Release the FFI-created objects even when the consumer
                                // throws.
                                messageInBuffer.delete();
                                vertex.delete();
                            }
                        } finally {
                            // Must run on every exit path; otherwise await() below would
                            // block forever after a failed task.
                            countDownLatch.countDown();
                        }
                    }
                });
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        executor.shutdown();
    }
}