in broker.go [163:284]
func (b *Broker) Open(conf *Config) error {
if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
return ErrAlreadyConnected
}
if conf == nil {
conf = NewConfig()
}
err := conf.Validate()
if err != nil {
return err
}
usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest
b.lock.Lock()
if b.metricRegistry == nil {
b.metricRegistry = newCleanupRegistry(conf.MetricRegistry)
}
go withRecover(func() {
defer func() {
b.lock.Unlock()
// Send an ApiVersionsRequest to identify the client (KIP-511).
// Ideally Sarama would use the response to control protocol versions,
// but for now just fire-and-forget just to send
if usingApiVersionsRequests {
_, err = b.ApiVersions(&ApiVersionsRequest{
Version: 3,
ClientSoftwareName: defaultClientSoftwareName,
ClientSoftwareVersion: version(),
})
if err != nil {
Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
}
}
}()
dialer := conf.getDialer()
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
if conf.Net.TLS.Enable {
b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
}
b.conn = newBufConn(b.conn)
b.conf = conf
// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry)
b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry)
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry)
b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry)
b.protocolRequestsRate = map[int16]metrics.Meter{}
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 && !metrics.UseNilMetrics {
b.registerMetrics()
}
if conf.Net.SASL.Mechanism == SASLTypeOAuth && conf.Net.SASL.Version == SASLHandshakeV0 {
conf.Net.SASL.Version = SASLHandshakeV1
}
useSaslV0 := conf.Net.SASL.Version == SASLHandshakeV0 || conf.Net.SASL.Mechanism == SASLTypeGSSAPI
if conf.Net.SASL.Enable && useSaslV0 {
b.connErr = b.authenticateViaSASLv0()
if b.connErr != nil {
err = b.conn.Close()
if err == nil {
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
}
b.done = make(chan bool)
b.responses = make(chan *responsePromise, b.conf.Net.MaxOpenRequests-1)
go withRecover(b.responseReceiver)
if conf.Net.SASL.Enable && !useSaslV0 {
b.connErr = b.authenticateViaSASLv1()
if b.connErr != nil {
close(b.responses)
<-b.done
err = b.conn.Close()
if err == nil {
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
}
if b.id >= 0 {
DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
} else {
DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
}
})
return nil
}