func()

in pkg/client/mq/mq.go [93:155]


// Call dispatches an MQ request according to the action named by the first
// "/"-separated segment of req.API.Path.
//
// The ingress request body is read in full and decoded as JSON into the
// request type that matches the action:
//   - MQActionPublish: sends pReq.Msg to pReq.Topic through the producer facade.
//   - MQActionSubscribe: creates a Kafka consumer facade for the consumer
//     group, registers it in consumerFacadeMap and subscribes it. A group that
//     is already registered is left untouched.
//   - MQActionUnSubscribe: stops and unregisters the group's facade, if any.
//
// The returned res is always nil. err is non-nil when the body cannot be read
// or decoded, the path has fewer than three segments, the action is unknown,
// or the producer/consumer facade reports a failure.
func (c Client) Call(req *client.Request) (res any, err error) {
	body, err := io.ReadAll(req.IngressRequest.Body)
	if err != nil {
		return nil, err
	}

	paths := strings.Split(req.API.Path, "/")
	if len(paths) < 3 {
		return nil, perrors.New("failed to send message, broker or Topic not found")
	}

	switch MQActionStrToInt[paths[0]] {
	case MQActionPublish:
		var pReq MQProduceRequest
		if err = json.Unmarshal(body, &pReq); err != nil {
			return nil, err
		}
		if err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic)); err != nil {
			return nil, err
		}
	case MQActionSubscribe:
		var cReq MQSubscribeRequest
		if err = json.Unmarshal(body, &cReq); err != nil {
			return nil, err
		}
		if _, exists := consumerFacadeMap.Load(cReq.ConsumerGroup); exists {
			// The group already has a facade; nothing to do.
			return nil, nil
		}
		// NOTE(review): the Load above and the Store below are not atomic, so
		// two concurrent subscribe calls for the same group can both get here
		// — confirm whether callers serialize these requests.
		facade, newErr := NewKafkaConsumerFacade(c.kafkaConsumerConfig, cReq.ConsumerGroup)
		if newErr != nil {
			return nil, newErr
		}
		consumerFacadeMap.Store(cReq.ConsumerGroup, facade)

		// NOTE(review): ctx is cancelled when Call returns; confirm that
		// Subscribe does not keep using it for the lifetime of the consumer.
		ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
		defer cancel()

		// Subscribe the facade created above directly rather than reading it
		// back from the map: a concurrent delete between Store and Load would
		// otherwise skip the subscription silently and orphan the facade.
		err = facade.Subscribe(ctx,
			WithTopics(cReq.TopicList),
			WithConsumeUrl(cReq.ConsumeUrl),
			WithCheckUrl(cReq.CheckUrl),
			WithConsumerGroup(cReq.ConsumerGroup),
		)
		if err != nil {
			// Roll back the registration so a later subscribe can retry.
			facade.Stop()
			consumerFacadeMap.Delete(cReq.ConsumerGroup)
			return nil, err
		}
	case MQActionUnSubscribe:
		var cReq MQUnSubscribeRequest
		if err = json.Unmarshal(body, &cReq); err != nil {
			return nil, err
		}
		if facade, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
			facade.(ConsumerFacade).Stop()
			consumerFacadeMap.Delete(cReq.ConsumerGroup)
		}
	default:
		return nil, perrors.New("failed to get mq action")
	}

	return nil, nil
}