in pulsar/schema/definition.py [0:0]
def schema_info(cls, defined_names):
namespace_prefix = ''
if cls._avro_namespace is not None:
namespace_prefix = cls._avro_namespace + '.'
namespace_name = namespace_prefix + cls.__name__
if namespace_name in defined_names:
return namespace_name
defined_names.add(namespace_name)
schema = {
'type': 'record',
'name': str(cls.__name__)
}
if cls._avro_namespace is not None:
schema['namespace'] = cls._avro_namespace
schema['fields'] = []
def get_filed_default_value(value):
if isinstance(value, Enum):
return value.name
else:
return value
if cls._sorted_fields:
fields = sorted(cls._fields.keys())
else:
fields = cls._fields.keys()
for name in fields:
field = cls._fields[name]
field_type = field.schema_info(defined_names) \
if field._required else ['null', field.schema_info(defined_names)]
schema['fields'].append({
'name': name,
'default': get_filed_default_value(field.default()),
'type': field_type
}) if field.required_default() else schema['fields'].append({
'name': name,
'type': field_type,
})
return schema