def _read_default_value()

in lang/py/avro/io.py [0:0]


    def _read_default_value(self, field_schema: avro.schema.Schema, default_value: object) -> object:
        """
        Convert a JSON-decoded schema default into a datum for ``field_schema``.

        The conversion is dispatched on the schema type and recurses into
        array items, map values, record fields and the first branch of a union.

        :param field_schema: Schema the default value has to conform to.
        :param default_value: The default as decoded from the schema's JSON.
        :returns: The default expressed as a Python value: ``None``, ``bool``,
            ``int``, ``float``, ``bytes``, ``str``, ``list`` or ``dict``.
        :raises avro.errors.InvalidDefaultException: If ``default_value`` does
            not have a shape acceptable for ``field_schema``.
        :raises avro.errors.AvroException: If the schema type is not handled.
        """
        if field_schema.type == "null":
            if default_value is None:
                return None
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if field_schema.type == "boolean":
            # Deliberately lenient: any value is coerced by truthiness.
            return bool(default_value)
        if field_schema.type in ("int", "long"):
            if isinstance(default_value, int):
                return default_value
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if field_schema.type in ("float", "double"):
            if isinstance(default_value, float):
                return default_value
            # A JSON number without a fractional part (e.g. ``"default": 0``)
            # is decoded as ``int``; it is still a valid float/double default.
            # ``bool`` is an ``int`` subclass but is not a JSON number.
            if isinstance(default_value, int) and not isinstance(default_value, bool):
                try:
                    return float(default_value)
                except OverflowError as exc:
                    raise avro.errors.InvalidDefaultException(field_schema, default_value) from exc
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if field_schema.type in ("bytes", "fixed"):
            if isinstance(default_value, bytes):
                return default_value
            if isinstance(default_value, str):
                # Avro encodes bytes/fixed defaults as JSON strings in which
                # code points 0-255 map one-to-one to byte values 0-255, which
                # is ISO-8859-1, not UTF-8. Code points above 255 have no byte
                # equivalent and make the default invalid.
                try:
                    return default_value.encode("iso-8859-1")
                except UnicodeEncodeError as exc:
                    raise avro.errors.InvalidDefaultException(field_schema, default_value) from exc
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if field_schema.type in ("enum", "string"):
            if isinstance(default_value, str):
                return default_value
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if isinstance(field_schema, avro.schema.ArraySchema):
            # Strings, bytes and mappings are iterable but are not JSON arrays;
            # iterating them would yield characters, ints or keys as "items".
            if isinstance(default_value, Iterable) and not isinstance(default_value, (str, bytes, Mapping)):
                return [self._read_default_value(field_schema.items, item) for item in default_value]
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if isinstance(field_schema, avro.schema.MapSchema):
            if isinstance(default_value, Mapping):
                return {key: self._read_default_value(field_schema.values, item) for key, item in default_value.items()}
            raise avro.errors.InvalidDefaultException(field_schema, default_value)
        if isinstance(field_schema, avro.schema.UnionSchema):
            # The default is interpreted against the first branch of the union.
            return self._read_default_value(field_schema.schemas[0], default_value)
        if isinstance(field_schema, avro.schema.RecordSchema):
            if not isinstance(default_value, Mapping):
                raise avro.errors.InvalidDefaultException(field_schema, default_value)
            read_record = {}
            for field in field_schema.fields:
                json_val = default_value.get(field.name)
                # A missing key and an explicit null are treated alike: both
                # fall back to the nested field's own declared default.
                if json_val is None:
                    json_val = field.default
                read_record[field.name] = self._read_default_value(field.type, json_val)
            return read_record
        raise avro.errors.AvroException(f"Unknown type: {field_schema.type}")