in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java [97:262]
/**
 * Initializes the persistent (cluster) and broadcast MQ consumers of this consumer group
 * and registers an event listener on each of them.
 *
 * <p>Both listeners follow the same flow for every received event:
 * <ol>
 *   <li>open a server span and stamp the event with the receive timestamp and server IP;</li>
 *   <li>look up the {@link ConsumerGroupTopicConf} for the event's topic;</li>
 *   <li>if no config exists, try to send the message back and commit it with
 *       {@code CommitMessage};</li>
 *   <li>otherwise hand the event to the {@link HTTPMessageHandler}; commit with
 *       {@code ManualAck} when the handler accepts it, or send the message back and commit
 *       with {@code CommitMessage} when it does not;</li>
 *   <li>finish the span in a {@code finally} block.</li>
 * </ol>
 *
 * <p>The cluster listener additionally resolves the original topic from the
 * {@code ProtocolKey.TOPIC} extension when the subject is reported as a retry topic by the
 * {@link TopicNameHelper} of the configured storage plugin.
 *
 * @throws Exception if initializing either underlying consumer fails
 */
public synchronized void init() throws Exception {
    httpMessageHandler = new HTTPMessageHandler(this);

    // cluster (persistent) consumer
    Properties keyValue = new Properties();
    keyValue.put(IS_BROADCAST, "false");
    keyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
    keyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
    keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
    persistentMqConsumer.init(keyValue);

    EventListener clusterEventListener = (event, context) -> {
        String protocolVersion =
            Objects.requireNonNull(event.getSpecVersion()).toString();

        Span span = TraceUtils.prepareServerSpan(
            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
            EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
        try {
            Optional<TopicNameHelper> topicNameHelper =
                Optional.ofNullable(EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType()));
            String topic = event.getSubject();
            String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
            String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();

            event = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
                .build();

            if (MESSAGE_LOGGER.isDebugEnabled()) {
                MESSAGE_LOGGER.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
            } else {
                MESSAGE_LOGGER.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
            }

            // For a retry topic the subject is not the subscribed topic; the original one
            // travels in the TOPIC extension.
            if (topicNameHelper.isPresent() && topicNameHelper.get().isRetryTopic(topic)) {
                topic = String.valueOf(event.getExtension(ProtocolKey.TOPIC));
            }

            ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
                topic, null);
            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;

            if (currentTopicConfig == null) {
                try {
                    sendMessageBack(event, uniqueId, bizSeqNo);
                    log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                        consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId);
                } catch (Exception ex) {
                    log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                        consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex);
                }
                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                return;
            }

            // Reuse the config that was just null-checked instead of looking the topic up a
            // second time; a second lookup could return null if the map changed in between.
            SubscriptionItem subscriptionItem = currentTopicConfig.getSubscriptionItem();
            HandleMsgContext handleMsgContext = new HandleMsgContext(
                EventMeshUtil.buildPushMsgSeqNo(),
                consumerGroupConf.getConsumerGroup(),
                EventMeshConsumer.this,
                topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
                consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

            if (httpMessageHandler.handle(handleMsgContext)) {
                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
            } else {
                try {
                    sendMessageBack(event, uniqueId, bizSeqNo);
                } catch (Exception e) {
                    // best effort: the message is committed below either way
                    log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e);
                }
                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
            }
        } finally {
            TraceUtils.finishSpan(span, event);
        }
    };
    persistentMqConsumer.registerEventListener(clusterEventListener);

    // broadcast consumer
    Properties broadcastKeyValue = new Properties();
    broadcastKeyValue.put(IS_BROADCAST, "true");
    broadcastKeyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
    broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
    broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
    broadcastMqConsumer.init(broadcastKeyValue);

    EventListener broadcastEventListener = (event, context) -> {
        String protocolVersion =
            Objects.requireNonNull(event.getSpecVersion()).toString();

        Span span = TraceUtils.prepareServerSpan(
            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
            EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
        try {
            event = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
                    String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
                .build();

            String topic = event.getSubject();
            String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey());
            String uniqueId = getEventExtension(event, ProtocolKey.ClientInstanceKey.UNIQUEID.getKey());

            if (MESSAGE_LOGGER.isDebugEnabled()) {
                MESSAGE_LOGGER.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
            } else {
                MESSAGE_LOGGER.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
            }

            ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
                consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
                (EventMeshAsyncConsumeContext) context;

            if (currentTopicConfig == null) {
                log.error("no topicConfig found, consumerGroup:{} topic:{}",
                    consumerGroupConf.getConsumerGroup(), topic);
                try {
                    sendMessageBack(event, uniqueId, bizSeqNo);
                } catch (Exception ex) {
                    log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                        consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex);
                }
                // Commit and stop even when the send-back failed: without a topic config there
                // is nothing to dispatch to, and continuing would dereference the missing
                // config below.
                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                return;
            }

            // Reuse the config that was just null-checked instead of a second map lookup.
            SubscriptionItem subscriptionItem = currentTopicConfig.getSubscriptionItem();
            HandleMsgContext handleMsgContext =
                new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
                    consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
                    topic, event, subscriptionItem,
                    eventMeshAsyncConsumeContext.getAbstractContext(),
                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
                    currentTopicConfig);

            if (httpMessageHandler.handle(handleMsgContext)) {
                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
            } else {
                try {
                    sendMessageBack(event, uniqueId, bizSeqNo);
                } catch (Exception e) {
                    // best effort: the message is committed below either way
                    log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e);
                }
                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
            }
        } finally {
            TraceUtils.finishSpan(span, event);
        }
    };
    broadcastMqConsumer.registerEventListener(broadcastEventListener);

    inited4Persistent.compareAndSet(false, true);
    inited4Broadcast.compareAndSet(false, true);
    log.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
}