func NewWebhookRequest()

in runtime/core/protocol/grpc/consumer/message_request.go [137:210]


// NewWebhookRequest builds a WebhookRequest for the message carried by mctx.
//
// It wraps the Request produced by NewRequest, copies the webhook URL sets and
// the subscription mode from mctx, picks a random start index in
// [0, mctx.TopicConfig.Size()) and installs the Try callback.
//
// Try records the push time, form-encodes the message and POSTs it to every
// URL returned by hr.getURLs(). A failure on one URL (transport error,
// non-200 status, unreadable or undecodable response) is logged and the next
// URL is attempted; Try itself always returns nil.
//
// An error is returned when NewRequest fails or when the topic config reports
// a non-positive size, which would otherwise make rand.Intn panic.
func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
	r, err := NewRequest(mctx)
	if err != nil {
		return nil, err
	}

	// rand.Intn panics for n <= 0, so reject that case up front instead of
	// crashing the caller.
	size := mctx.TopicConfig.Size()
	if size <= 0 {
		return nil, fmt.Errorf("new webhook request: topic config size is %d, want > 0", size)
	}

	// NOTE(review): reseeding the global source on every call is kept for
	// compatibility with older Go toolchains; requests created within the same
	// millisecond get the same start index.
	rand.Seed(time.Now().UnixMilli())
	hr := &WebhookRequest{
		IDCWebhookURLs:   mctx.TopicConfig.IDCURLs(),
		AllURLs:          mctx.TopicConfig.AllURLs(),
		Request:          r,
		startIdx:         rand.Intn(size),
		subscriptionMode: mctx.SubscriptionMode,
	}

	// One client per request object, shared by every Try attempt.
	httpClient := &http.Client{
		Timeout: defaultWebhookTimeout,
	}

	hr.Try = func() error {
		hr.LastPushTime = time.Now()

		// NOTE(review): httpHdr is populated but never attached to the outgoing
		// request, because Client.PostForm does not accept custom headers.
		// Sending it requires http.NewRequest + Client.Do; confirm that
		// grpc.CONTENT_TYPE does not clash with the form Content-Type first.
		httpHdr := http.Header{}
		httpHdr.Set(grpc.REQUEST_CODE, grpc.HTTP_PUSH_CLIENT_ASYNC)
		httpHdr.Set(grpc.LANGUAGE, "Go")
		httpHdr.Set(grpc.Version, "1.0")
		httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Common.Cluster)
		httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Common.Env)
		httpHdr.Set(grpc.EVENTMESHIP, util.GetIP())
		httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Common.IDC)
		httpHdr.Set(grpc.PROTOCOL_TYPE, hr.SimpleMessage.Header.ProtocolType)
		httpHdr.Set(grpc.PROTOCOL_DESC, hr.SimpleMessage.Header.ProtocolDesc)
		httpHdr.Set(grpc.PROTOCOL_VERSION, hr.SimpleMessage.Header.ProtocolVersion)
		httpHdr.Set(grpc.CONTENT_TYPE, hr.SimpleMessage.Properties[grpc.CONTENT_TYPE])

		formValues := url.Values{}
		formValues.Set(grpc.CONTENT, hr.SimpleMessage.Content)
		formValues.Set(grpc.BIZSEQNO, hr.SimpleMessage.SeqNum)
		formValues.Set(grpc.UNIQUEID, hr.SimpleMessage.UniqueId)
		formValues.Set(grpc.RANDOMNO, mctx.MsgRandomNo)
		formValues.Set(grpc.TOPIC, hr.SimpleMessage.Topic)
		content, err := jsonPool.Get().(jsoniter.API).MarshalToString(hr.SimpleMessage.Properties)
		if err != nil {
			// Best effort: the push still goes out, with whatever was encoded.
			log.Warnf("err:%v in marshal properties|topic=%v|bizSeqNo=%v|uniqueId=%v",
				err, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
		}
		formValues.Set(grpc.EXTFIELDS, content)

		// TODO need to clone?
		// NOTE(review): the timestamp is written after the properties were
		// serialized above, so it is not part of the posted ext fields.
		hr.SimpleMessage.Properties[consts.REQ_EVENTMESH2C_TIMESTAMP] = fmt.Sprintf("%v", hr.LastPushTime.UnixMilli())

		// push delivers the form to a single URL. It is a closure so the
		// deferred Body.Close runs once per URL instead of piling up until Try
		// returns.
		push := func(u string) {
			log.Infof("message|eventMesh2client|url=%v|topic=%v|bizSeqNo=%v|uniqueId=%v",
				u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
			resp, err := httpClient.PostForm(u, formValues)
			if err != nil {
				log.Warnf("err:%v in submit to url:%v|topic=%v|bizSeqNo=%v|uniqueId=%v",
					err, u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
				return
			}
			defer resp.Body.Close()

			if resp.StatusCode != http.StatusOK {
				// Drain so the transport can reuse the connection; a failure
				// here is irrelevant because the response is discarded anyway.
				_, _ = io.Copy(io.Discard, resp.Body)
				log.Warnf("status code:%v to submit to url:%v|topic=%v|bizSeqNo=%v|uniqueId=%v",
					resp.StatusCode, u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
				return
			}
			buf, err := io.ReadAll(resp.Body)
			if err != nil {
				log.Warnf("err:%v in read response url:%v|topic=%v|bizSeqNo=%v|uniqueId=%v",
					err, u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
				return
			}
			// The decoded response is only used to validate that the body
			// parses; its fields are not inspected.
			res := &Response{}
			if err := jsonPool.Get().(jsoniter.API).Unmarshal(buf, res); err != nil {
				log.Warnf("err:%v in unmarshal response:%v url:%v|topic=%v|bizSeqNo=%v|uniqueId=%v",
					err, string(buf), u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
				return
			}
		}

		for _, u := range hr.getURLs() {
			push(u)
		}
		return nil
	}
	return hr, nil
}