nimo-shake/checkpoint/mongoWriter.go (116 lines of code) (raw):
package checkpoint
import (
"context"
"fmt"
utils "nimo-shake/common"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// MongoWriter reads and writes checkpoint data stored in a MongoDB database.
// Instances are created with NewMongoWriter.
type MongoWriter struct {
// address is the MongoDB address that was passed to NewMongoWriter.
address string
//conn *utils.MongoConn
// nconn is the connection returned by utils.NewMongoCommunityConn; every
// method issues its commands through nconn.Client.
nconn *utils.MongoCommunityConn
// db is the name of the database that all methods operate on.
db string
}
// NewMongoWriter connects to the MongoDB deployment at address and returns a
// MongoWriter bound to database db.
//
// The connection is requested with utils.ConnectModePrimary. When it cannot
// be established the error is logged and nil is returned, so callers must
// check the result for nil before using it.
func NewMongoWriter(address, db string) *MongoWriter {
	conn, err := utils.NewMongoCommunityConn(address, utils.ConnectModePrimary, true)
	if err == nil {
		return &MongoWriter{address: address, nconn: conn, db: db}
	}

	LOG.Error("create mongodb with address[%v] db[%v] connection error[%v]", address, db, err)
	return nil
}
// FindStatus looks up the document whose "Key" equals CheckpointStatusKey in
// the CheckpointStatusTable collection and returns its Value.
//
// If no such document exists, CheckpointStatusValueEmpty is returned with a
// nil error. Any other lookup or decode failure is returned unchanged
// together with an empty string.
func (mw *MongoWriter) FindStatus() (string, error) {
	coll := mw.nconn.Client.Database(mw.db).Collection(CheckpointStatusTable)

	var doc Status
	err := coll.FindOne(context.TODO(), bson.M{"Key": CheckpointStatusKey}).Decode(&doc)
	switch err {
	case nil:
		return doc.Value, nil
	case mongo.ErrNoDocuments:
		// A missing status document is not an error: report the empty status.
		return CheckpointStatusValueEmpty, nil
	default:
		return "", err
	}
}
// UpdateStatus stores status as the Value of the document keyed by
// CheckpointStatusKey in the CheckpointStatusTable collection.
//
// The update is issued with upsert enabled, so the document is created when
// it does not exist yet. The error from UpdateOne is returned as is.
func (mw *MongoWriter) UpdateStatus(status string) error {
	coll := mw.nconn.Client.Database(mw.db).Collection(CheckpointStatusTable)
	_, err := coll.UpdateOne(
		context.TODO(),
		bson.M{"Key": CheckpointStatusKey},
		bson.M{"$set": Status{Key: CheckpointStatusKey, Value: status}},
		options.Update().SetUpsert(true),
	)
	return err
}
// ExtractCheckpoint loads the checkpoints of every collection in the
// database. Each table keeps its checkpoints in its own collection.
//
// The result maps collection name -> ShardId -> checkpoint. Collections for
// which FilterCkptCollection returns true are skipped. The first error
// encountered aborts the extraction and is returned with a nil map.
func (mw *MongoWriter) ExtractCheckpoint() (map[string]map[string]*Checkpoint, error) {
	names, err := mw.nconn.Client.Database(mw.db).ListCollectionNames(context.TODO(), bson.M{})
	if err != nil {
		return nil, fmt.Errorf("fetch checkpoint collection list failed[%v]", err)
	}

	result := make(map[string]map[string]*Checkpoint)
	for _, name := range names {
		if !FilterCkptCollection(name) {
			shards, err := mw.ExtractSingleCheckpoint(name)
			if err != nil {
				return nil, err
			}
			result[name] = shards
		}
	}
	return result, nil
}
// ExtractSingleCheckpoint reads every document of collection table and
// returns the decoded checkpoints indexed by their ShardId. If several
// documents share a ShardId, the one read last wins.
//
// An error is returned, with a nil map, when the query cannot be started,
// when a document fails to decode, or when the cursor stops because of an
// error rather than because the result set is exhausted.
func (mw *MongoWriter) ExtractSingleCheckpoint(table string) (map[string]*Checkpoint, error) {
	cursor, err := mw.nconn.Client.Database(mw.db).Collection(table).Find(context.TODO(), bson.M{})
	if err != nil {
		return nil, err
	}
	defer cursor.Close(context.TODO())

	innerMap := make(map[string]*Checkpoint)
	for cursor.Next(context.TODO()) {
		// Declared per iteration so that every map entry points at its own value.
		var elem Checkpoint
		if err := cursor.Decode(&elem); err != nil {
			return nil, err
		}
		innerMap[elem.ShardId] = &elem
	}

	// Next also returns false when iteration fails (e.g. a network error while
	// fetching the next batch); without this check a truncated checkpoint set
	// would be reported as a successful, complete read.
	if err := cursor.Err(); err != nil {
		return nil, err
	}
	return innerMap, nil
}
// Insert adds ckpt as a new document to collection table and returns the
// error reported by InsertOne, if any.
func (mw *MongoWriter) Insert(ckpt *Checkpoint, table string) error {
	coll := mw.nconn.Client.Database(mw.db).Collection(table)
	if _, err := coll.InsertOne(context.TODO(), ckpt); err != nil {
		return err
	}
	return nil
}
// Update applies ckpt as a "$set" to one document of collection table whose
// "ShardId" equals shardId.
//
// No upsert option is passed to UpdateOne. The error from UpdateOne is
// returned as is.
func (mw *MongoWriter) Update(shardId string, ckpt *Checkpoint, table string) error {
	coll := mw.nconn.Client.Database(mw.db).Collection(table)
	_, err := coll.UpdateOne(
		context.TODO(),
		bson.M{"ShardId": shardId},
		bson.M{"$set": ckpt},
	)
	return err
}
// UpdateWithSet applies the field/value pairs in input as a "$set" to one
// document of collection table whose "ShardId" equals shardId.
//
// No upsert option is passed to UpdateOne. The error from UpdateOne is
// returned as is.
func (mw *MongoWriter) UpdateWithSet(shardId string, input map[string]interface{}, table string) error {
	coll := mw.nconn.Client.Database(mw.db).Collection(table)
	_, err := coll.UpdateOne(
		context.TODO(),
		bson.M{"ShardId": shardId},
		bson.M{"$set": input},
	)
	return err
}
// Query fetches one document of collection table whose "ShardId" equals
// shardId and decodes it into a Checkpoint.
//
// The returned pointer is never nil, even when err is non-nil; callers must
// check err before trusting the checkpoint's contents.
func (mw *MongoWriter) Query(shardId string, table string) (*Checkpoint, error) {
	ckpt := new(Checkpoint)
	found := mw.nconn.Client.Database(mw.db).Collection(table).FindOne(context.TODO(), bson.M{"ShardId": shardId})
	err := found.Decode(ckpt)
	return ckpt, err
}
// DropAll drops the whole database this writer is bound to and returns the
// error reported by Drop, if any.
func (mw *MongoWriter) DropAll() error {
	database := mw.nconn.Client.Database(mw.db)
	return database.Drop(context.TODO())
}
// IncrCacheFileInsert is currently a no-op stub: it ignores all of its
// arguments, writes nothing and always returns nil.
//
// The receiver is named mw to match every other MongoWriter method.
func (mw *MongoWriter) IncrCacheFileInsert(table string, shardId string, fileName string,
	lastSequenceNumber string, time string) error {
	// write cachefile struct to db
	return nil
}