collector/ckpt/ckpt_operation.go (143 lines of code) (raw):
package ckpt
import (
"bytes"
"encoding/json"
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"io/ioutil"
"net/http"
utils "github.com/alibaba/MongoShake/v2/common"
LOG "github.com/vinllen/log4go"
)
// Well-known checkpoint timestamp values.
const (
	// InitCheckpoint is the starting checkpoint value. Timestamp(0, 0) cannot
	// be used for this purpose because MongoDB treats it as "now" when it is
	// inserted, so Timestamp(0, 1) is used instead.
	InitCheckpoint int64 = 1

	// EmptyCheckpoint is the zero checkpoint value.
	EmptyCheckpoint int64 = 0
)
// CheckpointContext is the checkpoint record exchanged with the checkpoint
// storage. The bson tags define its document layout in MongoDB and the json
// tags define its layout for the HTTP API and for String().
type CheckpointContext struct {
// Name identifies the checkpoint; serialized as "name".
Name string `bson:"name" json:"name"`
// Timestamp is the checkpoint position; serialized as "ckpt".
// NOTE(review): presumably a packed MongoDB timestamp, since it is passed to
// utils.ExtractMongoTimestamp when logged — confirm.
Timestamp int64 `bson:"ckpt" json:"ckpt"`
// Version is serialized as "version".
// NOTE(review): presumably the checkpoint format version — confirm.
Version int `bson:"version" json:"version"`
// FetchMethod is serialized as "fetch_method".
// NOTE(review): presumably the configured data fetch method — confirm.
FetchMethod string `bson:"fetch_method" json:"fetch_method"`
// OplogDiskQueue is serialized as "oplog_disk_queue".
// NOTE(review): presumably the name of the on-disk oplog queue — confirm.
OplogDiskQueue string `bson:"oplog_disk_queue" json:"oplog_disk_queue"`
// OplogDiskQueueFinishTs is serialized as "oplog_disk_queue_apply_finish_ts".
// NOTE(review): presumably the timestamp up to which the disk queue has been
// applied — confirm.
OplogDiskQueueFinishTs int64 `bson:"oplog_disk_queue_apply_finish_ts" json:"oplog_disk_queue_apply_finish_ts"`
}
// String renders the checkpoint as its JSON encoding. If marshaling fails,
// the text of the marshal error is returned instead.
func (cc *CheckpointContext) String() string {
	encoded, err := json.Marshal(cc)
	if err != nil {
		return err.Error()
	}
	return string(encoded)
}
// CheckpointOperation is the interface implemented by checkpoint storage
// backends.
type CheckpointOperation interface {
// Get reads the checkpoint from remote storage and wraps it in a
// CheckpointContext. The bool reports whether the checkpoint exists on the
// remote side.
Get() (*CheckpointContext, bool)
// Insert saves the given checkpoint.
Insert(ckpt *CheckpointContext) error
// String returns a textual description used for logging.
String() string
}
// MongoCheckpoint is a checkpoint store backed by a MongoDB collection.
type MongoCheckpoint struct {
// CheckpointContext holds the local checkpoint values. Name selects the
// stored document, and the remaining fields seed the checkpoint returned by
// Get when no document exists yet.
CheckpointContext
// client is the MongoDB connection. It is created on demand by
// ensureNetwork and reset to nil by close.
client *utils.MongoCommunityConn
// connection info: URL is the address passed to
// utils.NewMongoCommunityConn; DB and Table name the database and the
// collection that hold the checkpoint document.
URL string
DB, Table string
}
// ensureNetwork makes sure ckpt.client is set, creating a connection through
// utils.NewMongoCommunityConn when none exists yet. It returns true when a
// client is available and false when the connection attempt failed (the
// failure is logged as a warning).
func (ckpt *MongoCheckpoint) ensureNetwork() bool {
	// An existing client is reused as-is.
	if ckpt.client != nil {
		return true
	}

	conn, err := utils.NewMongoCommunityConn(
		ckpt.URL,
		utils.VarMongoConnectModePrimary,
		true,
		utils.ReadWriteConcernMajority,
		utils.ReadWriteConcernMajority,
		conf.Options.CheckpointStorageUrlMongoSslRootCaFile,
	)
	if err != nil {
		LOG.Warn("%s CheckpointOperation manager connect mongo cluster failed. %v", ckpt.Name, err)
		return false
	}

	ckpt.client = conn
	return true
}
// close shuts down the current connection and clears ckpt.client, so that the
// next ensureNetwork call creates a new connection.
func (ckpt *MongoCheckpoint) close() {
	conn := ckpt.client
	conn.Close()
	ckpt.client = nil
}
// Get loads the checkpoint document whose name equals ckpt.Name from the
// configured database and collection.
//
// It returns:
//   - the stored checkpoint and true when a document was found;
//   - a checkpoint built from the local fields and false when no document
//     exists (the local Timestamp is first raised to InitCheckpoint if it is
//     lower; nothing is written to MongoDB);
//   - nil and false when the connection could not be established or the
//     lookup failed for any other reason (in the latter case the connection
//     is closed so that it is re-created on the next call).
func (ckpt *MongoCheckpoint) Get() (*CheckpointContext, bool) {
	if !ckpt.ensureNetwork() {
		LOG.Warn("%s Reload ckpt ensure network failed. %v", ckpt.Name, ckpt.client)
		return nil, false
	}

	collection := ckpt.client.Client.Database(ckpt.DB).Collection(ckpt.Table)
	result := new(CheckpointContext)
	err := collection.FindOne(nil, bson.M{CheckpointName: ckpt.Name}).Decode(result)

	switch err {
	case nil:
		LOG.Info("%s Load exist checkpoint. content %v", ckpt.Name, result)
		return result, true

	case mongo.ErrNoDocuments:
		// Never hand out a timestamp below the initial checkpoint.
		if ckpt.Timestamp < InitCheckpoint {
			ckpt.Timestamp = InitCheckpoint
		}
		result.Name = ckpt.Name
		result.Timestamp = ckpt.Timestamp
		result.Version = ckpt.Version
		result.OplogDiskQueue = ckpt.OplogDiskQueue
		result.OplogDiskQueueFinishTs = ckpt.OplogDiskQueueFinishTs
		LOG.Info("%s Regenerate checkpoint but won't persist. content: %s", ckpt.Name, result)
		return result, false
	}

	// Any other error: drop the connection and report failure.
	ckpt.close()
	LOG.Error("%s Reload ckpt find context fail. %v", ckpt.Name, err)
	return nil, false
}
// Insert upserts the given checkpoint into the configured collection: the
// document whose name equals ckpt.Name has its fields set from updates, and
// it is created if it does not exist.
//
// It returns an error when the connection cannot be established or when the
// update fails; on an update failure the connection is closed so that it is
// re-created on the next call.
func (ckpt *MongoCheckpoint) Insert(updates *CheckpointContext) error {
	if !ckpt.ensureNetwork() {
		LOG.Warn("%s Record ckpt ensure network failed. %v", ckpt.Name, ckpt.client)
		return fmt.Errorf("%s record ckpt network failed", ckpt.Name)
	}

	collection := ckpt.client.Client.Database(ckpt.DB).Collection(ckpt.Table)
	if _, err := collection.UpdateOne(
		nil,
		bson.M{CheckpointName: ckpt.Name},
		bson.M{"$set": updates},
		options.Update().SetUpsert(true),
	); err != nil {
		LOG.Warn("%s Record checkpoint %v upsert error %v", ckpt.Name, updates, err)
		ckpt.close()
		return err
	}

	LOG.Info("%s Record new checkpoint in MongoDB success [%d]", ckpt.Name,
		utils.ExtractMongoTimestamp(updates.Timestamp))
	return nil
}
// HttpApiCheckpoint is a checkpoint store reached through an HTTP endpoint.
type HttpApiCheckpoint struct {
// CheckpointContext holds the local checkpoint values; Get falls back to
// them when the remote response carries a zero timestamp.
CheckpointContext
// URL is the endpoint that Get sends a GET request to and that Insert
// POSTs the JSON-encoded checkpoint to.
URL string
}
// Get fetches the checkpoint with an HTTP GET on ckpt.URL and decodes the
// JSON response body into a CheckpointContext.
//
// It returns:
//   - the decoded checkpoint and true when its timestamp is non-zero;
//   - the decoded value with Timestamp, Name, OplogDiskQueue and
//     OplogDiskQueueFinishTs overwritten by the local fields, and false, when
//     the decoded timestamp is zero;
//   - nil and false when the request, reading the body, or decoding the JSON
//     fails (each failure is logged as a warning).
func (ckpt *HttpApiCheckpoint) Get() (*CheckpointContext, bool) {
	resp, err := http.Get(ckpt.URL)
	if err != nil {
		LOG.Warn("%s Http api ckpt request failed, %v", ckpt.Name, err)
		return nil, false
	}
	// The response body must be closed on every path, otherwise the
	// underlying connection is leaked.
	defer resp.Body.Close()

	stream, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		LOG.Warn("%s Http api ckpt read response failed, %v", ckpt.Name, err)
		return nil, false
	}

	value := new(CheckpointContext)
	if err = json.Unmarshal(stream, value); err != nil {
		LOG.Warn("%s Http api ckpt parse response failed, %v", ckpt.Name, err)
		return nil, false
	}

	// TODO, may have problem: a zero timestamp is taken to mean that the
	// remote side has no checkpoint yet.
	if value.Timestamp == 0 {
		// use default start position
		value.Timestamp = ckpt.Timestamp
		value.Name = ckpt.Name
		value.OplogDiskQueueFinishTs = ckpt.OplogDiskQueueFinishTs
		value.OplogDiskQueue = ckpt.OplogDiskQueue
		return value, false
	}
	return value, true
}
// Insert saves the checkpoint by POSTing its JSON encoding to ckpt.URL with
// content type "application/json".
//
// It returns nil only when the request succeeds with status 200 OK. A
// marshaling failure, a transport error, or any other status code yields a
// non-nil error; each failure is logged as a warning.
func (ckpt *HttpApiCheckpoint) Insert(insert *CheckpointContext) error {
	body, err := json.Marshal(insert)
	if err != nil {
		LOG.Warn("%s Context api manager marshal checkpoint failed, %v", ckpt.Name, err)
		return err
	}

	resp, err := http.Post(ckpt.URL, "application/json", bytes.NewReader(body))
	if err != nil {
		LOG.Warn("%s Context api manager write request failed, %v", ckpt.Name, err)
		return err
	}
	// The response body must be closed on every path, otherwise the
	// underlying connection is leaked.
	defer resp.Body.Close()

	// http.Post reports no error for non-2xx responses, so a rejected write
	// has to be turned into an error here; otherwise the caller would treat
	// an unsaved checkpoint as persisted.
	if resp.StatusCode != http.StatusOK {
		err = fmt.Errorf("%s record ckpt http api returned status code %d", ckpt.Name, resp.StatusCode)
		LOG.Warn("%s Context api manager write request failed, %v", ckpt.Name, err)
		return err
	}

	LOG.Info("%s Record new checkpoint in HttpApi success [%d]", ckpt.Name, utils.ExtractMongoTimestamp(insert.Timestamp))
	return nil
}