datahub/schemaclient.go (76 lines of code) (raw):
package datahub
import (
"sync"
)
// schemaRegistryClient keeps an in-memory cache of topic schemas and their
// version ids, so that a lookup which was already answered once does not need
// another round trip through the DataHub API.
type schemaRegistryClient struct {
	// dh is the API client consulted whenever a lookup misses the cache.
	dh DataHubApi

	// schemaMap and versionMap are both keyed by the "project/topic" string
	// produced by genKey, and each entry holds that topic's own nested cache:
	//   - schemaMap:  schema.String() -> version id (int)
	//   - versionMap: version id (int) -> RecordSchema
	schemaMap, versionMap sync.Map
}
// NewSchemaClient builds a schema cache that resolves misses through dh.
// Both internal caches start out empty.
func NewSchemaClient(dh DataHubApi) *schemaRegistryClient {
	client := new(schemaRegistryClient)
	client.dh = dh
	return client
}
// getSchemaByVersion returns the record schema registered under the given
// version of project/topic. The per-topic cache is consulted first; on a miss
// the schema is fetched through fetchSchemaById and cached for later calls.
//
// On success the returned pointer refers to a shallow copy of the cached
// RecordSchema value, never to the cached value itself. On failure the fetch
// error is returned unchanged and nothing is cached.
func (client *schemaRegistryClient) getSchemaByVersion(project, topic string, version int) (*RecordSchema, error) {
	key := client.genKey(project, topic)

	// The per-topic cache is kept as a *sync.Map: a sync.Map must not be
	// copied after first use, so it is never stored or asserted by value.
	// LoadOrStore makes creation atomic, so concurrent callers for the same
	// topic always end up sharing a single cache.
	value, ok := client.versionMap.Load(key)
	if !ok {
		value, _ = client.versionMap.LoadOrStore(key, &sync.Map{})
	}
	topicCache := value.(*sync.Map)

	if cached, ok := topicCache.Load(version); ok {
		schema := cached.(RecordSchema)
		return &schema, nil
	}

	fetched, err := client.fetchSchemaById(project, topic, version)
	if err != nil {
		return nil, err
	}

	// Cache by value and hand the caller its own copy.
	schema := *fetched
	topicCache.Store(version, schema)
	return &schema, nil
}
// getVersionBySchema returns the version id under which schema is registered
// for project/topic. The per-topic cache, keyed by schema.String(), is
// consulted first; on a miss the version is fetched through
// fetchVersionBySchema and cached for later calls.
//
// On failure it returns -1 together with the fetch error, and nothing is
// cached.
func (client *schemaRegistryClient) getVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
	key := client.genKey(project, topic)

	// The per-topic cache is kept as a *sync.Map: a sync.Map must not be
	// copied after first use, so it is never stored or asserted by value.
	// LoadOrStore makes creation atomic, so concurrent callers for the same
	// topic always end up sharing a single cache.
	value, ok := client.schemaMap.Load(key)
	if !ok {
		value, _ = client.schemaMap.LoadOrStore(key, &sync.Map{})
	}
	topicCache := value.(*sync.Map)

	schemaKey := schema.String()
	if cached, ok := topicCache.Load(schemaKey); ok {
		return cached.(int), nil
	}

	version, err := client.fetchVersionBySchema(project, topic, schema)
	if err != nil {
		return -1, err
	}
	topicCache.Store(schemaKey, version)
	return version, nil
}
// fetchSchemaById asks the DataHub API for the schema registered under the
// given version of project/topic, bypassing the cache entirely.
// It returns a pointer to the RecordSchema field of the API result, or nil
// and the API error when the call fails.
func (client *schemaRegistryClient) fetchSchemaById(project, topic string, version int) (*RecordSchema, error) {
	result, err := client.dh.GetTopicSchemaByVersion(project, topic, version)
	if err != nil {
		return nil, err
	}
	schema := &result.RecordSchema
	return schema, nil
}
// fetchVersionBySchema asks the DataHub API which version id schema is
// registered under for project/topic, bypassing the cache entirely.
// It returns the VersionId of the API result, or -1 and the API error when
// the call fails.
func (client *schemaRegistryClient) fetchVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
	result, err := client.dh.GetTopicSchemaBySchema(project, topic, schema)
	if err != nil {
		return -1, err
	}
	versionID := result.VersionId
	return versionID, nil
}
// genKey builds the cache key for a topic by joining the project and topic
// names with a slash, e.g. "project/topic".
func (client *schemaRegistryClient) genKey(project, topic string) string {
	const separator = "/"
	return project + separator + topic
}