in collector/filter/orphan_filter.go [38:92]
func (filter *OrphanFilter) Filter(docD bson.D, namespace string) bool {
if filter.chunkMap == nil {
LOG.Warn("chunk map is nil")
return false
}
shardCol, hasChunk := filter.chunkMap[namespace]
if !hasChunk {
return false
}
NextChunk:
for _, chunkRage := range shardCol.Chunks {
// check greater and equal than the minimum of the chunk range
for keyInd, keyName := range shardCol.Keys {
key := oplog.GetKey(docD, keyName)
if key == nil {
LOG.Crashf("OrphanFilter find no shard key[%v] in doc %v", keyName, docD)
}
if shardCol.ShardType == sharding.HashedShard {
key = ComputeHash(key)
}
if chunkLt(key, chunkRage.Mins[keyInd]) {
continue NextChunk
}
if chunkGt(key, chunkRage.Mins[keyInd]) {
break
}
}
// check less than the maximum of the chunk range
for keyInd, keyName := range shardCol.Keys {
key := oplog.GetKey(docD, keyName)
if key == nil {
LOG.Crashf("OrphanFilter find no shard ke[%v] in doc %v", keyName, docD)
}
if shardCol.ShardType == sharding.HashedShard {
key = ComputeHash(key)
}
if chunkGt(key, chunkRage.Maxs[keyInd]) {
continue NextChunk
}
if chunkLt(key, chunkRage.Maxs[keyInd]) {
break
}
if keyInd == len(shardCol.Keys)-1 {
continue NextChunk
}
}
// current key in the chunk, therefore dont filter
return false
}
LOG.Warn("document syncer %v filter orphan document %v with shard key %v in ns[%v]",
filter.replset, docD, shardCol.Keys, namespace)
return true
}