datasource/featuredb/featuredb.go (103 lines of code) (raw):
package featuredb
import (
"fmt"
"net"
"net/http"
"sync/atomic"
"time"
)
// FeatureDBClient bundles the HTTP client and endpoint state for FeatureDB.
// It tracks a default address plus an optional VPC address whose health is
// probed via GET <vpcAddress>/health; GetCurrentAddress picks between them.
type FeatureDBClient struct {
// Client is the HTTP client used by CheckVpcAddress for the health probe.
// NOTE(review): presumably also used by callers for FeatureDB requests — confirm.
Client *http.Client
// address is the default endpoint, returned by GetCurrentAddress whenever
// the VPC endpoint is unset or its last probe did not succeed.
address string
// Token is stored by InitFeatureDBClient and not read anywhere in this file.
// NOTE(review): presumably an access credential consumed by callers — confirm.
Token string
// vpcAddress is the base URL of the VPC endpoint; InitFeatureDBClient
// builds it by prefixing "http://" to the supplied address.
vpcAddress string
// useVpcAddress records the outcome of the most recent health probe:
// true only when the probe returned HTTP 200.
useVpcAddress atomic.Bool
// checkInterval is the period between background health probes.
checkInterval time.Duration
// stopChan is closed by Stop to end the background probe loop.
stopChan chan struct{}
}
// featureDBClient is the package-level client instance. It stays nil until
// InitFeatureDBClient assigns it, and is handed out by GetFeatureDBClient.
var featureDBClient *FeatureDBClient
// InitFeatureDBClient creates the package-level FeatureDB client if it does
// not exist yet; later calls are no-ops.
//
// address is stored as the default endpoint and token is stored in Token,
// both unmodified. vpcAddress is optional: when non-empty it is stored with
// an "http://" prefix, probed once synchronously via CheckVpcAddress, and
// then re-probed every checkInterval (one minute) by a background goroutine
// until Stop is called. When vpcAddress is empty, no VPC URL is stored and no
// probing is started.
//
// The nil check and the assignment of the package-level client are not
// synchronized, so this should be called from a single goroutine.
func InitFeatureDBClient(address, token, vpcAddress string) {
	if featureDBClient != nil {
		return
	}

	// Only add the scheme when a VPC address was actually supplied.
	// Prefixing unconditionally would store "http://" for an empty input,
	// which defeats the empty-string guard in GetCurrentAddress and makes
	// GetCurrentAddress(true) send a doomed health request on every call.
	vpcURL := ""
	if vpcAddress != "" {
		vpcURL = fmt.Sprintf("http://%s", vpcAddress)
	}

	transport := &http.Transport{
		MaxConnsPerHost:     1000,
		MaxIdleConns:        1000,
		MaxIdleConnsPerHost: 1000,
		DialContext: (&net.Dialer{
			Timeout:   200 * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		ResponseHeaderTimeout: 500 * time.Millisecond,
	}

	// useVpcAddress is left at its zero value (false), so the default address
	// is used until a health probe succeeds.
	featureDBClient = &FeatureDBClient{
		Client:        &http.Client{Transport: transport},
		address:       address,
		Token:         token,
		vpcAddress:    vpcURL,
		checkInterval: 1 * time.Minute,
		stopChan:      make(chan struct{}),
	}

	if vpcAddress != "" {
		// Probe once up front so the first GetCurrentAddress call already
		// reflects the VPC endpoint's health, then keep probing periodically.
		featureDBClient.CheckVpcAddress()
		go featureDBClient.backgroundCheckVpcAddress()
	}
}
// GetFeatureDBClient returns the package-level client created by
// InitFeatureDBClient, or an error if it has not been created yet.
func GetFeatureDBClient() (*FeatureDBClient, error) {
	if client := featureDBClient; client != nil {
		return client, nil
	}
	return nil, fmt.Errorf("FeatureDB has not been provisioned")
}
// backgroundCheckVpcAddress re-runs CheckVpcAddress once per checkInterval.
// It blocks until stopChan becomes readable (Stop closes it), so it is meant
// to run in its own goroutine. The ticker is released on exit.
func (f *FeatureDBClient) backgroundCheckVpcAddress() {
	probeTicker := time.NewTicker(f.checkInterval)
	defer probeTicker.Stop()

	for {
		select {
		case <-probeTicker.C:
			f.CheckVpcAddress()
		case <-f.stopChan:
			return
		}
	}
}
// CheckVpcAddress probes the VPC endpoint and records the result in
// useVpcAddress. It sends GET <vpcAddress>/health through f.Client; the flag
// becomes true only when the request completes with HTTP 200. A request that
// cannot be built, a transport error, or any other status sets it to false.
func (f *FeatureDBClient) CheckVpcAddress() {
	// reachable performs the probe and reports whether the endpoint answered
	// with 200; keeping it separate lets the flag be written in one place.
	reachable := func() bool {
		healthURL := fmt.Sprintf("%s/health", f.vpcAddress)
		req, err := http.NewRequest("GET", healthURL, nil)
		if err != nil {
			return false
		}
		req.Header.Set("Content-Type", "application/json")

		resp, err := f.Client.Do(req)
		if err != nil {
			return false
		}
		defer resp.Body.Close()

		return resp.StatusCode == http.StatusOK
	}

	f.useVpcAddress.Store(reachable())
}
// GetCurrentAddress returns the endpoint that should be used right now.
//
// When no VPC address is stored (empty string) it returns the default
// address immediately, without probing. Otherwise, if check is true, it first
// runs CheckVpcAddress synchronously to refresh the health flag. It then
// returns the VPC address when the flag is set and the default address when
// it is not.
func (f *FeatureDBClient) GetCurrentAddress(check bool) string {
	fallback := f.address
	if f.vpcAddress == "" {
		return fallback
	}

	if check {
		f.CheckVpcAddress()
	}

	if !f.useVpcAddress.Load() {
		return fallback
	}
	return f.vpcAddress
}
// Stop ends the background health-probe loop by closing stopChan.
//
// Repeated sequential calls are harmless: once the channel is closed, further
// calls do nothing instead of panicking on a double close. A client whose
// stopChan was never created (nil) is also a no-op. Stop is not synchronized,
// so it must not be called concurrently from multiple goroutines.
func (f *FeatureDBClient) Stop() {
	if f.stopChan == nil {
		return
	}
	select {
	case <-f.stopChan:
		// Nothing ever sends on stopChan, so a ready receive means an earlier
		// Stop already closed it.
	default:
		close(f.stopChan)
	}
}