nimo-full-check/checker/document-checker.go (265 lines of code) (raw):

package checker import ( "context" "encoding/json" "fmt" conf "nimo-full-check/configure" "os" "reflect" shakeUtils "nimo-shake/common" "nimo-shake/protocal" shakeQps "nimo-shake/qps" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/google/go-cmp/cmp" LOG "github.com/vinllen/log4go" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) func convertToMap(data interface{}) interface{} { switch v := data.(type) { case primitive.D: result := make(map[string]interface{}) for _, elem := range v { result[elem.Key] = convertToMap(elem.Value) } return result case primitive.ObjectID: return v.Hex() case []interface{}: var newSlice []interface{} for _, item := range v { newSlice = append(newSlice, convertToMap(item)) } return newSlice default: return v } } func interIsEqual(dynamoData, convertedMongo interface{}) bool { //convertedMongo := convertToMap(mongoData) if m, ok := convertedMongo.(map[string]interface{}); ok { delete(m, "_id") } else { LOG.Warn("don't have _id in mongodb document") return false } opts := cmp.Options{ cmp.FilterPath(func(p cmp.Path) bool { if len(p) > 0 { switch p.Last().(type) { case cmp.MapIndex, cmp.SliceIndex: return true } } return false }, cmp.Transformer("NormalizeNumbers", func(in interface{}) interface{} { switch v := in.(type) { case int, int32, int64, float32, float64: return reflect.ValueOf(v).Convert(reflect.TypeOf(float64(0))).Float() default: return in } })), } // LOG.Warn("tmp2 %v", cmp.Diff(dynamoData, convertedMongo, opts)) return cmp.Equal(dynamoData, convertedMongo, opts) } const ( fetcherChanSize = 512 parserChanSize = 4096 ) type KeyUnion struct { name string tp string union string } type DocumentChecker struct { id int ns shakeUtils.NS sourceConn *dynamodb.DynamoDB mongoClient *shakeUtils.MongoCommunityConn fetcherChan chan *dynamodb.ScanOutput // chan between fetcher and parser parserChan chan protocal.RawData // chan between parser and writer converter protocal.Converter // converter sampler *Sample // use to sample primaryKeyWithType KeyUnion sortKeyWithType KeyUnion } func NewDocumentChecker(id int, table string, dynamoSession *dynamodb.DynamoDB) *DocumentChecker { // check mongodb connection mongoClient, err := shakeUtils.NewMongoCommunityConn(conf.Opts.TargetAddress, shakeUtils.ConnectModePrimary, true) if err != nil { LOG.Crashf("documentChecker[%v] with table[%v] connect mongodb[%v] failed[%v]", id, table, conf.Opts.TargetAddress, err) } converter := protocal.NewConverter(conf.Opts.ConvertType) if converter == nil { LOG.Error("documentChecker[%v] with table[%v] create converter failed", id, table) return nil } return &DocumentChecker{ id: id, sourceConn: dynamoSession, mongoClient: mongoClient, converter: converter, ns: shakeUtils.NS{ Collection: table, Database: conf.Opts.Id, }, } } func (dc *DocumentChecker) String() string { return fmt.Sprintf("documentChecker[%v] with table[%s]", dc.id, dc.ns) } func (dc *DocumentChecker) Run() { // check outline if err := dc.checkOutline(); err != nil { LOG.Crashf("%s check outline failed[%v]", dc.String(), err) } LOG.Info("%s check outline finish, starts checking details", dc.String()) dc.fetcherChan = make(chan *dynamodb.ScanOutput, fetcherChanSize) dc.parserChan = make(chan protocal.RawData, parserChanSize) // start fetcher to fetch all data from DynamoDB go dc.fetcher() // start parser to get data from fetcher and write into exector. go dc.parser() // start executor to check dc.executor() } func (dc *DocumentChecker) fetcher() { LOG.Info("%s start fetcher", dc.String()) qos := shakeQps.StartQoS(int(conf.Opts.QpsFull)) defer qos.Close() // init nil var previousKey map[string]*dynamodb.AttributeValue for { <-qos.Bucket out, err := dc.sourceConn.Scan(&dynamodb.ScanInput{ TableName: aws.String(dc.ns.Collection), ExclusiveStartKey: previousKey, Limit: aws.Int64(conf.Opts.QpsFullBatchNum), }) if err != nil { // TODO check network error and retry LOG.Crashf("%s fetcher scan failed[%v]", dc.String(), err) } // pass result to parser dc.fetcherChan <- out previousKey = out.LastEvaluatedKey if previousKey == nil { // complete break } } LOG.Info("%s close fetcher", dc.String()) close(dc.fetcherChan) } func (dc *DocumentChecker) parser() { LOG.Info("%s start parser", dc.String()) for { data, ok := <-dc.fetcherChan if !ok { break } LOG.Debug("%s reads data[%v]", dc.String(), data) list := data.Items for _, ele := range list { out, err := dc.converter.Run(ele) if err != nil { LOG.Crashf("%s parses ele[%v] failed[%v]", dc.String(), ele, err) } // sample if dc.sampler.Hit() == false { continue } dc.parserChan <- out.(protocal.RawData) } } LOG.Info("%s close parser", dc.String()) close(dc.parserChan) } func (dc *DocumentChecker) executor() { LOG.Info("%s start executor", dc.String()) diffFile := fmt.Sprintf("%s/%s", conf.Opts.DiffOutputFile, dc.ns.Collection) f, err := os.Create(diffFile) if err != nil { LOG.Crashf("%s create diff output file[%v] failed", dc.String(), diffFile) return } for { data, ok := <-dc.parserChan if !ok { break } //var query map[string]interface{} query := make(map[string]interface{}) if dc.primaryKeyWithType.name != "" { // find by union key if conf.Opts.ConvertType == shakeUtils.ConvertMTypeChange { query[dc.primaryKeyWithType.name] = data.Data.(map[string]interface{})[dc.primaryKeyWithType.name] } else { LOG.Crashf("unknown convert type[%v]", conf.Opts.ConvertType) } } if dc.sortKeyWithType.name != "" { if conf.Opts.ConvertType == shakeUtils.ConvertMTypeChange { query[dc.sortKeyWithType.name] = data.Data.(map[string]interface{})[dc.sortKeyWithType.name] } else { LOG.Crashf("unknown convert type[%v]", conf.Opts.ConvertType) } } LOG.Info("query: %v", query) // query var output, outputMap interface{} isSame := true err := dc.mongoClient.Client.Database(dc.ns.Database).Collection(dc.ns.Collection). FindOne(context.TODO(), query).Decode(&output) if err != nil { err = fmt.Errorf("target query failed[%v][%v][%v]", err, output, query) LOG.Error("%s %v", dc.String(), err) } else { outputMap = convertToMap(output) isSame = interIsEqual(data.Data, outputMap) } inputJson, _ := json.Marshal(data.Data) outputJson, _ := json.Marshal(outputMap) if err != nil { f.WriteString(fmt.Sprintf("compare src[%s] to dst[%s] failed: %v\n", inputJson, outputJson, err)) } else if isSame == false { LOG.Warn("compare src[%s] and dst[%s] failed", inputJson, outputJson) f.WriteString(fmt.Sprintf("src[%s] != dst[%s]\n", inputJson, outputJson)) } } LOG.Info("%s close executor", dc.String()) f.Close() // remove file if size == 0 if fi, err := os.Stat(diffFile); err != nil { LOG.Warn("stat diffFile[%v] failed[%v]", diffFile, err) return } else if fi.Size() == 0 { if err := os.Remove(diffFile); err != nil { LOG.Warn("remove diffFile[%v] failed[%v]", diffFile, err) } } } func (dc *DocumentChecker) checkOutline() error { // describe dynamodb table out, err := dc.sourceConn.DescribeTable(&dynamodb.DescribeTableInput{ TableName: aws.String(dc.ns.Collection), }) if err != nil { return fmt.Errorf("describe table failed[%v]", err) } LOG.Info("describe table[%v] result: %v", dc.ns.Collection, out) // 1. check total number // dynamo count dynamoCount := out.Table.ItemCount // mongo count cnt, err := dc.mongoClient.Client.Database(dc.ns.Database).Collection(dc.ns.Collection).CountDocuments(context.Background(), bson.M{}) if err != nil { return fmt.Errorf("get mongo count failed[%v]", err) } if *dynamoCount != cnt { // return fmt.Errorf("dynamo count[%v] != mongo count[%v]", *dynamoCount, cnt) LOG.Warn("dynamo count[%v] != mongo count[%v]", *dynamoCount, cnt) } // set sampler dc.sampler = NewSample(conf.Opts.Sample, cnt) // 2. check index // TODO // parse index // parse primary key with sort key allIndexes := out.Table.AttributeDefinitions primaryIndexes := out.Table.KeySchema // parse index type parseMap := shakeUtils.ParseIndexType(allIndexes) primaryKey, sortKey, err := shakeUtils.ParsePrimaryAndSortKey(primaryIndexes, parseMap) if err != nil { return fmt.Errorf("parse primary and sort key failed[%v]", err) } dc.primaryKeyWithType = KeyUnion{ name: primaryKey, tp: parseMap[primaryKey], union: fmt.Sprintf("%s.%s", primaryKey, parseMap[primaryKey]), } dc.sortKeyWithType = KeyUnion{ name: sortKey, tp: parseMap[sortKey], union: fmt.Sprintf("%s.%s", sortKey, parseMap[sortKey]), } return nil }