oplog/hasher.go (130 lines of code) (raw):

package oplog import ( LOG "github.com/vinllen/log4go" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) const ( ShardByID = "id" ShardByNamespace = "collection" ShardAutomatic = "auto" ) const ( DefaultHashValue = 0 ) type Hasher interface { DistributeOplogByMod(log *PartialLog, mod int) uint32 } /*********************************************/ // PrimaryKeyHasher type TableHasher struct { Hasher } func (collectionHasher *TableHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32 { if mod == 1 { return 0 } if len(log.Namespace) == 0 { return DefaultHashValue } // when oplog is DDL, go into worker 0. if log.Operation == "c" { return 0 } return stringHashValue(log.Namespace) % uint32(mod) } /*********************************************/ // PrimaryKeyHasher type PrimaryKeyHasher struct { Hasher } // we need to ensure that oplog entry will be sent to the same job[$hash] // if they have the same ObjectID. thus we can consume the oplog entry // sequentially func (objectIdHasher *PrimaryKeyHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32 { if mod == 1 { return 0 } var hashObject interface{} switch log.Operation { case "i", "d", "u", "c": hashObject = GetIdOrNSFromOplog(log) case "n": return DefaultHashValue } if hashObject == nil { LOG.Warn("Couldn't extract hash object. collector has mixed up. use Oplog.Namespace instead %v", log) hashObject = log.Namespace } return Hash(hashObject) % uint32(mod) } /*********************************************/ // WhiteListObjectIdHasher: hash by collection in general, when hit white list, hash by _id type WhiteListObjectIdHasher struct { Hasher TableHasher PrimaryKeyHasher whiteList map[string]struct{} // no need to add lock, only reading operation } func NewWhiteListObjectIdHasher(whiteList []string) *WhiteListObjectIdHasher { mp := make(map[string]struct{}, len(whiteList)) for _, ele := range whiteList { mp[ele] = struct{}{} } return &WhiteListObjectIdHasher{ TableHasher: TableHasher{}, PrimaryKeyHasher: PrimaryKeyHasher{}, whiteList: mp, } } func (wloi *WhiteListObjectIdHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32 { ns := log.Namespace if len(ns) == 0 { return DefaultHashValue } if _, ok := wloi.whiteList[ns]; ok { return wloi.PrimaryKeyHasher.DistributeOplogByMod(log, mod) } return wloi.TableHasher.DistributeOplogByMod(log, mod) } /*********************************************/ func getValueFromBsonD(obj bson.D, key string) (interface{}, bool) { for _, ele := range obj { if ele.Key == key { return ele.Value, true } } return nil, false } func GetIdOrNSFromOplog(log *PartialLog) interface{} { switch log.Operation { case "i", "d": return GetKey(log.Object, "") case "u": if id, ok := getValueFromBsonD(log.Query, "_id"); ok { return id } else { return GetKey(log.Object, "") } case "c": return log.Namespace default: LOG.Critical("Unrecognized oplog object operation %s", log.Operation) } return log.Namespace } func stringHashValue(s string) uint32 { // consult from Java String.hashcode() var hashValue uint32 for _, c := range s { hashValue = 31*hashValue + uint32(c) } if hashValue < 0 { return -hashValue } return hashValue } func Hash(hashObject interface{}) uint32 { switch object := hashObject.(type) { case primitive.ObjectID: return stringHashValue(object.Hex()) case string: return stringHashValue(object) case int: return uint32(object) case nil: LOG.Warn("Hash object is NIL. use default value %d", DefaultHashValue) default: LOG.Warn("Hash object is UNKNOWN type[%T], value is [%v]. use default value %d", hashObject, hashObject, DefaultHashValue) } return DefaultHashValue }