in pkg/client/mq/mq.go [93:155]
func (c Client) Call(req *client.Request) (res any, err error) {
body, err := io.ReadAll(req.IngressRequest.Body)
if err != nil {
return nil, err
}
paths := strings.Split(req.API.Path, "/")
if len(paths) < 3 {
return nil, perrors.New("failed to send message, broker or Topic not found")
}
switch MQActionStrToInt[paths[0]] {
case MQActionPublish:
var pReq MQProduceRequest
err = json.Unmarshal(body, &pReq)
if err != nil {
return nil, err
}
err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic))
if err != nil {
return nil, err
}
case MQActionSubscribe:
var cReq MQSubscribeRequest
err = json.Unmarshal(body, &cReq)
if err != nil {
return nil, err
}
if _, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); !ok {
facade, err := NewKafkaConsumerFacade(c.kafkaConsumerConfig, cReq.ConsumerGroup)
if err != nil {
return nil, err
}
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
facade.Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
return nil, err
}
}
}
case MQActionUnSubscribe:
var cReq MQUnSubscribeRequest
err = json.Unmarshal(body, &cReq)
if err != nil {
return nil, err
}
if facade, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
facade.(ConsumerFacade).Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
return nil, err
}
default:
return nil, perrors.New("failed to get mq action")
}
return nil, nil
}