in iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py [0:0]
def result_set_to_pandas(self):
    """Drain the remaining result blocks into a single pandas DataFrame.

    Every block reported by ``_has_next_result_set()`` is deserialized,
    converted column by column, and collected per output column.  The
    per-block chunks are then merged and wrapped in a DataFrame whose
    column labels are taken from ``self.__column_name_list``.

    Data type codes handled (per the type checks below): 0 BOOLEAN,
    1 INT32, 2 INT64, 3 FLOAT, 4 DOUBLE, 5 TEXT, 8 TIMESTAMP, 9 DATE,
    10 BLOB, 11 STRING.

    Null handling: when a block carries fewer values than rows (or, for
    BOOLEAN, whenever a null indicator is present) the values are
    scattered into a full-length array.  Missing entries become
    ``pd.NA`` for BOOLEAN/INT32/INT64 (nullable pandas dtypes), ``NaN``
    for FLOAT/DOUBLE and ``None`` for the object-like types.

    Returns:
        pandas.DataFrame: one column per entry of the column index list.

    Raises:
        RuntimeError: if a column has a data type code not listed above.
    """
    result = {
        index: []
        for index in range(len(self.__column_index_2_tsblock_column_index_list))
    }

    while self._has_next_result_set():
        time_array, column_arrays, null_indicators, array_length = deserialize(
            memoryview(self.__query_result[self.__query_result_index])
        )
        # The raw block is no longer referenced once it has been
        # deserialized (presumably so its buffer can be reclaimed early).
        self.__query_result[self.__query_result_index] = None
        self.__query_result_index += 1

        if self.ignore_timestamp is None or self.ignore_timestamp is False:
            # Normalise big-endian buffers to little-endian for pandas.
            if time_array.dtype.byteorder == ">" and len(time_array) > 0:
                time_array = time_array.byteswap().view(
                    time_array.dtype.newbyteorder("<")
                )
            result[0].append(time_array)

        for i, location in enumerate(
            self.__column_index_2_tsblock_column_index_list
        ):
            # Negative locations have no backing block column; skip them.
            if location < 0:
                continue
            data_type = self.__data_type_for_tsblock_column[location]
            column_array = column_arrays[location]

            # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BLOB
            if data_type in (0, 1, 2, 3, 4, 10):
                data_array = column_array
                # BLOB (10) is excluded from the byte-order normalisation.
                if (
                    data_type != 10
                    and len(data_array) > 0
                    and data_array.dtype.byteorder == ">"
                ):
                    data_array = data_array.byteswap().view(
                        data_array.dtype.newbyteorder("<")
                    )
            # TEXT, STRING
            elif data_type in (5, 11):
                data_array = np.array([x.decode("utf-8") for x in column_array])
            # TIMESTAMP
            elif data_type == 8:
                data_array = pd.Series(
                    [
                        convert_to_timestamp(
                            x, self.__time_precision, self.__zone_id
                        )
                        for x in column_array
                    ],
                    dtype=object,
                )
            # DATE
            elif data_type == 9:
                data_array = pd.Series(column_array).apply(parse_int_to_date)
            else:
                raise RuntimeError(f"unsupported data type {data_type}.")

            null_indicator = null_indicators[location]
            if len(data_array) < array_length or (
                data_type == 0 and null_indicator is not None
            ):
                # BOOLEAN, INT32, INT64
                if data_type in (0, 1, 2):
                    tmp_array = np.full(array_length, pd.NA, dtype=object)
                # FLOAT, DOUBLE
                elif data_type in (3, 4):
                    tmp_array = np.full(
                        array_length, np.nan, dtype=data_type.np_dtype()
                    )
                # TEXT, STRING, BLOB, DATE, TIMESTAMP (every other code
                # was rejected by the RuntimeError above)
                else:
                    tmp_array = np.full(array_length, None, dtype=object)

                if null_indicator is not None:
                    # Mask of rows that actually hold a value.
                    indexes = [not v for v in null_indicator]
                    if data_type == 0:
                        # BOOLEAN values are indexed with the same mask,
                        # i.e. they are taken position by position.
                        tmp_array[indexes] = data_array[indexes]
                    elif len(data_array) != 0:
                        # Other types are assigned as-is to the non-null
                        # positions.
                        tmp_array[indexes] = data_array

                if data_type == 1:
                    tmp_array = pd.Series(tmp_array).astype("Int32")
                elif data_type == 2:
                    tmp_array = pd.Series(tmp_array).astype("Int64")
                elif data_type == 0:
                    tmp_array = pd.Series(tmp_array).astype("boolean")
                data_array = tmp_array

            result[i].append(data_array)

    merged = {}
    for key, chunks in result.items():
        if chunks is None or len(chunks) < 1 or chunks[0] is None:
            merged[key] = []
            continue

        # Look at every chunk, not only the first one: a column whose
        # first block had no nulls (plain NumPy chunk) may still contain
        # nullable pandas chunks from later blocks.  Deciding from the
        # first chunk alone would degrade such a column to ``object``.
        chunk_dtypes = [getattr(chunk, "dtype", None) for chunk in chunks]
        nullable_int = next(
            (
                name
                for name in ("Int32", "Int64")
                if any(dtype == name for dtype in chunk_dtypes)
            ),
            None,
        )
        if nullable_int is not None:
            series = [
                chunk if isinstance(chunk, pd.Series) else pd.Series(chunk)
                for chunk in chunks
            ]
            merged[key] = pd.concat(series, ignore_index=True).astype(
                nullable_int
            )
        elif any(dtype == bool for dtype in chunk_dtypes):
            merged[key] = pd.Series(np.concatenate(chunks, axis=0)).astype(
                "boolean"
            )
        else:
            merged[key] = np.concatenate(chunks, axis=0)

    df = pd.DataFrame(merged)
    df.columns = self.__column_name_list
    return df