func()

in dialer.go [295:453]


func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption) (conn net.Conn, err error) {
	select {
	case <-d.closed:
		return nil, ErrDialerClosed
	default:
	}

	inst, err := alloydb.ParseInstURI(instance)
	if err != nil {
		return nil, err
	}
	mr := d.metricRecorder(ctx, inst)

	var (
		startTime = time.Now()
		endDial   tel.EndSpanFunc
		attrs     = telv2.Attributes{
			IAMAuthN:    d.useIAMAuthN,
			UserAgent:   d.userAgent,
			RefreshType: telv2.RefreshAheadType,
		}
	)
	if d.lazyRefresh {
		attrs.RefreshType = telv2.RefreshLazyType
	}
	ctx, endDial = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn.Dial",
		tel.AddInstanceName(instance),
		tel.AddDialerID(d.dialerID),
	)
	defer func() {
		go tel.RecordDialError(context.Background(), instance, d.dialerID, err)
		go mr.RecordDialCount(ctx, attrs)
		endDial(err)
	}()

	cfg := d.defaultDialCfg
	for _, opt := range opts {
		opt(&cfg)
	}

	var endInfo tel.EndSpanFunc
	ctx, endInfo = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.InstanceInfo")
	cache, cacheHit, err := d.connectionInfoCache(ctx, inst, mr)
	attrs.CacheHit = cacheHit
	if err != nil {
		attrs.DialStatus = telv2.DialCacheError
		endInfo(err)
		return nil, err
	}
	ci, err := cache.ConnectionInfo(ctx)
	if err != nil {
		attrs.DialStatus = telv2.DialCacheError
		d.removeCached(ctx, inst, cache, err)
		endInfo(err)
		return nil, err
	}
	endInfo(err)

	// If the client certificate has expired (as when the computer goes to
	// sleep, and the refresh cycle cannot run), force a refresh immediately.
	// The TLS handshake will not fail on an expired client certificate. It's
	// not until the first read where the client cert error will be surfaced.
	// So check that the certificate is valid before proceeding.
	if invalidClientCert(ctx, inst, d.logger, ci.Expiration) {
		d.logger.Debugf(ctx, "[%v] Refreshing certificate now", inst.String())
		cache.ForceRefresh()
		// Block on refreshed connection info
		ci, err = cache.ConnectionInfo(ctx)
		if err != nil {
			d.removeCached(ctx, inst, cache, err)
			attrs.DialStatus = telv2.DialCacheError
			return nil, err
		}
	}
	addr, ok := ci.IPAddrs[cfg.ipType]
	if !ok {
		d.removeCached(ctx, inst, cache, err)
		err := errtype.NewConfigError(
			fmt.Sprintf("instance does not have IP of type %q", cfg.ipType),
			inst.String(),
		)
		attrs.DialStatus = telv2.DialUserError
		return nil, err
	}

	var connectEnd tel.EndSpanFunc
	ctx, connectEnd = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.Connect")
	defer func() { connectEnd(err) }()
	hostPort := net.JoinHostPort(addr, serverProxyPort)
	f := d.dialFunc
	if cfg.dialFunc != nil {
		f = cfg.dialFunc
	}
	d.logger.Debugf(ctx, "[%v] Dialing %v", inst.String(), hostPort)
	conn, err = f(ctx, "tcp", hostPort)
	if err != nil {
		d.logger.Debugf(ctx, "[%v] Dialing %v failed: %v", inst.String(), hostPort, err)
		// refresh the instance info in case it caused the connection failure
		cache.ForceRefresh()
		attrs.DialStatus = telv2.DialTCPError
		return nil, errtype.NewDialError("failed to dial", inst.String(), err)
	}
	if c, ok := conn.(*net.TCPConn); ok {
		if err := c.SetKeepAlive(true); err != nil {
			attrs.DialStatus = telv2.DialTCPError
			return nil, errtype.NewDialError("failed to set keep-alive", inst.String(), err)
		}
		if err := c.SetKeepAlivePeriod(cfg.tcpKeepAlive); err != nil {
			attrs.DialStatus = telv2.DialTCPError
			return nil, errtype.NewDialError("failed to set keep-alive period", inst.String(), err)
		}
	}

	c := &tls.Config{
		Certificates: []tls.Certificate{ci.ClientCert},
		RootCAs:      ci.RootCAs,
		// The PSC, private, and public IP all appear in the certificate as
		// SAN. Use the server name that corresponds to the requested
		// connection path.
		ServerName: addr,
		MinVersion: tls.VersionTLS13,
	}
	tlsConn := tls.Client(conn, c)
	if err := tlsConn.HandshakeContext(ctx); err != nil {
		d.logger.Debugf(ctx, "[%v] TLS handshake failed: %v", inst.String(), err)
		// refresh the instance info in case it caused the handshake failure
		cache.ForceRefresh()
		_ = tlsConn.Close() // best effort close attempt
		attrs.DialStatus = telv2.DialTLSError
		return nil, errtype.NewDialError("handshake failed", inst.String(), err)
	}

	if !d.disableMetadataExchange {
		// The metadata exchange must occur after the TLS connection is established
		// to avoid leaking sensitive information.
		err = d.metadataExchange(tlsConn)
		if err != nil {
			_ = tlsConn.Close() // best effort close attempt
			attrs.DialStatus = telv2.DialMDXError
			return nil, err
		}
	}
	attrs.DialStatus = telv2.DialSuccess

	latency := time.Since(startTime).Milliseconds()
	go func() {
		n := atomic.AddUint64(cache.openConns, 1)
		tel.RecordOpenConnections(ctx, int64(n), d.dialerID, inst.String())
		tel.RecordDialLatency(ctx, instance, d.dialerID, latency)
		mr.RecordOpenConnection(ctx, attrs)
		mr.RecordDialLatency(ctx, latency, attrs)
	}()

	return newInstrumentedConn(tlsConn, mr, attrs, func() {
		n := atomic.AddUint64(cache.openConns, ^uint64(0))
		tel.RecordOpenConnections(context.Background(), int64(n), d.dialerID, inst.String())
		mr.RecordClosedConnection(context.Background(), attrs)
	}, d.dialerID, inst.String()), nil
}