in server/table_handler.go [90:130]
func handleAddCmd(_ http.ResponseWriter, t *tableCmdReq) error {
fillSQLFormat(t)
if !checkSQLFormat(t.Output, t.OutputFormat) {
return fmt.Errorf("incompatible output format. MySQL, Postgres, ClickHouse outputs only support SQL output format")
}
if t.Params != "" {
v := &config.TableParams{}
err := json.Unmarshal([]byte(t.Params), v)
if err != nil {
return fmt.Errorf("invalid table params value: %v", err)
}
}
if t.AutoVersion {
var err error
t.Version, err = state.TableMaxVersion(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output)
if err != nil {
return err
}
t.Version++
}
if t.PublishSchema != "" && t.Input == types.InputMySQL {
err := SchemaRegister(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.PublishSchema, t.CreateTopic && t.Output == "kafka")
if err != nil {
return err
}
}
tn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, time.Now())
if err != nil {
return err
}
//TODO: Implement generic interface for handing offsets in pipe
if err := pipe.DeleteKafkaOffsets(tn, state.GetDB()); err != nil {
return err
}
if !state.RegisterTable(t.Cluster, t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.Params) {
return errors.New("error registering table")
}
return nil
}