func SchemaRegister()

in server/schema_handler.go [47:88]


func SchemaRegister(svc string, cluster string, sdb string, table string, inputType string, output string, version int, formatType string, dst string, createTopic bool) error {
	avroSchema, err := schema.ConvertToAvro(&db.Loc{Cluster: cluster, Service: svc, Name: sdb}, table, inputType, formatType)
	if err != nil {
		return err
	}

	outputSchemaName, err := encoder.GetOutputSchemaName(svc, sdb, table, inputType, output, version)
	if err != nil {
		return err
	}

	if dst == "state" || dst == "all" {
		err = state.InsertSchema(outputSchemaName, formatType, string(avroSchema))
		if err != nil {
			return err
		}
	}

	if createTopic {
		tm := time.Now()
		c, err := config.Get().GetChangelogTopicName(svc, sdb, table, inputType, "kafka", version, tm)
		if err != nil {
			return err
		}
		err = createKafkaTopic(c, inputType, svc, sdb, table)
		if err != nil {
			return err
		}

		o, err := config.Get().GetOutputTopicName(svc, sdb, table, inputType, "kafka", version, tm)
		if err != nil {
			return err
		}
		err = createKafkaTopic(o, inputType, svc, sdb, table)
		if err != nil {
			return err
		}
	}

	log.Infof("AvroSchema registered for(%v,%v, %v,%v,%v,%v,%v) = %s", svc, cluster, sdb, table, inputType, output, version, avroSchema)
	return nil
}