in collector/filter/oplog_filter.go [147:286]
func (filter *NamespaceFilter) Filter(log *oplog.PartialLog) bool {
var result bool
LOG.Debug("NamespaceFilter check oplog:%v", log.Object)
db := strings.SplitN(log.Namespace, ".", 2)[0]
if log.Operation != "c" {
// DML
// {"op" : "i", "ns" : "my.system.indexes", "o" : { "v" : 2, "key" : { "date" : 1 }, "name" : "date_1", "ns" : "my.tbl", "expireAfterSeconds" : 3600 }
if strings.HasSuffix(log.Namespace, "system.indexes") {
// DDL: change log.Namespace to ns of object, in order to do filter with real namespace
ns := log.Namespace
log.Namespace = oplog.GetKey(log.Object, "ns").(string)
result = filter.filter(log)
log.Namespace = ns
} else {
result = filter.filter(log)
}
return result
} else {
// DDL
operation, found := oplog.ExtraCommandName(log.Object)
if !found {
LOG.Warn("extraCommandName meets type[%s] which is not implemented, ignore!", operation)
return false
}
switch operation {
// startIndexBuild,abortIndexBuild,commitIndexBuild waw introduced in 4.4, first two command are ignored
// commitIndexBuild may have mutil indexes which need change to CreateIndexes command
case "startIndexBuild":
fallthrough
case "abortIndexBuild":
return true
case "commitIndexBuild":
fallthrough
case "create":
fallthrough
case "createIndexes":
fallthrough
case "collMod":
fallthrough
case "drop":
fallthrough
case "deleteIndex":
fallthrough
case "deleteIndexes":
fallthrough
case "dropIndex":
fallthrough
case "dropIndexes":
fallthrough
case "convertToCapped":
fallthrough
case "emptycapped":
col, ok := oplog.GetKey(log.Object, operation).(string)
if !ok {
LOG.Warn("extraCommandName meets illegal %v oplog %v, ignore!", operation, log.Object)
return false
}
log.Namespace = fmt.Sprintf("%s.%s", db, col)
return filter.filter(log)
case "renameCollection":
// { "renameCollection" : "my.tbl", "to" : "my.my", "stayTemp" : false, "dropTarget" : false }
ns, ok := oplog.GetKey(log.Object, operation).(string)
if !ok {
LOG.Warn("extraCommandName meets illegal %v oplog %v, ignore!", operation, log.Object)
return false
}
log.Namespace = ns
return filter.filter(log)
case "applyOps":
return false
// TODO move all extra and filter to individual func
//var ops []bson.D
//var remainOps []interface{} // return []interface{}
//
//LOG.Info("log.Object:%v , type:%v", log.Object, reflect.TypeOf(oplog.GetKey(log.Object, "applyOps")))
//// it's very strange, some documents are []interface, some are []bson.D
//switch v := oplog.GetKey(log.Object, "applyOps").(type) {
//case []interface{}:
// LOG.Info("Fiter interface %v\n", v)
// for _, ele := range v {
// ops = append(ops, ele.(bson.D))
// }
//case []bson.D:
// LOG.Info("Fiter bson.D %v\n", v)
// ops = v
//case primitive.A:
// for i, ele := range v {
// ops = append(ops, ele.(bson.D))
//
// LOG.Info("Fiter primitive.A type %v, ele:%v", reflect.TypeOf(ele), ele)
// is_filter := false
// for _, ele1 := range ele.(bson.D) {
// if ele1.Key == "ns" && filter.FilterNs(ele1.Value.(string)) {
// LOG.Info("Fiter primitive.A fitler:%v", ele1.Value)
// is_filter = true
// break
// }
// }
// if is_filter {
// for j, ele1 := range ele.(bson.D) {
// if ele1.Key == "op" {
// //ele1.Value = "n"
// v[i].(bson.D)[j].Value = "n"
// }
// }
//
// LOG.Info("Fiter zhangst result:%v", ele)
// }
// }
//default:
// LOG.Error("unknow applyOps type, log:%v", log.Object)
//}
//
//LOG.Info("Fiter primitive.A ops:%v", ops)
//// except field 'o'
//except := map[string]struct{}{
// "o": {},
//}
//
//for _, ele := range ops {
// m, _ := oplog.ConvertBsonD2MExcept(ele, except)
// subLog := oplog.NewPartialLog(m)
// if ok := filter.Filter(subLog); !ok {
// remainOps = append(remainOps, ele)
// }
//}
//oplog.SetFiled(log.Object, "applyOps", remainOps)
//
//LOG.Info("NamespaceFilter applyOps filter?[%v], remainOps: %v", len(remainOps) == 0, remainOps)
//return len(remainOps) == 0
default:
// such as: dropDatabase
return filter.filter(log)
}
}
}