func handleAddCmd()

in server/table_handler.go [90:130]


// handleAddCmd serves the table "add" command described by t.
//
// The steps below run in order and the first failure aborts the rest:
//   - fillSQLFormat is applied to the request, then checkSQLFormat validates
//     the Output/OutputFormat pair;
//   - a non-empty Params string must decode as JSON into config.TableParams;
//   - with AutoVersion set, Version is overwritten with the value reported by
//     state.TableMaxVersion plus one;
//   - with PublishSchema set and a MySQL input, SchemaRegister is invoked;
//   - the changelog topic name is resolved and handed to
//     pipe.DeleteKafkaOffsets;
//   - the table is recorded through state.RegisterTable.
//
// The http.ResponseWriter argument is not used. A nil return means every step
// succeeded.
func handleAddCmd(_ http.ResponseWriter, t *tableCmdReq) error {
	fillSQLFormat(t)

	if formatOK := checkSQLFormat(t.Output, t.OutputFormat); !formatOK {
		return errors.New("incompatible output format. MySQL, Postgres, ClickHouse outputs only support SQL output format")
	}

	// The raw Params string is what gets passed on to RegisterTable below;
	// decoding here only checks that it is well-formed, the result is dropped.
	if len(t.Params) > 0 {
		var parsed config.TableParams
		if err := json.Unmarshal([]byte(t.Params), &parsed); err != nil {
			return fmt.Errorf("invalid table params value: %v", err)
		}
	}

	if t.AutoVersion {
		// Version is assigned straight from the lookup and bumped only when
		// the lookup succeeded.
		var err error
		if t.Version, err = state.TableMaxVersion(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output); err != nil {
			return err
		}
		t.Version++
	}

	wantSchema := t.PublishSchema != "" && t.Input == types.InputMySQL
	if wantSchema {
		// Topic creation is requested only for the "kafka" output.
		createTopic := t.CreateTopic && t.Output == "kafka"
		if regErr := SchemaRegister(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.PublishSchema, createTopic); regErr != nil {
			return regErr
		}
	}

	topic, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, time.Now())
	if err != nil {
		return err
	}

	// TODO: Implement generic interface for handling offsets in pipe
	if delErr := pipe.DeleteKafkaOffsets(topic, state.GetDB()); delErr != nil {
		return delErr
	}

	if registered := state.RegisterTable(t.Cluster, t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.Params); !registered {
		return errors.New("error registering table")
	}

	return nil
}