in mysqlx-connector-python/lib/mysqlx/protocol.py [0:0]
def _create_any(self, arg: Any) -> Optional[MessageType]:
"""Create any.
Args:
arg (object): Arbitrary object.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
if isinstance(arg, str):
value = Message("Mysqlx.Datatypes.Scalar.String", value=arg)
scalar = Message("Mysqlx.Datatypes.Scalar", type=8, v_string=value)
return Message("Mysqlx.Datatypes.Any", type=1, scalar=scalar)
if isinstance(arg, bool):
return Message(
"Mysqlx.Datatypes.Any", type=1, scalar=build_bool_scalar(arg)
)
if isinstance(arg, int):
if arg < 0:
return Message(
"Mysqlx.Datatypes.Any",
type=1,
scalar=build_int_scalar(arg),
)
return Message(
"Mysqlx.Datatypes.Any",
type=1,
scalar=build_unsigned_int_scalar(arg),
)
if isinstance(arg, tuple) and len(arg) == 2:
arg_key, arg_value = arg
obj_fld = Message(
"Mysqlx.Datatypes.Object.ObjectField",
key=arg_key,
value=self._create_any(arg_value),
)
obj = Message("Mysqlx.Datatypes.Object", fld=[obj_fld.get_message()])
return Message("Mysqlx.Datatypes.Any", type=2, obj=obj)
if isinstance(arg, dict) or (
isinstance(arg, (list, tuple)) and isinstance(arg[0], dict)
):
array_values = []
for items in arg:
obj_flds = []
for key, value in items.items():
# Array can only handle Any types, Mysqlx.Datatypes.Any.obj
obj_fld = Message(
"Mysqlx.Datatypes.Object.ObjectField",
key=key,
value=self._create_any(value),
)
obj_flds.append(obj_fld.get_message())
msg_obj = Message("Mysqlx.Datatypes.Object", fld=obj_flds)
msg_any = Message("Mysqlx.Datatypes.Any", type=2, obj=msg_obj)
array_values.append(msg_any.get_message())
msg = Message("Mysqlx.Datatypes.Array")
msg["value"] = array_values
return Message("Mysqlx.Datatypes.Any", type=3, array=msg)
if isinstance(arg, list):
obj_flds = []
for key, value in arg:
obj_fld = Message(
"Mysqlx.Datatypes.Object.ObjectField",
key=key,
value=self._create_any(value),
)
obj_flds.append(obj_fld.get_message())
msg_obj = Message("Mysqlx.Datatypes.Object", fld=obj_flds)
msg_any = Message("Mysqlx.Datatypes.Any", type=2, obj=msg_obj)
return msg_any
return None