in server/schema_handler.go [47:88]
func SchemaRegister(svc string, cluster string, sdb string, table string, inputType string, output string, version int, formatType string, dst string, createTopic bool) error {
avroSchema, err := schema.ConvertToAvro(&db.Loc{Cluster: cluster, Service: svc, Name: sdb}, table, inputType, formatType)
if err != nil {
return err
}
outputSchemaName, err := encoder.GetOutputSchemaName(svc, sdb, table, inputType, output, version)
if err != nil {
return err
}
if dst == "state" || dst == "all" {
err = state.InsertSchema(outputSchemaName, formatType, string(avroSchema))
if err != nil {
return err
}
}
if createTopic {
tm := time.Now()
c, err := config.Get().GetChangelogTopicName(svc, sdb, table, inputType, "kafka", version, tm)
if err != nil {
return err
}
err = createKafkaTopic(c, inputType, svc, sdb, table)
if err != nil {
return err
}
o, err := config.Get().GetOutputTopicName(svc, sdb, table, inputType, "kafka", version, tm)
if err != nil {
return err
}
err = createKafkaTopic(o, inputType, svc, sdb, table)
if err != nil {
return err
}
}
log.Infof("AvroSchema registered for(%v,%v, %v,%v,%v,%v,%v) = %s", svc, cluster, sdb, table, inputType, output, version, avroSchema)
return nil
}