func()

in pulsar/consumer_partition.go [1671:1781]


// grabConn subscribes this partition consumer to its topic.
//
// It looks up pc.topic through the client's lookup service, builds a
// CommandSubscribe from the consumer options and sends it as a SUBSCRIBE
// request to the addresses returned by the lookup. If the request itself
// fails, the error is returned; when that error is a request timeout a
// CLOSE_CONSUMER request is sent first. Otherwise the connection carried by
// the response is stored on pc and pc is registered on it as the consume
// handler for pc.consumerID.
//
// It returns nil when the response type is SUCCESS, an error built from the
// broker's error code and message when it is ERROR, and the result of
// newUnexpectedErrMsg for any other response type.
func (pc *partitionConsumer) grabConn() error {
	lr, err := pc.client.lookupService.Lookup(pc.topic)
	if err != nil {
		pc.log.WithError(err).Warn("Failed to lookup topic")
		return err
	}
	pc.log.Debugf("Lookup result: %+v", lr)

	subType := toProtoSubType(pc.options.subscriptionType)
	initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
	keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
	requestID := pc.client.rpcClient.NewRequestID()

	// pbSchema stays nil (no schema sent) unless the consumer was configured
	// with a schema that exposes schema info.
	var pbSchema *pb.Schema

	if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
		tmpSchemaType := pb.Schema_Type(int32(pc.options.schema.GetSchemaInfo().Type))
		pbSchema = &pb.Schema{
			Name:       proto.String(pc.options.schema.GetSchemaInfo().Name),
			Type:       &tmpSchemaType,
			SchemaData: []byte(pc.options.schema.GetSchemaInfo().Schema),
			Properties: internal.ConvertFromStringMap(pc.options.schema.GetSchemaInfo().Properties),
		}
		// Name is a *string; use the getter so the log shows the name itself
		// rather than a pointer value.
		pc.log.Debugf("The partition consumer schema name is: %s", pbSchema.GetName())
	} else {
		pc.log.Debug("The partition consumer schema is nil")
	}

	cmdSubscribe := &pb.CommandSubscribe{
		Topic:                      proto.String(pc.topic),
		Subscription:               proto.String(pc.options.subscription),
		SubType:                    subType.Enum(),
		ConsumerId:                 proto.Uint64(pc.consumerID),
		RequestId:                  proto.Uint64(requestID),
		ConsumerName:               proto.String(pc.name),
		PriorityLevel:              nil,
		Durable:                    proto.Bool(pc.options.subscriptionMode == Durable),
		Metadata:                   internal.ConvertFromStringMap(pc.options.metadata),
		SubscriptionProperties:     internal.ConvertFromStringMap(pc.options.subProperties),
		ReadCompacted:              proto.Bool(pc.options.readCompacted),
		Schema:                     pbSchema,
		InitialPosition:            initialPosition.Enum(),
		ReplicateSubscriptionState: proto.Bool(pc.options.replicateSubscriptionState),
		KeySharedMeta:              keySharedMeta,
	}

	// Clear the receiver queue and remember the message ID it reports as the
	// new start position.
	pc.startMessageID.set(pc.clearReceiverQueue())
	if pc.options.subscriptionMode != Durable {
		// For regular subscriptions the broker will determine the restarting point
		cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
	}

	// When non-empty, the metadata and subscription properties set in the
	// struct literal above are replaced by their toKeyValues encoding.
	if len(pc.options.metadata) > 0 {
		cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
	}

	if len(pc.options.subProperties) > 0 {
		cmdSubscribe.SubscriptionProperties = toKeyValues(pc.options.subProperties)
	}

	// force topic creation is enabled by default so
	// we only need to set the flag when disabling it
	if pc.options.disableForceTopicCreation {
		cmdSubscribe.ForceTopicCreation = proto.Bool(false)
	}

	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)

	if err != nil {
		pc.log.WithError(err).Error("Failed to create consumer")
		if err == internal.ErrRequestTimeOut {
			// The subscribe request timed out, so ask the broker to close this
			// consumer ID. A separate request ID is used for the close command.
			closeRequestID := pc.client.rpcClient.NewRequestID()
			cmdClose := &pb.CommandCloseConsumer{
				ConsumerId: proto.Uint64(pc.consumerID),
				RequestId:  proto.Uint64(closeRequestID),
			}
			// Best effort: the subscribe error is what gets reported to the
			// caller, so the outcome of the close request is ignored.
			_, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, closeRequestID,
				pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
		}
		return err
	}

	// Adopt the consumer name carried in the response, when one is present.
	if res.Response.ConsumerStatsResponse != nil {
		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
	}

	pc._setConn(res.Cnx)
	pc.log.Info("Connected consumer")
	err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
	if err != nil {
		pc.log.WithError(err).Error("Failed to add consumer handler")
		return err
	}

	msgType := res.Response.GetType()

	switch msgType {
	case pb.BaseCommand_SUCCESS:
		// notify the dispatcher we have connection; the send runs in its own
		// goroutine, so grabConn returns without waiting for a receiver
		go func() {
			pc.connectedCh <- struct{}{}
		}()
		return nil
	case pb.BaseCommand_ERROR:
		errMsg := res.Response.GetError()
		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
	default:
		return newUnexpectedErrMsg(msgType, requestID)
	}
}