datahub/batch/schema_registry_client.py (77 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import threading
from ..models import RecordSchema
from ..exceptions import ResourceNotFoundException
class SchemaRegistryClient:
def __init__(self, datahub_client):
self._datahub_client = datahub_client
self._cache = dict()
self._cache_lock = threading.Lock()
self._lock = threading.Lock()
def get_version_id(self, project_name, topic_name, schema):
schema_meta = self.__get_schema_meta(project_name, topic_name)
try:
result = schema_meta.get_version_id(schema)
except ResourceNotFoundException:
self.__update(project_name, topic_name, schema_meta)
result = schema_meta.get_version_id(schema)
except Exception as e:
raise ResourceNotFoundException("VersionId not found. project: {}, topic: {}, schema: {}. {}"
.format(project_name, topic_name, schema.to_json_string(), e))
return result
def get_schema(self, project_name, topic_name, version_id):
schema_meta = self.__get_schema_meta(project_name, topic_name)
try:
result = schema_meta.get_schema(version_id)
except ResourceNotFoundException:
self.__update(project_name, topic_name, schema_meta)
result = schema_meta.get_schema(version_id)
except Exception as e:
raise ResourceNotFoundException("Schema not found. project: {}, topic: {}, version_id: {}. {}"
.format(project_name, topic_name, version_id, e))
return result
def __get_schema_meta(self, project_name, topic_name):
key = self.__gen_key(project_name, topic_name)
if key not in self._cache:
with self._cache_lock:
if key not in self._cache:
self._cache[key] = SchemaMeta()
return self._cache.get(key)
def __update(self, project_name, topic_name, schema_meta):
with self._lock:
page_num, page_size = 0, 100
while True:
page_num += 1
list_schema = self._datahub_client.list_topic_schema(project_name, topic_name, page_num, page_size)
for record_schema in list_schema.record_schema_list:
schema_meta.add(record_schema.get("VersionId"), RecordSchema.from_json_str(record_schema.get("RecordSchema")))
if page_num >= list_schema.page_count:
break
def __gen_key(self, project_name, topic_name):
return "{}/{}".format(project_name, topic_name)
class SchemaMeta:
def __init__(self):
self._version_schema_map = dict() # version_string --> schema_string
self._schema_version_map = dict() # schema_string --> version_string
self._version_lock = threading.Lock()
self._schema_lock = threading.Lock()
def get_schema(self, version_id):
with self._schema_lock:
if version_id == -1:
if len(self._version_schema_map) == 0:
raise ResourceNotFoundException("VersionSchemaMap is empty. version_id = -1")
schema_str = list(iter(self._version_schema_map.values()))[-1]
else:
schema_str = self._version_schema_map.get(version_id)
if schema_str is None:
raise ResourceNotFoundException("Schema not found with the specified version_id {}.".format(version_id))
return RecordSchema.from_json_str(schema_str)
def get_version_id(self, schema):
with self._version_lock:
version_id = self._schema_version_map.get(schema.to_json_string())
if version_id is None:
raise ResourceNotFoundException("VersionId not found with the specified schema {}.".format(schema.to_json_string()))
return version_id
def add(self, version_id, schema):
schema_str = schema.to_json_string()
self._version_schema_map[version_id] = schema_str
self._schema_version_map[schema_str] = version_id