func()

in changelog/mysql.go [313:392]


func (b *mysqlReader) reloadState() bool {
	st, err := state.GetCond("cluster=? AND input='mysql'", b.dbl.Cluster)
	if err != nil {
		b.log.Errorf("Failed to read state, Error: %v", err.Error())
		return false
	}

	b.log.Debugf("reloadState")

	for i := 0; i < len(st); i++ {
		t := &st[i]

		if b.tables[t.DB] == nil {
			b.tables[t.DB] = make(map[string]map[string][]*table)
		}

		if b.seqNo == 0 {
			b.seqNo = t.SeqNo
		}

		if b.tables[t.DB][t.Table] == nil {
			b.tables[t.DB][t.Table] = make(map[string][]*table)
		}

		if b.tables[t.DB][t.Table][t.Service] == nil {
			b.tables[t.DB][t.Table][t.Service] = make([]*table, 0)
		}

		tver := b.tables[t.DB][t.Table][t.Service]

		j := findInVersionArray(tver, t.Output, t.Version)

		if j < len(tver) {
			/* Table was deleted and inserted again. Reinitialize */
			if tver[j].id != t.ID {
				err = tver[j].producer.Close()
				log.EL(b.log, err)
				tver[j] = tver[len(tver)-1]
				tver[len(tver)-1] = nil
				b.tables[t.DB][t.Table][t.Service] = tver[:len(tver)-1]
			} else {
				tver[j].dead = false
				if t.SnapshotTimeChanged(tver[j].snapshottedAt) {
					pn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.SnapshottedAt)
					if log.EL(b.log, err) {
						return false
					}

					err = tver[j].producer.Close()
					log.EL(b.log, err)

					tver[j].producer, err = b.createProducer(pn, t)
					if log.EL(b.log, err) {
						return false
					}
					tver[j].snapshottedAt = t.SnapshottedAt
				}
			}
		} else if !b.addNewTable(t) {
			return false
		}
	}

	c := b.removeDeletedTables()

	b.metrics.NumTablesIngesting.Set(int64(c))

	if b.bufPipe.Type() == "local" && b.tpool != nil {
		b.tpool.Adjust(c + 1)
	}

	if c == 0 {
		log.Debugf("No tables remaining. Finish binlog reader")
		return false
	}

	b.numTables = int(c)

	return true
}