in redshift_connector/core.py [0:0]
def array_inspect(self: "Connection", value):
# Check if array has any values. If empty, we can just assume it's an
# array of strings
first_element = array_find_first_element(value)
if first_element is None:
oid: int = 25
# Use binary ARRAY format to avoid having to properly
# escape text in the array literals
fc: int = FC_BINARY
array_oid: int = pg_array_types[oid]
else:
# supported array output
typ: type = type(first_element)
if issubclass(typ, int):
# special int array support -- send as smallest possible array
# type
typ = int
int2_ok, int4_ok, int8_ok = True, True, True
for v in array_flatten(value):
if v is None:
continue
if min_int2 < v < max_int2:
continue
int2_ok = False
if min_int4 < v < max_int4:
continue
int4_ok = False
if min_int8 < v < max_int8:
continue
int8_ok = False
if int2_ok:
array_oid = 1005 # INT2[]
oid, fc, send_func = (21, FC_BINARY, h_pack)
elif int4_ok:
array_oid = 1007 # INT4[]
oid, fc, send_func = (23, FC_BINARY, i_pack)
elif int8_ok:
array_oid = 1016 # INT8[]
oid, fc, send_func = (20, FC_BINARY, q_pack)
else:
raise ArrayContentNotSupportedError("numeric not supported as array contents")
else:
try:
oid, fc, send_func = self.make_params((first_element,))[0]
# If unknown or string, assume it's a string array
if oid in (705, 1043, 25):
oid = 25
# Use binary ARRAY format to avoid having to properly
# escape text in the array literals
fc = FC_BINARY
array_oid = pg_array_types[oid]
except KeyError:
raise ArrayContentNotSupportedError("oid " + str(oid) + " not supported as array contents")
except NotSupportedError:
raise ArrayContentNotSupportedError("type " + str(typ) + " not supported as array contents")
if fc == FC_BINARY:
def send_array(arr: typing.List) -> typing.Union[bytes, bytearray]:
# check that all array dimensions are consistent
array_check_dimensions(arr)
has_null: bool = array_has_null(arr)
dim_lengths: typing.List[int] = array_dim_lengths(arr)
data: bytearray = bytearray(iii_pack(len(dim_lengths), has_null, oid))
for i in dim_lengths:
data.extend(ii_pack(i, 1))
for v in array_flatten(arr):
if v is None:
data += i_pack(-1)
elif isinstance(v, typ):
inner_data = send_func(v)
data += i_pack(len(inner_data))
data += inner_data
else:
raise ArrayContentNotHomogenousError("not all array elements are of type " + str(typ))
return data
else:
def send_array(arr: typing.List) -> typing.Union[bytes, bytearray]:
array_check_dimensions(arr)
ar: typing.List = deepcopy(arr)
for a, i, v in walk_array(ar):
if v is None:
a[i] = "NULL"
elif isinstance(v, typ):
a[i] = send_func(v).decode("ascii")
else:
raise ArrayContentNotHomogenousError("not all array elements are of type " + str(typ))
return str(ar).translate(arr_trans).encode("ascii")
return (array_oid, fc, send_array)