diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py index 3f44bd41584d6..a3d89d717464b 100644 --- a/iotdb-client/client-py/iotdb/Session.py +++ b/iotdb-client/client-py/iotdb/Session.py @@ -95,7 +95,13 @@ def __init__( self.__default_endpoint = TEndPoint(self.__host, self.__port) self.__user = user self.__password = password - self.__fetch_size = fetch_size + if fetch_size > 0: + self.__fetch_size = fetch_size + else: + logger.warning( + f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}" + ) + self.__fetch_size = self.DEFAULT_FETCH_SIZE self.__is_close = True self.__client = None self.__default_connection = None diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py index 937ca9ca32615..cc5577e885c62 100644 --- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py +++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py @@ -18,7 +18,6 @@ import numpy as np - # Serialized tsBlock: # +-------------+---------------+---------+------------+-----------+----------+ # | val col cnt | val col types | pos cnt | encodings | time col | val col | diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index b079ee28c1aaf..9c5b3ec06ddc5 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -16,6 +16,7 @@ # under the License. # import logging +from typing import Optional from iotdb.utils.Field import Field @@ -143,6 +144,24 @@ def construct_row_record_from_data_frame(self): def close_operation_handle(self): self.iotdb_rpc_data_set.close() + def has_next_df(self) -> bool: + """ + Evaluate if there are more DataFrames to be fetched. + :return: whether there are more DataFrames to be fetched + """ + # Check if buffer has data or if there are more results to fetch + rpc_ds = self.iotdb_rpc_data_set + return rpc_ds._has_buffered_data() or rpc_ds._has_next_result_set() + + def next_df(self) -> Optional[pd.DataFrame]: + """ + Get the next DataFrame from the result set. + Each returned DataFrame contains exactly fetch_size rows, + except for the last DataFrame which may contain fewer rows. + :return: the next DataFrame, or None if no more data + """ + return self.iotdb_rpc_data_set.next_dataframe() + def todf(self) -> pd.DataFrame: return result_set_to_pandas(self) diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index dc77136866938..0edc76f68fd6d 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -18,6 +18,7 @@ # for package import logging +from typing import Optional import numpy as np import pandas as pd @@ -120,10 +121,12 @@ def __init__( self.data_frame = None self.__zone_id = zone_id self.__time_precision = time_precision + self.__df_buffer = None # Buffer for streaming DataFrames def close(self): if self.__is_closed: return + self.__df_buffer = None # Clean up streaming DataFrame buffer if self.__client is not None: try: status = self.__client.closeOperation( @@ -243,11 +246,73 @@ def _has_next_result_set(self): return True return False + def _has_buffered_data(self) -> bool: + """ + Check if there is buffered data for streaming DataFrame interface. + :return: True if there is buffered data, False otherwise + """ + return self.__df_buffer is not None and len(self.__df_buffer) > 0 + + def next_dataframe(self) -> Optional[pd.DataFrame]: + """ + Get the next DataFrame from the result set with exactly fetch_size rows. + The last DataFrame may have fewer rows. + :return: the next DataFrame with fetch_size rows, or None if no more data + """ + # Accumulate data until we have at least fetch_size rows or no more data + while True: + buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer) + if buffer_len >= self.__fetch_size: + # We have enough rows, return a chunk + break + if not self._has_next_result_set(): + # No more data to fetch + break + # Process and accumulate + result = self._process_buffer() + new_df = self._build_dataframe(result) + if self.__df_buffer is None: + self.__df_buffer = new_df + else: + self.__df_buffer = pd.concat( + [self.__df_buffer, new_df], ignore_index=True + ) + + if self.__df_buffer is None or len(self.__df_buffer) == 0: + return None + + if len(self.__df_buffer) <= self.__fetch_size: + # Return all remaining rows + result_df = self.__df_buffer + self.__df_buffer = None + return result_df + else: + # Slice off fetch_size rows + result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index( + drop=True + ) + self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index( + drop=True + ) + return result_df + def result_set_to_pandas(self): result = {} for i in range(len(self.__column_index_2_tsblock_column_index_list)): result[i] = [] while self._has_next_result_set(): + batch_result = self._process_buffer() + for k, v in batch_result.items(): + result[k].extend(v) + + return self._build_dataframe(result) + + def _process_buffer(self): + result = {} + for i in range(len(self.__column_index_2_tsblock_column_index_list)): + result[i] = [] + + while self.__query_result_index < len(self.__query_result): time_array, column_arrays, null_indicators, array_length = deserialize( memoryview(self.__query_result[self.__query_result_index]) ) @@ -339,6 +404,9 @@ def result_set_to_pandas(self): result[i].append(data_array) + return result + + def _build_dataframe(self, result): for k, v in result.items(): if v is None or len(v) < 1 or v[0] is None: result[k] = [] diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py index d0a6a3aba8e37..996339c0088d6 100644 --- a/iotdb-client/client-py/session_example.py +++ b/iotdb-client/client-py/session_example.py @@ -411,6 +411,12 @@ df = dataset.todf() print(df.to_string()) +with session.execute_query_statement( + "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04" +) as dataset: + while dataset.has_next_df(): + print(dataset.next_df()) + # delete database session.delete_storage_group("root.sg_test_01") diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py index c9aa62b97a0ca..048f1022b76a0 100644 --- a/iotdb-client/client-py/table_model_session_example.py +++ b/iotdb-client/client-py/table_model_session_example.py @@ -158,5 +158,9 @@ df = dataset.todf() print(df) +with session.execute_query_statement("select * from table5 order by time") as dataset: + while dataset.has_next_df(): + print(dataset.next_df()) + # close session connection. session.close() diff --git a/iotdb-client/client-py/tests/integration/test_dataframe.py b/iotdb-client/client-py/tests/integration/test_dataframe.py index f314fbac18434..61a243a23f487 100644 --- a/iotdb-client/client-py/tests/integration/test_dataframe.py +++ b/iotdb-client/client-py/tests/integration/test_dataframe.py @@ -42,6 +42,56 @@ def test_simple_query(): assert_array_equal(df.values, [[123.0, 15.0]]) +def test_stream_query(): + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + session = Session( + db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1 + ) + session.open(False) + session.execute_non_query_statement("CREATE DATABASE root.device0") + + # Write data + session.insert_str_record("root.device0", 123, "pressure", "15.0") + session.insert_str_record("root.device0", 124, "pressure", "15.0") + session.insert_str_record("root.device0", 125, "pressure", "15.0") + + # Read + session_data_set = session.execute_query_statement("SELECT * FROM root.device0") + index = 0 + while session_data_set.has_next_df(): + df = session_data_set.next_df() + assert list(df.columns) == ["Time", "root.device0.pressure"] + assert_array_equal(df.values, [[123.0 + index, 15.0]]) + index += 1 + session.close() + assert index == 3 + + +def test_stream_query_with_illegal_fetch_size(): + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + session = Session( + db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1 + ) + session.open(False) + session.execute_non_query_statement("CREATE DATABASE root.device0") + + # Write data + session.insert_str_record("root.device0", 123, "pressure", "15.0") + session.insert_str_record("root.device0", 124, "pressure", "15.0") + session.insert_str_record("root.device0", 125, "pressure", "15.0") + + # Read + session_data_set = session.execute_query_statement("SELECT * FROM root.device0") + + while session_data_set.has_next_df(): + df = session_data_set.next_df() + assert list(df.columns) == ["Time", "root.device0.pressure"] + assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]]) + session.close() + + def test_non_time_query(): with IoTDBContainer("iotdb:dev") as db: db: IoTDBContainer