func()

in broker.go [163:284]


func (b *Broker) Open(conf *Config) error {
	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
		return ErrAlreadyConnected
	}

	if conf == nil {
		conf = NewConfig()
	}

	err := conf.Validate()
	if err != nil {
		return err
	}

	usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest

	b.lock.Lock()

	if b.metricRegistry == nil {
		b.metricRegistry = newCleanupRegistry(conf.MetricRegistry)
	}

	go withRecover(func() {
		defer func() {
			b.lock.Unlock()

			// Send an ApiVersionsRequest to identify the client (KIP-511).
			// Ideally Sarama would use the response to control protocol versions,
			// but for now just fire-and-forget just to send
			if usingApiVersionsRequests {
				_, err = b.ApiVersions(&ApiVersionsRequest{
					Version:               3,
					ClientSoftwareName:    defaultClientSoftwareName,
					ClientSoftwareVersion: version(),
				})
				if err != nil {
					Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
				}
			}
		}()
		dialer := conf.getDialer()
		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
		if b.connErr != nil {
			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
			b.conn = nil
			atomic.StoreInt32(&b.opened, 0)
			return
		}
		if conf.Net.TLS.Enable {
			b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
		}

		b.conn = newBufConn(b.conn)
		b.conf = conf

		// Create or reuse the global metrics shared between brokers
		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry)
		b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry)
		b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry)
		b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry)
		b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry)
		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry)
		b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry)
		b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry)
		b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry)
		b.protocolRequestsRate = map[int16]metrics.Meter{}
		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
		// the same id (-1) and are already exposed through the global metrics above
		if b.id >= 0 && !metrics.UseNilMetrics {
			b.registerMetrics()
		}

		if conf.Net.SASL.Mechanism == SASLTypeOAuth && conf.Net.SASL.Version == SASLHandshakeV0 {
			conf.Net.SASL.Version = SASLHandshakeV1
		}

		useSaslV0 := conf.Net.SASL.Version == SASLHandshakeV0 || conf.Net.SASL.Mechanism == SASLTypeGSSAPI
		if conf.Net.SASL.Enable && useSaslV0 {
			b.connErr = b.authenticateViaSASLv0()

			if b.connErr != nil {
				err = b.conn.Close()
				if err == nil {
					DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
				} else {
					Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
				}
				b.conn = nil
				atomic.StoreInt32(&b.opened, 0)
				return
			}
		}

		b.done = make(chan bool)
		b.responses = make(chan *responsePromise, b.conf.Net.MaxOpenRequests-1)

		go withRecover(b.responseReceiver)
		if conf.Net.SASL.Enable && !useSaslV0 {
			b.connErr = b.authenticateViaSASLv1()
			if b.connErr != nil {
				close(b.responses)
				<-b.done
				err = b.conn.Close()
				if err == nil {
					DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
				} else {
					Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
				}
				b.conn = nil
				atomic.StoreInt32(&b.opened, 0)
				return
			}
		}
		if b.id >= 0 {
			DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
		} else {
			DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
		}
	})

	return nil
}