in client.go [171:233]
// NewClient builds a Client from the given seed broker addresses and
// configuration.
//
// A nil conf is replaced with the result of NewConfig. The configuration is
// validated before anything else happens, and at least one address must be
// supplied; either failure is returned to the caller with a nil Client.
//
// Note that conf is used by reference: when the first address looks like an
// Azure Event Hubs endpoint the Version field of the caller's Config may be
// overwritten (see below).
//
// When conf.Metadata.Full is set, an initial metadata refresh is performed
// before returning. A small set of availability/authorization errors from that
// refresh is logged and tolerated; any other error tears the client down and
// is returned.
//
// On success a background goroutine running backgroundMetadataUpdater has been
// started for the returned client.
func NewClient(addrs []string, conf *Config) (Client, error) {
	DebugLogger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}
	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) == 0 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	// Only the first address is inspected to detect Azure Event Hubs. Versions
	// outside the [V0_11_0_0, V1_1_0_0) window are pinned to V1_0_0_0.
	if strings.Contains(addrs[0], ".servicebus.windows.net") &&
		(conf.Version.IsAtLeast(V1_1_0_0) || !conf.Version.IsAtLeast(V0_11_0_0)) {
		Logger.Println("Connecting to Azure Event Hubs, forcing version to V1_0_0_0 for compatibility")
		conf.Version = V1_0_0_0
	}

	c := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
		transactionCoordinators: make(map[string]int32),
	}

	if conf.Net.ResolveCanonicalBootstrapServers {
		resolved, err := c.resolveCanonicalNames(addrs)
		if err != nil {
			return nil, err
		}
		addrs = resolved
	}

	c.randomizeSeedBrokers(addrs)

	if conf.Metadata.Full {
		// Calling RefreshMetadata with no topics requests metadata for the
		// whole cluster.
		switch err := c.RefreshMetadata(); {
		case err == nil:
			// Initial fetch succeeded; nothing further to do.
		case errors.Is(err, ErrLeaderNotAvailable),
			errors.Is(err, ErrReplicaNotAvailable),
			errors.Is(err, ErrTopicAuthorizationFailed),
			errors.Is(err, ErrClusterAuthorizationFailed):
			// Part of the cluster may be down or off-limits, but that should
			// not prevent the client from being created.
			Logger.Println(err)
		default:
			// The background updater is not running yet, so nothing else will
			// close this channel; do it by hand before calling Close.
			// NOTE(review): presumably Close waits on c.closed — confirm.
			close(c.closed)
			_ = c.Close()
			return nil, err
		}
	}

	go withRecover(c.backgroundMetadataUpdater)

	DebugLogger.Println("Successfully initialized new client")
	return c, nil
}