func()

in runtime/core/protocol/grpc/consumer/consumer_processor.go [104:157]


// UnSubscribe processes an unsubscribe request for webhook subscriptions.
//
// The request header and body are validated first. One GroupClient is then
// built for every subscription item in msg and deregistered from consumerMgr.
// After that the consumer for the request's consumer group is looked up and
// the same clients are deregistered from it; if any of those calls reports
// true, the consumer group is restarted through consumerMgr.
//
// On failure the returned response carries the status code of the failing
// step and the underlying error is returned alongside it. On success the
// response is built from grpc.SUCCESS and the error is nil.
func (p *processor) UnSubscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error) {
	header := msg.Header
	if err := validator.ValidateHeader(header); err != nil {
		log.Warnf("invalid header:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
	}
	if err := validator.ValidateSubscription(consts.WEBHOOK, msg); err != nil {
		log.Warnf("invalid body:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	group := msg.ConsumerGroup
	webhookURL := msg.Url
	subItems := msg.SubscriptionItems

	// One client descriptor per subscription item being removed.
	targets := make([]*GroupClient, 0, len(subItems))
	for _, sub := range subItems {
		targets = append(targets, &GroupClient{
			ENV:              header.Env,
			IDC:              header.Idc,
			SYS:              header.Sys,
			IP:               header.Ip,
			PID:              header.Pid,
			ConsumerGroup:    group,
			Topic:            sub.Topic,
			SubscriptionMode: sub.Mode,
			GRPCType:         consts.WEBHOOK,
			URL:              webhookURL,
			LastUPTime:       time.Now(),
		})
	}

	// Manager-level deregistration is completed for every client before the
	// group's consumer is looked up; the first failure aborts the request.
	for _, target := range targets {
		if err := consumerMgr.DeRegisterClient(target); err != nil {
			return buildPBResponse(grpc.EVENTMESH_Subscribe_Register_ERR), err
		}
	}

	groupConsumer, err := consumerMgr.GetConsumer(group)
	if err != nil {
		return buildPBResponse(grpc.EVENTMESH_Consumer_NotFound_ERR), err
	}

	// Every client is deregistered from the consumer (no short-circuit); a
	// single true result is enough to require a restart.
	needRestart := false
	for _, target := range targets {
		if groupConsumer.DeRegisterClient(target) {
			needRestart = true
		}
	}

	if !needRestart {
		log.Warnf("EventMesh consumer [%v] didn't restart.", group)
		return buildPBResponse(grpc.SUCCESS), nil
	}

	log.Infof("ConsumerGroup %v topic info changed, restart EventMesh Consumer", group)
	if err := consumerMgr.RestartConsumer(group); err != nil {
		return buildPBResponse(grpc.EVENTMESH_Consumer_NotFound_ERR), err
	}
	return buildPBResponse(grpc.SUCCESS), nil
}