func()

in pipe/sql.go [248:293]


func (p *sqlConsumer) initTable() error {
	var err error
	q := "`"
	if p.cfg.SQL.Type == postgres {
		q = `"`
	}
	tp := q + util.EscapeQuotes(p.topic, q[0]) + q
	for {
		p.rows, err = util.QuerySQL(p.conn, "SELECT * from "+tp)
		if err != nil {
			if strings.Contains(err.Error(), "doesn't exist") {
				time.Sleep(200 * time.Millisecond)
				continue
			}
			return err
		}
		break
	}
	c, err := p.rows.Columns()
	if err != nil {
		return err
	}
	t, err := p.rows.ColumnTypes()
	if err != nil {
		return err
	}
	p.row = make([]interface{}, len(c))
	p.insert = "INSERT INTO " + tp + " ("
	for i, f := range c {
		if i != 0 {
			p.insert += ","
		}
		p.insert += f
		dt := strings.ToLower(t[i].DatabaseTypeName())
		if p.cfg.SQL.Type == postgres {
			p.row[i] = util.PostgresToDriverType(dt)
		} else if p.cfg.SQL.Type == clickhouse {
			p.row[i] = util.ClickHouseToDriverType(dt)
		} else { //default is mysql
			p.row[i] = util.MySQLToDriverType(dt, "")
		}
	}
	p.insert += ") VALUES ("
	p.inited = true
	return nil
}