common/community_client.go (206 lines of code) (raw):
package utils
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"io/ioutil"
"time"
LOG "github.com/vinllen/log4go"
)
type MongoCommunityConn struct {
Client *mongo.Client
URL string
ctx context.Context
}
func addCACertFromFile(cfg *tls.Config, file string) error {
data, err := ioutil.ReadFile(file)
if err != nil {
return err
}
certBytes, err := loadCert(data)
if err != nil {
return err
}
cert, err := x509.ParseCertificate(certBytes)
if err != nil {
return err
}
if cfg.RootCAs == nil {
cfg.RootCAs = x509.NewCertPool()
}
cfg.RootCAs.AddCert(cert)
return nil
}
func loadCert(data []byte) ([]byte, error) {
var certBlock *pem.Block
for certBlock == nil {
if data == nil || len(data) == 0 {
return nil, fmt.Errorf(".pem file must have both a CERTIFICATE and an RSA PRIVATE KEY section")
}
block, rest := pem.Decode(data)
if block == nil {
return nil, fmt.Errorf("invalid .pem file")
}
switch block.Type {
case "CERTIFICATE":
if certBlock != nil {
return nil, fmt.Errorf("multiple CERTIFICATE sections in .pem file")
}
certBlock = block
}
data = rest
}
return certBlock.Bytes, nil
}
func NewMongoCommunityConn(url string, connectMode string, timeout bool, readConcern,
writeConcern string, sslRootFile string) (*MongoCommunityConn, error) {
clientOps := options.Client().ApplyURI(url)
// tls tlsInsecure + tlsCaFile
if sslRootFile != "" {
tlsConfig := new(tls.Config)
err := addCACertFromFile(tlsConfig, sslRootFile)
if err != nil {
return nil, fmt.Errorf("load rootCaFile[%v] failed: %v", sslRootFile, err)
}
// not check hostname
tlsConfig.InsecureSkipVerify = true
clientOps.SetTLSConfig(tlsConfig)
}
// read concern
switch readConcern {
case ReadWriteConcernDefault:
default:
clientOps.SetReadConcern(readconcern.New(readconcern.Level(readConcern)))
}
// write concern
switch writeConcern {
case ReadWriteConcernMajority:
clientOps.SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
}
// read pref
readPreference := readpref.Primary()
switch connectMode {
case VarMongoConnectModePrimary:
readPreference = readpref.Primary()
case VarMongoConnectModeSecondaryPreferred:
readPreference = readpref.SecondaryPreferred()
case VarMongoConnectModeStandalone:
// TODO, no standalone, choose nearest
fallthrough
case VarMongoConnectModeNearset:
readPreference = readpref.Nearest()
default:
readPreference = readpref.Primary()
}
clientOps.SetReadPreference(readPreference)
// set timeout
if !timeout {
clientOps.SetConnectTimeout(0)
} else {
clientOps.SetConnectTimeout(20 * time.Minute)
}
//clientOps.SetMaxConnIdleTime(1 * time.Hour)
// create default context
ctx := context.Background()
// connect
client, err := mongo.NewClient(clientOps)
if err != nil {
return nil, fmt.Errorf("new client failed: %v", err)
}
if err := client.Connect(ctx); err != nil {
return nil, fmt.Errorf("connect to %s failed: %v", BlockMongoUrlPassword(url, "***"), err)
}
// ping
if err = client.Ping(ctx, clientOps.ReadPreference); err != nil {
return nil, fmt.Errorf("ping to %v failed: %v\n"+
"If Mongo Server is standalone(single node) Or conn address is different with mongo server address"+
" try atandalone mode by mongodb://ip:port/admin?connect=direct",
BlockMongoUrlPassword(url, "***"), err)
}
LOG.Info("New session to %s successfully", BlockMongoUrlPassword(url, "***"))
return &MongoCommunityConn{
Client: client,
URL: url,
ctx: ctx,
}, nil
}
func (conn *MongoCommunityConn) Close() {
LOG.Info("Close client with %s", BlockMongoUrlPassword(conn.URL, "***"))
conn.Client.Disconnect(conn.ctx)
}
func (conn *MongoCommunityConn) IsGood() bool {
if err := conn.Client.Ping(nil, nil); err != nil {
return false
}
return true
}
func (conn *MongoCommunityConn) HasOplogNs(queryConditon bson.M) bool {
if ns, err := conn.Client.Database("local").ListCollectionNames(nil, queryConditon); err == nil {
for _, table := range ns {
if table == OplogNS {
return true
}
}
}
return false
}
func (conn *MongoCommunityConn) AcquireReplicaSetName() string {
res, err := conn.Client.Database("admin").
RunCommand(conn.ctx, bson.D{{"replSetGetStatus", 1}}).DecodeBytes()
if err != nil {
LOG.Warn("Replica set name not found in system.replset: %v", err)
return ""
}
id, ok := res.Lookup("set").StringValueOK()
if !ok {
LOG.Warn("Replica set name not found, is empty")
return ""
}
return id
}
func (conn *MongoCommunityConn) HasUniqueIndex(queryConditon bson.M) bool {
checkNs := make([]NS, 0, 128)
var databases []string
var err error
if databases, err = conn.Client.ListDatabaseNames(nil, bson.M{}); err != nil {
LOG.Critical("Couldn't get databases from remote server: %v", err)
return false
}
for _, db := range databases {
if db != "admin" && db != "local" && db != "config" {
coll, _ := conn.Client.Database(db).ListCollectionNames(nil, queryConditon)
for _, c := range coll {
if c != "system.profile" {
// push all collections
checkNs = append(checkNs, NS{Database: db, Collection: c})
}
}
}
}
LOG.Info("HasUniqueIndex checkNs:%v", checkNs)
for _, ns := range checkNs {
cursor, _ := conn.Client.Database(ns.Database).Collection(ns.Collection).Indexes().List(nil)
for cursor.Next(nil) {
unique, uerr := cursor.Current.LookupErr("unique")
if uerr == nil && unique.Boolean() == true {
LOG.Info("Found unique index %s on %s.%s in auto shard mode",
cursor.Current.Lookup("name").StringValue(), ns.Database, ns.Collection)
return true
}
}
}
return false
}
func (conn *MongoCommunityConn) CurrentDate() primitive.Timestamp {
res, _ := conn.Client.Database("admin").
RunCommand(conn.ctx, bson.D{{"replSetGetStatus", 1}}).DecodeBytes()
t, i, ok := res.Lookup("operationTime").TimestampOK()
if !ok {
LOG.Warn("Replica set operationTime not found, res[%v]", res)
return primitive.Timestamp{T: uint32(time.Now().Unix()), I: 0}
}
return primitive.Timestamp{T: t, I: i}
}
func (conn *MongoCommunityConn) IsTimeSeriesCollection(dbName string, collName string) bool {
res, _ := conn.Client.Database(dbName).
RunCommand(conn.ctx, bson.D{{"collStats", collName}}).DecodeBytes()
_, timeseries := res.Lookup("timeseries").DocumentOK()
return timeseries
}