def array_inspect()

in redshift_connector/core.py [0:0]


    def array_inspect(self: "Connection", value):
        # Check if array has any values. If empty, we can just assume it's an
        # array of strings
        first_element = array_find_first_element(value)
        if first_element is None:
            oid: int = 25
            # Use binary ARRAY format to avoid having to properly
            # escape text in the array literals
            fc: int = FC_BINARY
            array_oid: int = pg_array_types[oid]
        else:
            # supported array output
            typ: type = type(first_element)

            if issubclass(typ, int):
                # special int array support -- send as smallest possible array
                # type
                typ = int
                int2_ok, int4_ok, int8_ok = True, True, True
                for v in array_flatten(value):
                    if v is None:
                        continue
                    if min_int2 < v < max_int2:
                        continue
                    int2_ok = False
                    if min_int4 < v < max_int4:
                        continue
                    int4_ok = False
                    if min_int8 < v < max_int8:
                        continue
                    int8_ok = False
                if int2_ok:
                    array_oid = 1005  # INT2[]
                    oid, fc, send_func = (21, FC_BINARY, h_pack)
                elif int4_ok:
                    array_oid = 1007  # INT4[]
                    oid, fc, send_func = (23, FC_BINARY, i_pack)
                elif int8_ok:
                    array_oid = 1016  # INT8[]
                    oid, fc, send_func = (20, FC_BINARY, q_pack)
                else:
                    raise ArrayContentNotSupportedError("numeric not supported as array contents")
            else:
                try:
                    oid, fc, send_func = self.make_params((first_element,))[0]

                    # If unknown or string, assume it's a string array
                    if oid in (705, 1043, 25):
                        oid = 25
                        # Use binary ARRAY format to avoid having to properly
                        # escape text in the array literals
                        fc = FC_BINARY
                    array_oid = pg_array_types[oid]
                except KeyError:
                    raise ArrayContentNotSupportedError("oid " + str(oid) + " not supported as array contents")
                except NotSupportedError:
                    raise ArrayContentNotSupportedError("type " + str(typ) + " not supported as array contents")
        if fc == FC_BINARY:

            def send_array(arr: typing.List) -> typing.Union[bytes, bytearray]:
                # check that all array dimensions are consistent
                array_check_dimensions(arr)

                has_null: bool = array_has_null(arr)
                dim_lengths: typing.List[int] = array_dim_lengths(arr)
                data: bytearray = bytearray(iii_pack(len(dim_lengths), has_null, oid))
                for i in dim_lengths:
                    data.extend(ii_pack(i, 1))
                for v in array_flatten(arr):
                    if v is None:
                        data += i_pack(-1)
                    elif isinstance(v, typ):
                        inner_data = send_func(v)
                        data += i_pack(len(inner_data))
                        data += inner_data
                    else:
                        raise ArrayContentNotHomogenousError("not all array elements are of type " + str(typ))
                return data

        else:

            def send_array(arr: typing.List) -> typing.Union[bytes, bytearray]:
                array_check_dimensions(arr)
                ar: typing.List = deepcopy(arr)
                for a, i, v in walk_array(ar):
                    if v is None:
                        a[i] = "NULL"
                    elif isinstance(v, typ):
                        a[i] = send_func(v).decode("ascii")
                    else:
                        raise ArrayContentNotHomogenousError("not all array elements are of type " + str(typ))
                return str(ar).translate(arr_trans).encode("ascii")

        return (array_oid, fc, send_array)