eventmesh-sdks/eventmesh-sdk-go/grpc/consumer.go (245 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package grpc import ( "context" "fmt" "github.com/apache/eventmesh/eventmesh-sdk-go/common/id" "github.com/apache/eventmesh/eventmesh-sdk-go/common/seq" "github.com/apache/eventmesh/eventmesh-sdk-go/grpc/conf" "github.com/apache/eventmesh/eventmesh-sdk-go/grpc/proto" "github.com/apache/eventmesh/eventmesh-sdk-go/log" jsoniter "github.com/json-iterator/go" "google.golang.org/grpc" "io" "reflect" "sync" "time" ) var ( // ErrSubscribeResponse subscribe response code not ok ErrSubscribeResponse = fmt.Errorf("subscribe response code err") // ErrUnSupportResponse only support reflect.String, reflect.Struct, reflect.Ptr, reflect.Map ErrUnSupportResponse = fmt.Errorf("un support response msg type") // defaultTTL default msg ttl defaultTTL = time.Second * 4 ) // eventMeshConsumer consumer to implements the ConsumerService type eventMeshConsumer struct { // client subscribe api client client proto.ConsumerServiceClient // topics subscribe topics // map[string]*proto.Subscription_SubscriptionItem topics *sync.Map // cfg configuration cfg *conf.GRPCConfig // dispatcher for topic dispatcher *messageDispatcher // heartbeat used to keepalive with eventmesh heartbeat *eventMeshHeartbeat // closeCtx close context closeCtx context.Context // streamSubscribeChan chan to receive the subscribe request with stream type streamSubscribeChan chan *proto.Subscription // idg generate uniq id idg id.Interface // seqg generate sequenced id seqg seq.Interface } // newConsumer create new consumer func newConsumer(ctx context.Context, cfg *conf.GRPCConfig, grpcConn *grpc.ClientConn, idg id.Interface, seqg seq.Interface) (*eventMeshConsumer, error) { cli := proto.NewConsumerServiceClient(grpcConn) heartbeat, err := newHeartbeat(ctx, cfg, grpcConn) if err != nil { log.Warnf("failed to create producer, err:%v", err) return nil, err } return &eventMeshConsumer{ client: cli, closeCtx: ctx, topics: new(sync.Map), cfg: cfg, heartbeat: heartbeat, dispatcher: newMessageDispatcher(cfg.ConsumerConfig.PoolSize, cfg.ConsumerConfig.Timeout), streamSubscribeChan: make(chan *proto.Subscription, 1024), idg: idg, seqg: seqg, }, nil } // startConsumerStream run stream goroutine to receive the msg send by stream not webhook func (d *eventMeshConsumer) startConsumerStream() error { stream, err := d.client.SubscribeStream(d.closeCtx) if err != nil { log.Warnf("failed to get subscribe stream, err:%v", err) return err } go func() { ss := stream for { select { case <-d.closeCtx.Done(): log.Infof("close consumer subscribe goroutine") case sub, ok := <-d.streamSubscribeChan: if ok { if err := ss.Send(sub); err != nil { log.Warnf("send subscribe stream msg err:%v", err) } } } } }() go func() { ss := stream log.Infof("start receive msg stream") for { msg, err := ss.Recv() if err == io.EOF { log.Infof("receive msg got io.EOF exit stream") break } if err != nil { log.Warnf("receive msg got err:%v, need to return", err) return } reply, err := d.dispatcher.onMessage(msg) if err != nil { log.Warnf("dispatch msg got err:%v, msgID:%s", err, msg.UniqueId) continue } if reply == nil { continue } // for async message, do not need to reply it if !d.needToReply(msg.Topic) { continue } if err := d.replyMsg(msg, reply); err != nil { log.Warnf("reply msg err:%v, msgID:%s", err, msg.UniqueId) continue } } log.Infof("close receive stream") }() return nil } func (d *eventMeshConsumer) replyMsg(msg *proto.SimpleMessage, reply interface{}) error { replyContent := "" typ := reflect.TypeOf(reply) switch typ.Kind() { case reflect.String: replyContent = reply.(string) case reflect.Ptr, reflect.Struct, reflect.Map: jv, err := jsoniter.MarshalToString(reply) if err != nil { log.Warnf("failed to unmarshal the response for kind:%v, err:%v, msgID:%s", typ.Kind(), err, msg.UniqueId) return err } replyContent = jv default: log.Warnf("un support response msg type:%v", typ.Kind()) return ErrUnSupportResponse } ttl := GetTTLWithDefault(msg, defaultTTL) var err error = nil defer func() { if rerr := recover(); rerr != nil { err = rerr.(error) } }() d.streamSubscribeChan <- &proto.Subscription{ Header: msg.Header, ConsumerGroup: d.cfg.ConsumerGroup, Reply: &proto.Subscription_Reply{ ProducerGroup: d.cfg.ConsumerGroup, Topic: msg.Topic, Content: replyContent, Ttl: fmt.Sprintf("%v", ttl.Seconds()), UniqueId: d.idg.Next(), SeqNum: d.seqg.Next(), Tag: msg.Tag, Properties: msg.Properties, }, } return err } // Subscribe topic for webhook func (d *eventMeshConsumer) Subscribe(item conf.SubscribeItem, callbackURL string) error { log.Infof("subscribe with webhook topic:%v, url:%s", item, callbackURL) if callbackURL == "" { return fmt.Errorf("webhook subscribe err, url is empty") } subItem := &proto.Subscription_SubscriptionItem{ Topic: item.Topic, Mode: proto.Subscription_SubscriptionItem_SubscriptionMode(item.SubscribeMode), Type: proto.Subscription_SubscriptionItem_SubscriptionType(item.SubscribeType), } subMsg := &proto.Subscription{ Header: CreateHeader(d.cfg), ConsumerGroup: d.cfg.ConsumerGroup, SubscriptionItems: []*proto.Subscription_SubscriptionItem{subItem}, Url: callbackURL, } resp, err := d.client.Subscribe(context.TODO(), subMsg) if err != nil { log.Warnf("failed to subscribe topic:%v, err :%v", subItem, err) return err } if resp.RespCode != Success { log.Warnf("failed to subscribe resp:%v", resp.String()) return ErrSubscribeResponse } d.topics.Store(item.Topic, subItem) d.heartbeat.addHeartbeat(subItem) log.Infof("success subscribe with topic:%s, resp:%s", item.Topic, resp.String()) return nil } // UnSubscribe unsubscribe topic with all eventmesh server func (d *eventMeshConsumer) UnSubscribe() error { log.Infof("unsubscribe topics") resp, err := d.client.Unsubscribe(context.TODO(), &proto.Subscription{ Header: CreateHeader(d.cfg), ConsumerGroup: d.cfg.ConsumerGroup, SubscriptionItems: func() []*proto.Subscription_SubscriptionItem { var sitems []*proto.Subscription_SubscriptionItem d.topics.Range(func(key, value interface{}) bool { sitems = append(sitems, value.(*proto.Subscription_SubscriptionItem)) return true }) return sitems }(), }) if err != nil { log.Warnf("failed to subscribe topic:%v, err :%v", d.topics, err) return err } log.Infof("success unsubscribe with resp:%s", resp.String()) return nil } // SubscribeWithStream subscribe stream, dispatch the message for all topic func (d *eventMeshConsumer) SubscribeWithStream(item conf.SubscribeItem, handler OnMessage) error { log.Infof("subscribe stream topic:%v", item) subItem := &proto.Subscription_SubscriptionItem{ Topic: item.Topic, Mode: proto.Subscription_SubscriptionItem_SubscriptionMode(item.SubscribeMode), Type: proto.Subscription_SubscriptionItem_SubscriptionType(item.SubscribeType), } if err := d.addSubscribeHandler(item, handler); err != nil { return err } d.streamSubscribeChan <- &proto.Subscription{ Header: CreateHeader(d.cfg), ConsumerGroup: d.cfg.ConsumerGroup, SubscriptionItems: []*proto.Subscription_SubscriptionItem{subItem}, } log.Infof("success subscribe stream with topic:%s", item.Topic) return nil } func (d *eventMeshConsumer) addSubscribeHandler(item conf.SubscribeItem, handler OnMessage) error { subItem := &proto.Subscription_SubscriptionItem{ Topic: item.Topic, Mode: proto.Subscription_SubscriptionItem_SubscriptionMode(item.SubscribeMode), Type: proto.Subscription_SubscriptionItem_SubscriptionType(item.SubscribeType), } if err := d.dispatcher.addHandler(item.Topic, handler); err != nil { log.Warnf("failed to add handler for topic:%s", item.Topic) return err } d.topics.Store(item.Topic, subItem) d.heartbeat.addHeartbeat(subItem) return nil } func (d *eventMeshConsumer) close() error { if d.heartbeat != nil { if err := d.heartbeat.close(); err != nil { log.Warnf("failed to close heartbeat:%v", err) } d.heartbeat = nil } return nil } // needToReply check the message need to reply, only works on RequestReply func (d *eventMeshConsumer) needToReply(topic string) bool { val, ok := d.topics.Load(topic) if !ok { return false } subType := val.(*proto.Subscription_SubscriptionItem) return subType.Type == proto.Subscription_SubscriptionItem_SYNC }