public synchronized void init()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java [97:262]


    /**
     * Initializes the persistent (cluster) and broadcast MQ consumers of this consumer group
     * and registers an event listener on each of them.
     *
     * <p>Both listeners follow the same flow for every received event:
     * <ol>
     *   <li>open a downstream server trace span (always finished in a {@code finally} block);</li>
     *   <li>stamp the event with the receive timestamp and this server's IP as extensions;</li>
     *   <li>look up the {@link ConsumerGroupTopicConf} for the event topic; when none exists the
     *       event is sent back (best effort) and committed with
     *       {@code EventMeshAction.CommitMessage};</li>
     *   <li>otherwise hand the event to the {@link HTTPMessageHandler}; if it accepts the event the
     *       context is committed with {@code EventMeshAction.ManualAck}, else the event is sent
     *       back (best effort) and committed with {@code EventMeshAction.CommitMessage}.</li>
     * </ol>
     *
     * <p>The cluster listener additionally resolves the original topic from the
     * {@code ProtocolKey.TOPIC} extension when the subject is a retry topic according to the
     * configured {@link TopicNameHelper}.
     *
     * @throws Exception if initializing one of the underlying MQ consumers fails
     */
    public synchronized void init() throws Exception {
        httpMessageHandler = new HTTPMessageHandler(this);
        Properties keyValue = new Properties();
        keyValue.put(IS_BROADCAST, "false");
        keyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
        keyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
        keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
        persistentMqConsumer.init(keyValue);

        EventListener clusterEventListener = (event, context) -> {
            String protocolVersion =
                Objects.requireNonNull(event.getSpecVersion()).toString();

            Span span = TraceUtils.prepareServerSpan(
                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
            try {
                Optional<TopicNameHelper> topicNameHelper =
                    Optional.ofNullable(EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType()));
                String topic = event.getSubject();
                String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
                String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();

                // Stamp the event with receive time and receiving server IP before any further processing.
                event = CloudEventBuilder.from(event)
                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
                    .build();
                if (MESSAGE_LOGGER.isDebugEnabled()) {
                    MESSAGE_LOGGER.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
                } else {
                    MESSAGE_LOGGER.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
                }

                // For retry topics the topic used for the config lookup is carried in an extension.
                if (topicNameHelper.isPresent() && topicNameHelper.get().isRetryTopic(topic)) {
                    topic = String.valueOf(event.getExtension(ProtocolKey.TOPIC));
                }
                ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
                    topic, null);
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;

                if (currentTopicConfig == null) {
                    try {
                        sendMessageBack(event, uniqueId, bizSeqNo);
                        log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                            consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId);
                    } catch (Exception ex) {
                        log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                            consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex);
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                    return;
                }

                // Reuse the config fetched above instead of a second map lookup, which could
                // return null if the topic configuration is removed in between.
                SubscriptionItem subscriptionItem = currentTopicConfig.getSubscriptionItem();
                HandleMsgContext handleMsgContext = new HandleMsgContext(
                    EventMeshUtil.buildPushMsgSeqNo(),
                    consumerGroupConf.getConsumerGroup(),
                    EventMeshConsumer.this,
                    topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

                if (httpMessageHandler.handle(handleMsgContext)) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                } else {
                    try {
                        sendMessageBack(event, uniqueId, bizSeqNo);
                    } catch (Exception e) {
                        // Best effort: a failed send-back must not prevent the commit below.
                        log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e);
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                }
            } finally {
                TraceUtils.finishSpan(span, event);
            }
        };
        persistentMqConsumer.registerEventListener(clusterEventListener);

        // broadcast consumer
        Properties broadcastKeyValue = new Properties();
        broadcastKeyValue.put(IS_BROADCAST, "true");
        broadcastKeyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
        broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
        broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
        broadcastMqConsumer.init(broadcastKeyValue);

        EventListener broadcastEventListener = (event, context) -> {

            String protocolVersion =
                Objects.requireNonNull(event.getSpecVersion()).toString();

            Span span = TraceUtils.prepareServerSpan(
                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
            try {

                // Stamp the event with receive time and receiving server IP before any further processing.
                event = CloudEventBuilder.from(event)
                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
                        String.valueOf(System.currentTimeMillis()))
                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
                    .build();

                String topic = event.getSubject();
                String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey());
                String uniqueId = getEventExtension(event, ProtocolKey.ClientInstanceKey.UNIQUEID.getKey());

                if (MESSAGE_LOGGER.isDebugEnabled()) {
                    MESSAGE_LOGGER.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
                } else {
                    MESSAGE_LOGGER.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
                }

                ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
                    consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
                    (EventMeshAsyncConsumeContext) context;

                if (currentTopicConfig == null) {
                    log.error("no topicConfig found, consumerGroup:{} topic:{}",
                        consumerGroupConf.getConsumerGroup(), topic);
                    try {
                        sendMessageBack(event, uniqueId, bizSeqNo);
                    } catch (Exception ex) {
                        log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                            consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex);
                    }
                    // Commit and stop whether or not the send-back succeeded, matching the
                    // cluster listener. Falling through without a topic config would only
                    // end in a NullPointerException when the subscription item is resolved.
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                    return;
                }

                // Reuse the config fetched above instead of a second map lookup, which could
                // return null if the topic configuration is removed in between.
                SubscriptionItem subscriptionItem = currentTopicConfig.getSubscriptionItem();
                HandleMsgContext handleMsgContext =
                    new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
                        consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
                        topic, event, subscriptionItem,
                        eventMeshAsyncConsumeContext.getAbstractContext(),
                        consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
                        currentTopicConfig);

                if (httpMessageHandler.handle(handleMsgContext)) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                } else {
                    try {
                        sendMessageBack(event, uniqueId, bizSeqNo);
                    } catch (Exception e) {
                        // Best effort: a failed send-back must not prevent the commit below.
                        log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e);
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                }
            } finally {
                TraceUtils.finishSpan(span, event);
            }
        };
        broadcastMqConsumer.registerEventListener(broadcastEventListener);

        inited4Persistent.compareAndSet(false, true);
        inited4Broadcast.compareAndSet(false, true);
        log.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
    }