in lang/py/avro/io.py [0:0]
def read_data(self, writers_schema: avro.schema.Schema, readers_schema: avro.schema.Schema, decoder: "BinaryDecoder") -> object:
    """Read one datum from ``decoder``, dispatching on the writer's schema.

    The reader's schema must match the writer's schema; otherwise a
    ``SchemaResolutionException`` is raised. Dispatch then proceeds as:

    * both schemas are unions: delegated to ``read_union``;
    * only the reader's schema is a union: the first reader branch that
      matches the writer's schema is used and ``read_data`` recurses;
    * primitive writer types: read directly through ``decoder``, using the
      logical-type specific decoder method when the writer's schema carries
      a recognised ``logical_type``;
    * fixed, enum, array, map and record: delegated to the corresponding
      ``read_*`` method when both schemas are of that kind.

    A ``decimal`` logical type whose ``precision`` is not a positive integer
    or whose ``scale`` is not a non-negative integer is ignored: an
    ``IgnoredLogicalType`` warning is issued and the underlying bytes/fixed
    value is returned instead.

    Raises:
        avro.errors.SchemaResolutionException: the schemas do not match.
        avro.errors.AvroException: no branch handles the writer's schema type.
    """
    if not readers_schema.match(writers_schema):
        raise avro.errors.SchemaResolutionException("Schemas do not match.", writers_schema, readers_schema)

    logical_type = getattr(writers_schema, "logical_type", None)

    # Union handling: everything below assumes the reader's schema is not a union.
    if isinstance(readers_schema, avro.schema.UnionSchema):
        if isinstance(writers_schema, avro.schema.UnionSchema):
            return self.read_union(writers_schema, readers_schema, decoder)
        # Schema resolution: the reader's schema is a union, the writer's is not.
        for branch in readers_schema.schemas:
            if branch.match(writers_schema):
                return self.read_data(writers_schema, branch, decoder)
        # Not expected to be reached, given the match check at the top of this method.
        raise avro.errors.SchemaResolutionException("Schemas do not match.", writers_schema, readers_schema)

    # Primitive types, keyed on the writer's type name.
    writer_type = writers_schema.type
    if writer_type == "null":
        return None
    if writer_type == "boolean":
        return decoder.read_boolean()
    if writer_type == "string":
        return decoder.read_utf8()
    if writer_type == "int":
        if logical_type == avro.constants.DATE:
            return decoder.read_date_from_int()
        elif logical_type == avro.constants.TIME_MILLIS:
            return decoder.read_time_millis_from_int()
        else:
            return decoder.read_int()
    if writer_type == "long":
        if logical_type == avro.constants.TIME_MICROS:
            return decoder.read_time_micros_from_long()
        elif logical_type == avro.constants.TIMESTAMP_MILLIS:
            return decoder.read_timestamp_millis_from_long()
        elif logical_type == avro.constants.TIMESTAMP_MICROS:
            return decoder.read_timestamp_micros_from_long()
        else:
            return decoder.read_long()
    if writer_type == "float":
        return decoder.read_float()
    if writer_type == "double":
        return decoder.read_double()
    if writer_type == "bytes":
        if logical_type == "decimal":
            precision = writers_schema.get_prop("precision")
            if isinstance(precision, int) and precision > 0:
                scale = writers_schema.get_prop("scale")
                if isinstance(scale, int) and scale >= 0:
                    return decoder.read_decimal_from_bytes(precision, scale)
                warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal scale {scale}. Must be a non-negative integer."))
            else:
                warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal precision {precision}. Must be a positive integer."))
        # Plain bytes, or a decimal whose parameters were rejected above.
        return decoder.read_bytes()

    # Fixed: may carry a decimal logical type, so it is handled separately.
    if isinstance(writers_schema, avro.schema.FixedSchema) and isinstance(readers_schema, avro.schema.FixedSchema):
        if logical_type == "decimal":
            precision = writers_schema.get_prop("precision")
            if isinstance(precision, int) and precision > 0:
                scale = writers_schema.get_prop("scale")
                if isinstance(scale, int) and scale >= 0:
                    return decoder.read_decimal_from_fixed(precision, scale, writers_schema.size)
                warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal scale {scale}. Must be a non-negative integer."))
            else:
                warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal precision {precision}. Must be a positive integer."))
        # Plain fixed, or a decimal whose parameters were rejected above.
        return self.read_fixed(writers_schema, readers_schema, decoder)

    # Remaining complex types: both schemas must be of the same kind.
    # RecordSchema covers .type in ["record", "error", "request"].
    complex_readers = (
        (avro.schema.EnumSchema, self.read_enum),
        (avro.schema.ArraySchema, self.read_array),
        (avro.schema.MapSchema, self.read_map),
        (avro.schema.RecordSchema, self.read_record),
    )
    for schema_class, read_complex in complex_readers:
        if isinstance(writers_schema, schema_class) and isinstance(readers_schema, schema_class):
            return read_complex(writers_schema, readers_schema, decoder)

    raise avro.errors.AvroException(f"Cannot read unknown schema type: {writer_type}")