in pipe/sql.go [248:293]
func (p *sqlConsumer) initTable() error {
var err error
q := "`"
if p.cfg.SQL.Type == postgres {
q = `"`
}
tp := q + util.EscapeQuotes(p.topic, q[0]) + q
for {
p.rows, err = util.QuerySQL(p.conn, "SELECT * from "+tp)
if err != nil {
if strings.Contains(err.Error(), "doesn't exist") {
time.Sleep(200 * time.Millisecond)
continue
}
return err
}
break
}
c, err := p.rows.Columns()
if err != nil {
return err
}
t, err := p.rows.ColumnTypes()
if err != nil {
return err
}
p.row = make([]interface{}, len(c))
p.insert = "INSERT INTO " + tp + " ("
for i, f := range c {
if i != 0 {
p.insert += ","
}
p.insert += f
dt := strings.ToLower(t[i].DatabaseTypeName())
if p.cfg.SQL.Type == postgres {
p.row[i] = util.PostgresToDriverType(dt)
} else if p.cfg.SQL.Type == clickhouse {
p.row[i] = util.ClickHouseToDriverType(dt)
} else { //default is mysql
p.row[i] = util.MySQLToDriverType(dt, "")
}
}
p.insert += ") VALUES ("
p.inited = true
return nil
}