in pulsar/consumer_partition.go [1936:2044]
// grabConn looks up the broker for the consumer's topic, sends a SUBSCRIBE
// command for this partition consumer and, when the request succeeds, stores
// the returned connection and registers the consumer as a handler on it.
//
// assignedBrokerURL is handed to lookupTopic unchanged.
//
// It returns the error from the lookup, from the SUBSCRIBE request or from
// AddConsumeHandler; otherwise nil for a SUCCESS response, and an error built
// from the response for an ERROR or any other response type.
func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
	lr, err := pc.lookupTopic(assignedBrokerURL)
	if err != nil {
		return err
	}

	subType := toProtoSubType(pc.options.subscriptionType)
	initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
	keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
	requestID := pc.client.rpcClient.NewRequestID()

	// Only attach a schema to the command when the consumer was configured
	// with one that exposes schema info.
	var pbSchema *pb.Schema
	if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
		schemaInfo := pc.options.schema.GetSchemaInfo()
		tmpSchemaType := pb.Schema_Type(int32(schemaInfo.Type))
		pbSchema = &pb.Schema{
			Name:       proto.String(schemaInfo.Name),
			Type:       &tmpSchemaType,
			SchemaData: []byte(schemaInfo.Schema),
			Properties: internal.ConvertFromStringMap(schemaInfo.Properties),
		}
		// Log the plain string: pbSchema.Name is a *string, which %s would
		// render as a pointer rather than as the schema name.
		pc.log.Debugf("The partition consumer schema name is: %s", schemaInfo.Name)
	} else {
		pc.log.Debug("The partition consumer schema is nil")
	}

	cmdSubscribe := &pb.CommandSubscribe{
		Topic:                      proto.String(pc.topic),
		Subscription:               proto.String(pc.options.subscription),
		SubType:                    subType.Enum(),
		ConsumerId:                 proto.Uint64(pc.consumerID),
		RequestId:                  proto.Uint64(requestID),
		ConsumerName:               proto.String(pc.name),
		PriorityLevel:              nil,
		Durable:                    proto.Bool(pc.options.subscriptionMode == Durable),
		Metadata:                   internal.ConvertFromStringMap(pc.options.metadata),
		SubscriptionProperties:     internal.ConvertFromStringMap(pc.options.subProperties),
		ReadCompacted:              proto.Bool(pc.options.readCompacted),
		Schema:                     pbSchema,
		InitialPosition:            initialPosition.Enum(),
		ReplicateSubscriptionState: proto.Bool(pc.options.replicateSubscriptionState),
		KeySharedMeta:              keySharedMeta,
	}

	// The value returned by clearReceiverQueue becomes the new start message ID.
	pc.startMessageID.set(pc.clearReceiverQueue())
	if pc.options.subscriptionMode != Durable {
		// For regular (durable) subscriptions the broker will determine the
		// restarting point, so the start message ID is only sent otherwise.
		cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
	}

	// NOTE(review): non-empty metadata / subscription properties overwrite the
	// values set in the literal above, this time built with toKeyValues.
	// Presumably one of the two conversions is redundant — confirm before
	// removing either.
	if len(pc.options.metadata) > 0 {
		cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
	}
	if len(pc.options.subProperties) > 0 {
		cmdSubscribe.SubscriptionProperties = toKeyValues(pc.options.subProperties)
	}

	// force topic creation is enabled by default so
	// we only need to set the flag when disabling it
	if pc.options.disableForceTopicCreation {
		cmdSubscribe.ForceTopicCreation = proto.Bool(false)
	}

	res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
	if err != nil {
		pc.log.WithError(err).Error("Failed to create consumer")
		if err == internal.ErrRequestTimeOut {
			// Best effort: after a subscribe timeout, send CLOSE_CONSUMER for
			// this consumer ID. The outcome of that request is deliberately
			// ignored; the original subscribe error is what gets returned.
			closeRequestID := pc.client.rpcClient.NewRequestID()
			cmdClose := &pb.CommandCloseConsumer{
				ConsumerId: proto.Uint64(pc.consumerID),
				RequestId:  proto.Uint64(closeRequestID),
			}
			_, _ = pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, closeRequestID,
				pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
		}
		return err
	}

	// Adopt the consumer name carried by the response, when one is present.
	if res.Response.ConsumerStatsResponse != nil {
		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
	}

	pc._setConn(res.Cnx)
	pc.log.Info("Connected consumer")

	err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
	if err != nil {
		pc.log.WithError(err).Error("Failed to add consumer handler")
		return err
	}

	msgType := res.Response.GetType()
	switch msgType {
	case pb.BaseCommand_SUCCESS:
		// notify the dispatcher we have connection; the send happens in its
		// own goroutine so grabConn does not wait on connectedCh.
		go func() {
			pc.connectedCh <- struct{}{}
		}()
		return nil
	case pb.BaseCommand_ERROR:
		errMsg := res.Response.GetError()
		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
	default:
		return newUnexpectedErrMsg(msgType, requestID)
	}
}