in lang/py/avro/io.py [0:0]
def write_data(self, writers_schema: avro.schema.Schema, datum: object, encoder: BinaryEncoder) -> None:
# function dispatch to write datum
logical_type = getattr(writers_schema, "logical_type", None)
if writers_schema.type == "null":
if datum is None:
return encoder.write_null(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "boolean":
if isinstance(datum, bool):
return encoder.write_boolean(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "string":
if isinstance(datum, str):
return encoder.write_utf8(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "int":
if logical_type == avro.constants.DATE:
if isinstance(datum, datetime.date):
return encoder.write_date_int(datum)
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a date type"))
elif logical_type == avro.constants.TIME_MILLIS:
if isinstance(datum, datetime.time):
return encoder.write_time_millis_int(datum)
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a time type"))
if isinstance(datum, int):
return encoder.write_int(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "long":
if logical_type == avro.constants.TIME_MICROS:
if isinstance(datum, datetime.time):
return encoder.write_time_micros_long(datum)
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a time type"))
elif logical_type == avro.constants.TIMESTAMP_MILLIS:
if isinstance(datum, datetime.datetime):
return encoder.write_timestamp_millis_long(datum)
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a datetime type"))
elif logical_type == avro.constants.TIMESTAMP_MICROS:
if isinstance(datum, datetime.datetime):
return encoder.write_timestamp_micros_long(datum)
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a datetime type"))
if isinstance(datum, int):
return encoder.write_long(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "float":
if isinstance(datum, (int, float)):
return encoder.write_float(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "double":
if isinstance(datum, (int, float)):
return encoder.write_double(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if writers_schema.type == "bytes":
if logical_type == "decimal":
scale = writers_schema.get_prop("scale")
if not (isinstance(scale, int) and scale >= 0):
warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal scale {scale}. Must be a non-negative integer."))
elif not isinstance(datum, decimal.Decimal):
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a decimal type"))
else:
return encoder.write_decimal_bytes(datum, scale)
if isinstance(datum, bytes):
return encoder.write_bytes(datum)
raise avro.errors.AvroTypeException(writers_schema, datum)
if isinstance(writers_schema, avro.schema.FixedSchema):
if logical_type == "decimal":
scale = writers_schema.get_prop("scale")
size = writers_schema.size
if not (isinstance(scale, int) and scale >= 0):
warnings.warn(avro.errors.IgnoredLogicalType(f"Invalid decimal scale {scale}. Must be a non-negative integer."))
elif not isinstance(datum, decimal.Decimal):
warnings.warn(avro.errors.IgnoredLogicalType(f"{datum} is not a decimal type"))
else:
return encoder.write_decimal_fixed(datum, scale, size)
if isinstance(datum, bytes):
return self.write_fixed(writers_schema, datum, encoder)
raise avro.errors.AvroTypeException(writers_schema, datum)
if isinstance(writers_schema, avro.schema.EnumSchema):
if isinstance(datum, str):
return self.write_enum(writers_schema, datum, encoder)
raise avro.errors.AvroTypeException(writers_schema, datum)
if isinstance(writers_schema, avro.schema.ArraySchema):
if isinstance(datum, Sequence):
return self.write_array(writers_schema, datum, encoder)
raise avro.errors.AvroTypeException(writers_schema, datum)
if isinstance(writers_schema, avro.schema.MapSchema):
if isinstance(datum, Mapping):
return self.write_map(writers_schema, datum, encoder)
raise avro.errors.AvroTypeException(writers_schema, datum)
if isinstance(writers_schema, avro.schema.UnionSchema):
return self.write_union(writers_schema, datum, encoder)
if isinstance(writers_schema, avro.schema.RecordSchema):
if isinstance(datum, Mapping):
return self.write_record(writers_schema, datum, encoder)
raise avro.errors.AvroTypeException(writers_schema, datum)
raise avro.errors.AvroException(f"Unknown type: {writers_schema.type}")