def convert_to_df()

in iotdb-core/ainode/ainode/core/util/serde.py [0:0]


def convert_to_df(name_list, type_list, name_index, binary_list):
    column_name_list = [TIMESTAMP_STR]
    column_type_list = [TSDataType.INT64]
    column_ordinal_dict = {TIMESTAMP_STR: 1}

    if name_index is not None:
        column_type_deduplicated_list = [
            None for _ in range(len(name_index))
        ]
        for i in range(len(name_list)):
            name = name_list[i]
            column_name_list.append(name)
            column_type_list.append(TSDataType[type_list[i]])
            if name not in column_ordinal_dict:
                index = name_index[name]
                column_ordinal_dict[name] = index + START_INDEX
                column_type_deduplicated_list[index] = TSDataType[type_list[i]]
    else:
        index = START_INDEX
        column_type_deduplicated_list = []
        for i in range(len(name_list)):
            name = name_list[i]
            column_name_list.append(name)
            column_type_list.append(TSDataType[type_list[i]])
            if name not in column_ordinal_dict:
                column_ordinal_dict[name] = index
                index += 1
                column_type_deduplicated_list.append(
                    TSDataType[type_list[i]]
                )

    binary_size = len(binary_list)
    binary_index = 0
    result = {}
    for column_name in column_name_list:
        result[column_name] = None

    while binary_index < binary_size:
        buffer = binary_list[binary_index]
        binary_index += 1
        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
        time_array = np.frombuffer(
            time_column_values, np.dtype(np.longlong).newbyteorder(">")
        )
        if time_array.dtype.byteorder == ">":
            time_array = time_array.byteswap().view(time_array.dtype.newbyteorder("<"))

        if result[TIMESTAMP_STR] is None:
            result[TIMESTAMP_STR] = time_array
        else:
            result[TIMESTAMP_STR] = np.concatenate(
                (result[TIMESTAMP_STR], time_array), axis=0
            )
        total_length = len(time_array)

        for i in range(len(column_values)):
            column_name = column_name_list[i + 1]

            location = column_ordinal_dict[column_name] - START_INDEX
            if location < 0:
                continue

            data_type = column_type_deduplicated_list[location]
            value_buffer = column_values[location]
            value_buffer_len = len(value_buffer)

            if data_type == TSDataType.DOUBLE:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.double).newbyteorder(">")
                )
            elif data_type == TSDataType.FLOAT:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.float32).newbyteorder(">")
                )
            elif data_type == TSDataType.BOOLEAN:
                data_array = []
                for index in range(len(value_buffer)):
                    data_array.append(value_buffer[index])
                data_array = np.array(data_array).astype("bool")
            elif data_type == TSDataType.INT32:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.int32).newbyteorder(">")
                )
            elif data_type == TSDataType.INT64:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.int64).newbyteorder(">")
                )
            elif data_type == TSDataType.TEXT:
                index = 0
                data_array = []
                while index < value_buffer_len:
                    value_bytes = value_buffer[index]
                    value = value_bytes.decode("utf-8")
                    data_array.append(value)
                    index += 1
                data_array = np.array(data_array, dtype=object)
            else:
                raise RuntimeError("unsupported data type {}.".format(data_type))

            if data_array.dtype.byteorder == ">":
                data_array = data_array.byteswap().view(data_array.dtype.newbyteorder("<"))

            null_indicator = null_indicators[location]
            if len(data_array) < total_length or (data_type == TSDataType.BOOLEAN and null_indicator is not None):
                if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
                    tmp_array = np.full(total_length, np.nan, np.float32)
                elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
                    tmp_array = np.full(total_length, np.nan, data_array.dtype)
                elif data_type == TSDataType.BOOLEAN:
                    tmp_array = np.full(total_length, np.nan, np.float32)
                elif data_type == TSDataType.TEXT:
                    tmp_array = np.full(total_length, np.nan, dtype=data_array.dtype)
                else:
                    raise Exception("Unsupported dataType in deserialization")

                if null_indicator is not None:
                    indexes = [not v for v in null_indicator]
                    if data_type == TSDataType.BOOLEAN:
                        tmp_array[indexes] = data_array[indexes]
                    else:
                        tmp_array[indexes] = data_array

                if data_type == TSDataType.INT32:
                    tmp_array = pd.Series(tmp_array).astype("Int32")
                elif data_type == TSDataType.INT64:
                    tmp_array = pd.Series(tmp_array).astype("Int64")
                elif data_type == TSDataType.BOOLEAN:
                    tmp_array = pd.Series(tmp_array).astype("boolean")

                data_array = tmp_array

            if result[column_name] is None:
                result[column_name] = data_array
            else:
                if isinstance(result[column_name], pd.Series):
                    if not isinstance(data_array, pd.Series):
                        if data_type == TSDataType.INT32:
                            data_array = pd.Series(data_array).astype("Int32")
                        elif data_type == TSDataType.INT64:
                            data_array = pd.Series(data_array).astype("Int64")
                        elif data_type == TSDataType.BOOLEAN:
                            data_array = pd.Series(data_array).astype("boolean")
                        else:
                            raise RuntimeError("Series Error")
                    result[column_name] = result[column_name].append(data_array)
                else:
                    result[column_name] = np.concatenate(
                        (result[column_name], data_array), axis=0
                    )
    for k, v in result.items():
        if v is None:
            result[k] = []
    df = pd.DataFrame(result)
    df = df.reset_index(drop=True)
    return df