in iotdb-core/ainode/ainode/core/util/serde.py [0:0]
def convert_to_df(name_list, type_list, name_index, binary_list):
    """Decode a list of serialized blocks into one pandas DataFrame.

    Args:
        name_list: Column names in result order. A name may appear more than
            once; only its first occurrence registers an ordinal and a type.
        type_list: ``TSDataType`` member names, parallel to ``name_list``.
        name_index: Optional mapping from column name to its zero-based
            position among the deserialized value columns. When ``None``,
            positions are assigned in order of first appearance in
            ``name_list``.
        binary_list: Buffers, each decoded with ``deserialize``. Their rows
            are stacked in the order given.

    Returns:
        A DataFrame with a ``TIMESTAMP_STR`` column plus one column per
        distinct name, indexed ``0..n-1``. INT32/INT64/BOOLEAN columns that
        needed null filling use the pandas nullable dtypes ``Int32`` /
        ``Int64`` / ``boolean``; FLOAT/DOUBLE/TEXT columns use ``NaN`` for
        missing values.

    Raises:
        KeyError: If a type name is not a ``TSDataType`` member, or a name is
            missing from ``name_index``.
        RuntimeError: If a column has a data type this function cannot decode.
    """
    column_name_list = [TIMESTAMP_STR]
    column_ordinal_dict = {TIMESTAMP_STR: 1}
    if name_index is not None:
        column_type_deduplicated_list = [None] * len(name_index)
    else:
        column_type_deduplicated_list = []
    next_ordinal = START_INDEX

    for i, name in enumerate(name_list):
        # Resolve the type for every column (even repeated names) so an
        # unknown type name is still reported.
        data_type = TSDataType[type_list[i]]
        column_name_list.append(name)
        if name in column_ordinal_dict:
            continue
        if name_index is not None:
            position = name_index[name]
            column_ordinal_dict[name] = position + START_INDEX
            column_type_deduplicated_list[position] = data_type
        else:
            column_ordinal_dict[name] = next_ordinal
            next_ordinal += 1
            column_type_deduplicated_list.append(data_type)

    # Gather per-buffer pieces and merge once per column at the end; this
    # avoids re-copying the accumulated data for every buffer.
    chunks = {column_name: [] for column_name in column_name_list}

    for buffer in binary_list:
        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
        time_array = _as_little_endian(
            np.frombuffer(
                time_column_values, np.dtype(np.longlong).newbyteorder(">")
            )
        )
        chunks[TIMESTAMP_STR].append(time_array)
        total_length = len(time_array)

        # NOTE(review): this walks the first len(column_values) names, not the
        # distinct names, so with repeated names one column can be visited
        # twice while another is never filled -- confirm this is intended.
        for i in range(len(column_values)):
            column_name = column_name_list[i + 1]
            location = column_ordinal_dict[column_name] - START_INDEX
            if location < 0:
                # Ordinal below START_INDEX (the timestamp column registers
                # ordinal 1): not a value column.
                continue
            data_type = column_type_deduplicated_list[location]
            data_array = _decode_column_values(column_values[location], data_type)
            null_indicator = null_indicators[location]
            if len(data_array) < total_length or (
                data_type == TSDataType.BOOLEAN and null_indicator is not None
            ):
                data_array = _fill_null_positions(
                    data_array, data_type, null_indicator, total_length
                )
            chunks[column_name].append(data_array)

    result = {
        column_name: _merge_column_chunks(parts)
        for column_name, parts in chunks.items()
    }
    return pd.DataFrame(result).reset_index(drop=True)


def _as_little_endian(array):
    """Return ``array`` with big-endian data converted to little-endian."""
    if array.dtype.byteorder == ">":
        return array.byteswap().view(array.dtype.newbyteorder("<"))
    return array


def _decode_column_values(value_buffer, data_type):
    """Turn one raw value column into a NumPy array of the matching dtype."""
    if data_type == TSDataType.DOUBLE:
        big_endian = np.dtype(np.double).newbyteorder(">")
    elif data_type == TSDataType.FLOAT:
        big_endian = np.dtype(np.float32).newbyteorder(">")
    elif data_type == TSDataType.INT32:
        big_endian = np.dtype(np.int32).newbyteorder(">")
    elif data_type == TSDataType.INT64:
        big_endian = np.dtype(np.int64).newbyteorder(">")
    elif data_type == TSDataType.BOOLEAN:
        return np.array(list(value_buffer)).astype("bool")
    elif data_type == TSDataType.TEXT:
        decoded = [value_bytes.decode("utf-8") for value_bytes in value_buffer]
        return np.array(decoded, dtype=object)
    else:
        raise RuntimeError("unsupported data type {}.".format(data_type))
    return _as_little_endian(np.frombuffer(value_buffer, big_endian))


def _fill_null_positions(data_array, data_type, null_indicator, total_length):
    """Expand a column to ``total_length`` rows, marking null rows as missing.

    ``null_indicator`` holds one truthy flag per null row. When it is
    ``None`` every row of the expanded column is left missing.
    """
    valid = None
    if null_indicator is not None:
        valid = np.array([not flag for flag in null_indicator], dtype=bool)

    if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
        # Build the nullable integer column from an integer buffer plus a
        # mask. Going through a float32 scratch array would round any value
        # above 2**24.
        native = np.int32 if data_type == TSDataType.INT32 else np.int64
        values = np.zeros(total_length, dtype=native)
        is_null = np.ones(total_length, dtype=bool)
        if valid is not None:
            values[valid] = data_array
            is_null = ~valid
        return pd.Series(pd.arrays.IntegerArray(values, is_null))

    if data_type == TSDataType.BOOLEAN:
        # Boolean buffers are indexed with the same mask on both sides, i.e.
        # they are expected to carry a slot for every row.
        filled = np.full(total_length, np.nan, np.float32)
        if valid is not None:
            filled[valid] = data_array[valid]
        return pd.Series(filled).astype("boolean")

    if (
        data_type == TSDataType.FLOAT
        or data_type == TSDataType.DOUBLE
        or data_type == TSDataType.TEXT
    ):
        filled = np.full(total_length, np.nan, dtype=data_array.dtype)
        if valid is not None:
            filled[valid] = data_array
        return filled

    raise RuntimeError("Unsupported dataType in deserialization")


def _merge_column_chunks(parts):
    """Concatenate the per-buffer pieces of one column."""
    if not parts:
        return []
    if len(parts) == 1:
        return parts[0]
    nullable_dtype = next(
        (part.dtype for part in parts if isinstance(part, pd.Series)), None
    )
    if nullable_dtype is None:
        return np.concatenate(parts, axis=0)
    # At least one piece contains nulls: promote the plain arrays to the same
    # nullable dtype so the column keeps a single dtype. ``Series.append`` was
    # removed in pandas 2.0, hence ``pd.concat``.
    promoted = [
        part if isinstance(part, pd.Series) else pd.Series(part).astype(nullable_dtype)
        for part in parts
    ]
    return pd.concat(promoted, ignore_index=True)