in runtime/core/protocol/grpc/consumer/message_request.go [137:210]
func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
r, err := NewRequest(mctx)
if err != nil {
return nil, err
}
rand.Seed(time.Now().UnixMilli())
hr := &WebhookRequest{
IDCWebhookURLs: mctx.TopicConfig.IDCURLs(),
AllURLs: mctx.TopicConfig.AllURLs(),
Request: r,
startIdx: rand.Intn(mctx.TopicConfig.Size()),
subscriptionMode: mctx.SubscriptionMode,
}
hr.Try = func() error {
hr.LastPushTime = time.Now()
httpClient := &http.Client{
Timeout: defaultWebhookTimeout,
}
httpHdr := http.Header{}
httpHdr.Set(grpc.REQUEST_CODE, grpc.HTTP_PUSH_CLIENT_ASYNC)
httpHdr.Set(grpc.LANGUAGE, "Go")
httpHdr.Set(grpc.Version, "1.0")
httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Common.Cluster)
httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Common.Env)
httpHdr.Set(grpc.EVENTMESHIP, util.GetIP())
httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Common.IDC)
httpHdr.Set(grpc.PROTOCOL_TYPE, hr.SimpleMessage.Header.ProtocolType)
httpHdr.Set(grpc.PROTOCOL_DESC, hr.SimpleMessage.Header.ProtocolDesc)
httpHdr.Set(grpc.PROTOCOL_VERSION, hr.SimpleMessage.Header.ProtocolVersion)
httpHdr.Set(grpc.CONTENT_TYPE, hr.SimpleMessage.Properties[grpc.CONTENT_TYPE])
formValues := url.Values{}
formValues.Set(grpc.CONTENT, hr.SimpleMessage.Content)
formValues.Set(grpc.BIZSEQNO, hr.SimpleMessage.SeqNum)
formValues.Set(grpc.UNIQUEID, hr.SimpleMessage.UniqueId)
formValues.Set(grpc.RANDOMNO, mctx.MsgRandomNo)
formValues.Set(grpc.TOPIC, hr.SimpleMessage.Topic)
content, _ := jsonPool.Get().(jsoniter.API).MarshalToString(hr.SimpleMessage.Properties)
formValues.Set(grpc.EXTFIELDS, content)
// TODO need to clone?
hr.SimpleMessage.Properties[consts.REQ_EVENTMESH2C_TIMESTAMP] = fmt.Sprintf("%v", hr.LastPushTime.UnixMilli())
urls := hr.getURLs()
for _, u := range urls {
log.Infof("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
resp, err := httpClient.PostForm(u, formValues)
if err != nil {
log.Warnf("err:%v in submit to url:%v|topic={}|bizSeqNo={}|uniqueId={}",
err, u, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
continue
}
if resp.StatusCode != http.StatusOK {
log.Warnf("status code:%v to submit to url:%v|topic={}|bizSeqNo={}|uniqueId={}",
resp.StatusCode, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
continue
}
buf, err := io.ReadAll(resp.Body)
if err != nil {
log.Warnf("err:%v in read response url:%v|topic={}|bizSeqNo={}|uniqueId={}",
err, hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
continue
}
res := &Response{}
if err := jsonPool.Get().(jsoniter.API).Unmarshal(buf, res); err != nil {
log.Warnf("err:%v in unmarshal response:%v url:%v|topic={}|bizSeqNo={}|uniqueId={}",
err, string(buf), hr.SimpleMessage.Topic, hr.SimpleMessage.SeqNum, hr.SimpleMessage.UniqueId)
continue
}
}
return nil
}
return hr, nil
}