func()

in collector/filter/orphan_filter.go [38:92]


func (filter *OrphanFilter) Filter(docD bson.D, namespace string) bool {
	if filter.chunkMap == nil {
		LOG.Warn("chunk map is nil")
		return false
	}

	shardCol, hasChunk := filter.chunkMap[namespace]
	if !hasChunk {
		return false
	}

NextChunk:
	for _, chunkRage := range shardCol.Chunks {
		// check greater and equal than the minimum of the chunk range
		for keyInd, keyName := range shardCol.Keys {
			key := oplog.GetKey(docD, keyName)
			if key == nil {
				LOG.Crashf("OrphanFilter find no shard key[%v] in doc %v", keyName, docD)
			}
			if shardCol.ShardType == sharding.HashedShard {
				key = ComputeHash(key)
			}
			if chunkLt(key, chunkRage.Mins[keyInd]) {
				continue NextChunk
			}
			if chunkGt(key, chunkRage.Mins[keyInd]) {
				break
			}
		}
		// check less than the maximum of the chunk range
		for keyInd, keyName := range shardCol.Keys {
			key := oplog.GetKey(docD, keyName)
			if key == nil {
				LOG.Crashf("OrphanFilter find no shard ke[%v] in doc %v", keyName, docD)
			}
			if shardCol.ShardType == sharding.HashedShard {
				key = ComputeHash(key)
			}
			if chunkGt(key, chunkRage.Maxs[keyInd]) {
				continue NextChunk
			}
			if chunkLt(key, chunkRage.Maxs[keyInd]) {
				break
			}
			if keyInd == len(shardCol.Keys)-1 {
				continue NextChunk
			}
		}
		// current key in the chunk, therefore dont filter
		return false
	}
	LOG.Warn("document syncer %v filter orphan document %v with shard key %v in ns[%v]",
		filter.replset, docD, shardCol.Keys, namespace)
	return true
}