in pkg/controller/topictransfer/topictransfer_controller.go [427:447]
// isConsumeFinished reports whether consumption of topic has caught up on the
// brokers of cluster, judging from the textual table in output.
//
// output is split into lines and each line into whitespace-separated columns.
// The first line (index 0) is never inspected — presumably a header row; verify
// against the command that produces output. A line is only considered when it
// has more than cons.Diff columns.
//
// The function returns false as soon as a line whose cons.Topic column equals
// topic and whose cons.BrokerName column equals one of the names returned by
// getClusterBrokerNames(cluster) carries a cons.Diff column other than "0".
// It returns true otherwise, which includes the case where no line matches
// at all.
func isConsumeFinished(output string, topic string, cluster string) bool {
	rows := strings.Split(output, "\n")
	brokerNames := getClusterBrokerNames(cluster)

	for lineNo := 1; lineNo < len(rows); lineNo++ {
		// strings.Fields already ignores leading and trailing whitespace,
		// so no separate trimming step is needed.
		cols := strings.Fields(rows[lineNo])
		if len(cols) <= cons.Diff {
			// Too few columns to carry a Diff value; skip the line.
			continue
		}

		// Shared tail of the diagnostic messages for this line.
		where := " , in line " + strconv.Itoa(lineNo)

		for _, name := range brokerNames {
			log.Info("broker = " + name)
			log.Info("fields[cons.Topic] = " + cols[cons.Topic] + where)
			log.Info("fields[cons.BrokerName] = " + cols[cons.BrokerName] + where)
			log.Info("fields[cons.Diff] = " + cols[cons.Diff] + where)

			if cols[cons.Topic] != topic || cols[cons.BrokerName] != name {
				continue
			}
			if cols[cons.Diff] != "0" {
				// A matching row still has a non-zero Diff value.
				return false
			}
		}
	}

	return true
}