executor/collision_matrix.go (283 lines of code) (raw):

package executor import ( "bytes" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "reflect" "strings" "github.com/alibaba/MongoShake/v2/oplog" nimo "github.com/gugemichael/nimo4go" LOG "github.com/vinllen/log4go" ) const MultiColumnIndexSplitter = "|" type OplogRecord struct { original *PartialLogWithCallbak // wait() procedure will stop to wait for the dependent OplogRecord // that operate the equivalent index values ahead of the this one // we would insert sync mutex here ! wait func() } type CollisionMatrix interface { // split oplogs into safety segments split(logs []*PartialLogWithCallbak) [][]*PartialLogWithCallbak convert(segment []*PartialLogWithCallbak) []*OplogRecord } type NoopMatrix struct{} func (noop *NoopMatrix) split(logs []*PartialLogWithCallbak) [][]*PartialLogWithCallbak { return [][]*PartialLogWithCallbak{logs} } func (noop *NoopMatrix) convert(segment []*PartialLogWithCallbak) []*OplogRecord { records := make([]*OplogRecord, len(segment), len(segment)) for index, log := range segment { // nothing to change ! records[index] = &OplogRecord{original: log, wait: nil} } return records } type OplogUniqueIdentifier struct { signatureTable []string // sequence id. order by oplogs timing order int // key is unique index column name. value is index values operations map[string]interface{} } func (oplogUniqueIdentifier *OplogUniqueIdentifier) addSignature(signature string) { // for every field. sum name and value signature value oplogUniqueIdentifier.signatureTable = append(oplogUniqueIdentifier.signatureTable, signature) } func fillupOperationValues(log *PartialLogWithCallbak) { if log.partialLog.Operation != "i" && log.partialLog.Operation != "u" { return } // for update. need to initialize the UniqueIndexesUpdates if log.partialLog.Operation == "u" { log.partialLog.UniqueIndexesUpdates = bson.M{} } o := log.partialLog.Object for k := range log.partialLog.UniqueIndexes { // multi key index like "name|phone", seperate by MultiColumnIndexSplitter // every index value should be fetched respectively from oplog.o for _, singleIndex := range strings.Split(k, MultiColumnIndexSplitter) { // single index may be "aaa" or "aaa.bbb.ccc" parent, _ := oplog.ConvertBsonD2M(o) // all types of $inc, $mul, $rename, $unset, $set change to $set,$unset in oplog // $set looks like o:{$set:{a:{b:1}}} or o:{$set:{"a.b":1}} if m, exist := parent["$set"]; exist { if child, ok := m.(bson.M); ok { // skip $set operator parent = child } } var value interface{} // check if there already has "a.b.c" in $set if v, exist := parent[singleIndex]; exist { value = v } else { cascades := strings.Split(singleIndex, ".") descend := len(cascades) - 1 // going down inPosition := true for i := 0; i != descend; i++ { if down, ok := parent[cascades[i]].(bson.M); ok { parent = down } else { inPosition = false } } if inPosition { value = parent[cascades[len(cascades)-1]] } } var fill interface{} switch log.partialLog.Operation { case "i": fill = log.partialLog.UniqueIndexes[k] case "u": fill = log.partialLog.UniqueIndexesUpdates[k] } if fill == nil { fill = make([]interface{}, 0) } if array, ok := fill.([]interface{}); ok { // doesn't find the target value. just fill NULL into it array = append(array, value) // reset to uk switch log.partialLog.Operation { case "i": log.partialLog.UniqueIndexes[k] = array case "u": log.partialLog.UniqueIndexesUpdates[k] = array } } } } } func newUniqueIdentifier(order int, log *PartialLogWithCallbak) *OplogUniqueIdentifier { nimo.AssertTrue(len(log.partialLog.UniqueIndexes) != 0, "make identifier of empty indexes wrong") // first of all, we need to fill up the oplog.uk colum field // if the operation is equal to "i","u" fillupOperationValues(log) // construct identifier identifier := &OplogUniqueIdentifier{} identifier.order = order for columnName, columnValue := range log.partialLog.UniqueIndexes { // if column value is an array of sub values. the size of array should be exactly equals // for MongoDB same index. so we don't counting the size of array into "signature" identifier.addSignature(fmt.Sprintf("%f", calculateSignature(columnName)+calculateSignature(columnValue))) } // for update only if log.partialLog.Operation == "u" { for columnName, columnValue := range log.partialLog.UniqueIndexesUpdates { identifier.addSignature(fmt.Sprintf("%f", calculateSignature(columnName)+calculateSignature(columnValue))) } } identifier.operations = log.partialLog.UniqueIndexes return identifier } func calculateSignature(object interface{}) (sign float64) { if object == nil { // prime number. different from boolean return 3 } switch o := object.(type) { case bson.M: // recursive for k, v := range o { sign += calculateSignature(k) sign += calculateSignature(v) } case []interface{}: // recursive for _, v := range o { sign += calculateSignature(v) } case primitive.Binary: // byte array for _, c := range o.Data { // consult from Java String.hashcode() sign = 31.0*sign + float64(c) } case string: for _, c := range o { // consult from Java String.hashcode() sign = 31.0*sign + float64(c) } case []byte: for _, c := range o { // consult from Java String.hashcode() sign = 31.0*sign + float64(c) } case primitive.Timestamp: // numbers sign = float64(int64(o.T<<32) + int64(o.I)) case int, uint, int8, uint8, int16, uint16, int32, uint32, int64, uint64, float32, float64: if v, ok := object.(float64); ok { sign = float64(v) } case bool: // boolean sign++ if o { sign++ } default: nimo.Assert(fmt.Sprintf("bson value signature type is not recognized [%d, %s]", reflect.TypeOf(object).Kind(), reflect.TypeOf(object).Name())) LOG.Critical("Bson value signature type is not recognized [%d, %s]", reflect.TypeOf(object).Kind(), reflect.TypeOf(object).Name()) // default value represents may be conflict. the signature is unsafe. just speed up ! // so simply sum zero value here return 0 } return sign } func ExactlyMatch(first, second interface{}) bool { if (first != nil && second == nil) || (first == nil && second != nil) || reflect.TypeOf(first).Kind() != reflect.TypeOf(second).Kind() { return false } // both of them are null is accept if first == nil && second == nil { return true } switch o := first.(type) { case bson.M: // recursive for key := range o { if v, ok := second.(map[string]interface{}); ok && !ExactlyMatch(o[key], v[key]) { return false } } case []interface{}: // recursive if v, ok := second.([]interface{}); ok { if len(o) != len(v) { return false } } for i := range o { if v, ok := second.([]interface{}); ok && !ExactlyMatch(o[i], v[i]) { return false } } case []byte: // byte array if v, ok := second.([]byte); ok { return bytes.Compare(o, v) == 0 } case primitive.Binary: if v, ok := second.(primitive.Binary); ok { return bytes.Compare(o.Data, v.Data) == 0 } case string: if v, ok := second.(string); ok { return o == v } case primitive.Timestamp: // numbers return (first.(primitive.Timestamp)).Equal(second.(primitive.Timestamp)) case int, uint, int8, uint8, int16, uint16, int32, uint32, int64, uint64, float32, float64: if v1, ok := first.(float64); ok { if v2, ok := second.(float64); ok { return v1 == v2 } } case bool: // boolean if v, ok := second.(bool); ok { return o == v } default: nimo.Assert(fmt.Sprintf("bson value check similar. not recognized [%s]", reflect.TypeOf(first).Name())) LOG.Critical("bson value check similar. not recognized [%s]", reflect.TypeOf(first).Name()) } // TODO: execute path may be walked from DEFAULT we meet something that the type // we don't know now. for safety, we treat these oplogs' unique indexes as equal exactly !! // This may cause more segments splitted but more safe. return true } func intersectionInOrder(one bson.M, other bson.M) bool { // TODO: BSON's object or array is already sorted ?! we can get the intersection more easier // BUT for now. we don't make such assumption. so make it simple, we iterate every field // in Map. and compare to another for k1, v1 := range one { if v2, exist := other[k1]; exist { return ExactlyMatch(v1, v2) } } return false } func intersection(this bson.M, other bson.M) bool { if len(this) == 0 || len(other) == 0 { return false } // compare each other return intersectionInOrder(this, other) || intersectionInOrder(other, this) } func haveMutualIndex(first, second *oplog.PartialLog) bool { if first == second { return false } var firstId, secondId primitive.ObjectID var got bool firstId, got = oplog.GetIdOrNSFromOplog(first).(primitive.ObjectID) secondId, got = oplog.GetIdOrNSFromOplog(second).(primitive.ObjectID) if got && firstId.Hex() == secondId.Hex() { // oplogs operate the single MongoDB record. they should be serialized by executor return false } // unified approach for insert, update, delete return intersection(first.UniqueIndexes, second.UniqueIndexes) || intersection(first.UniqueIndexes, second.UniqueIndexesUpdates) || intersection(first.UniqueIndexesUpdates, second.UniqueIndexes) || intersection(first.UniqueIndexesUpdates, second.UniqueIndexesUpdates) } // BarrierMatrix only split oplogs into segments. without convert stage stage. type BarrierMatrix struct { NoopMatrix // original []*PartialLogWithCallbak } func NewBarrierMatrix() *BarrierMatrix { return &BarrierMatrix{NoopMatrix{}} } func (*BarrierMatrix) split(logs []*PartialLogWithCallbak) [][]*PartialLogWithCallbak { var segmentList [][]*PartialLogWithCallbak var seg []*PartialLogWithCallbak var identifier *OplogUniqueIdentifier signatureSet := make(map[string][]*PartialLogWithCallbak) for i, log := range logs { // put the log into segment straightly if no unique index operations found if len(log.partialLog.UniqueIndexes) != 0 { identifier = newUniqueIdentifier(i, log) for _, current := range identifier.signatureTable { if potentialCandidates, exist := signatureSet[current]; exist { // found possible conflict keys for _, candidate := range potentialCandidates { // candidate and the "log" may be the same one ! if haveMutualIndex(candidate.partialLog, log.partialLog) { // add this signature to duplication list segmentList = append(segmentList, seg) seg = nil break } } LOG.Warn("Logs have same identifier signature. but don't exactly match. signature : %s", current) } signatureSet[current] = append(signatureSet[current], log) } } seg = append(seg, log) } if len(seg) != 0 { segmentList = append(segmentList, seg) } LOG.Info("Barrier matrix split vector to length %d", len(segmentList)) return segmentList } func (barrier *BarrierMatrix) convert(segment []*PartialLogWithCallbak) []*OplogRecord { return barrier.NoopMatrix.convert(segment) }