From f840f0dc5937313aec363424331ce4aaff965b2d Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sat, 17 Jan 2026 22:10:13 +0800 Subject: [PATCH 1/6] Support stream DataFrame interface in iotdb python client --- .../client-py/iotdb/utils/SessionDataSet.py | 19 +++++++ .../iotdb/utils/iotdb_rpc_dataset.py | 53 +++++++++++++++++++ iotdb-client/client-py/session_example.py | 6 +++ .../client-py/table_model_session_example.py | 4 ++ 4 files changed, 82 insertions(+) diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index b079ee28c1aa..67509d3c059d 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -143,6 +143,25 @@ def construct_row_record_from_data_frame(self): def close_operation_handle(self): self.iotdb_rpc_data_set.close() + def has_next_df(self): + """ + Evaluate if there are more DataFrames to be fetched. + :return: whether there are more DataFrames to be fetched + """ + # Check if buffer has data or if there are more results to fetch + rpc_ds = self.iotdb_rpc_data_set + has_buffer = rpc_ds._IoTDBRpcDataSet__df_buffer is not None and len(rpc_ds._IoTDBRpcDataSet__df_buffer) > 0 + return has_buffer or rpc_ds._has_next_result_set() + + def next_df(self): + """ + Get the next DataFrame from the result set. + Each returned DataFrame contains exactly fetch_size rows, + except for the last DataFrame which may contain fewer rows. + :return: the next DataFrame, or None if no more data + """ + return self.iotdb_rpc_data_set.next_dataframe() + def todf(self) -> pd.DataFrame: return result_set_to_pandas(self) diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index dc7713686693..5afa8b152a59 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -120,6 +120,7 @@ def __init__( self.data_frame = None self.__zone_id = zone_id self.__time_precision = time_precision + self.__df_buffer = None # Buffer for streaming DataFrames def close(self): if self.__is_closed: @@ -243,11 +244,60 @@ def _has_next_result_set(self): return True return False + def next_dataframe(self): + """ + Get the next DataFrame from the result set with exactly fetch_size rows. + The last DataFrame may have fewer rows. 
+ :return: the next DataFrame with fetch_size rows, or None if no more data + """ + # Accumulate data until we have at least fetch_size rows or no more data + while True: + buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer) + if buffer_len >= self.__fetch_size: + # We have enough rows, return a chunk + break + if not self._has_next_result_set(): + # No more data to fetch + break + # Process and accumulate + result = self._process_buffer() + new_df = self._build_dataframe(result) + if self.__df_buffer is None: + self.__df_buffer = new_df + else: + self.__df_buffer = pd.concat([self.__df_buffer, new_df], ignore_index=True) + + if self.__df_buffer is None or len(self.__df_buffer) == 0: + return None + + if len(self.__df_buffer) <= self.__fetch_size: + # Return all remaining rows + result_df = self.__df_buffer + self.__df_buffer = None + return result_df + else: + # Slice off fetch_size rows + result_df = self.__df_buffer.iloc[:self.__fetch_size].reset_index(drop=True) + self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size:].reset_index(drop=True) + return result_df + def result_set_to_pandas(self): result = {} for i in range(len(self.__column_index_2_tsblock_column_index_list)): result[i] = [] while self._has_next_result_set(): + batch_result = self._process_buffer() + for k, v in batch_result.items(): + result[k].extend(v) + + return self._build_dataframe(result) + + def _process_buffer(self): + result = {} + for i in range(len(self.__column_index_2_tsblock_column_index_list)): + result[i] = [] + + while self.__query_result_index < len(self.__query_result): time_array, column_arrays, null_indicators, array_length = deserialize( memoryview(self.__query_result[self.__query_result_index]) ) @@ -339,6 +389,9 @@ def result_set_to_pandas(self): result[i].append(data_array) + return result + + def _build_dataframe(self, result): for k, v in result.items(): if v is None or len(v) < 1 or v[0] is None: result[k] = [] diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py index d0a6a3aba8e3..996339c0088d 100644 --- a/iotdb-client/client-py/session_example.py +++ b/iotdb-client/client-py/session_example.py @@ -411,6 +411,12 @@ df = dataset.todf() print(df.to_string()) +with session.execute_query_statement( + "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04" +) as dataset: + while dataset.has_next_df(): + print(dataset.next_df()) + # delete database session.delete_storage_group("root.sg_test_01") diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py index c9aa62b97a0c..048f1022b76a 100644 --- a/iotdb-client/client-py/table_model_session_example.py +++ b/iotdb-client/client-py/table_model_session_example.py @@ -158,5 +158,9 @@ df = dataset.todf() print(df) +with session.execute_query_statement("select * from table5 order by time") as dataset: + while dataset.has_next_df(): + print(dataset.next_df()) + # close session connection. 
session.close() From 909e3362fb06bb1403a76c5e8bdefc0cf4366275 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 18 Jan 2026 13:46:17 +0800 Subject: [PATCH 2/6] change according to code review --- iotdb-client/client-py/iotdb/utils/SessionDataSet.py | 7 +++---- .../client-py/iotdb/utils/iotdb_rpc_dataset.py | 11 ++++++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 67509d3c059d..de4ef31dfa1a 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -143,17 +143,16 @@ def construct_row_record_from_data_frame(self): def close_operation_handle(self): self.iotdb_rpc_data_set.close() - def has_next_df(self): + def has_next_df(self) -> bool: """ Evaluate if there are more DataFrames to be fetched. :return: whether there are more DataFrames to be fetched """ # Check if buffer has data or if there are more results to fetch rpc_ds = self.iotdb_rpc_data_set - has_buffer = rpc_ds._IoTDBRpcDataSet__df_buffer is not None and len(rpc_ds._IoTDBRpcDataSet__df_buffer) > 0 - return has_buffer or rpc_ds._has_next_result_set() + return rpc_ds._has_buffered_data() or rpc_ds._has_next_result_set() - def next_df(self): + def next_df(self) -> "pd.DataFrame | None": """ Get the next DataFrame from the result set. Each returned DataFrame contains exactly fetch_size rows, diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index 5afa8b152a59..7c3831aca7f1 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -18,6 +18,7 @@ # for package import logging +from typing import Optional import numpy as np import pandas as pd @@ -125,6 +126,7 @@ def __init__( def close(self): if self.__is_closed: return + self.__df_buffer = None # Clean up streaming DataFrame buffer if self.__client is not None: try: status = self.__client.closeOperation( @@ -244,7 +246,14 @@ def _has_next_result_set(self): return True return False - def next_dataframe(self): + def _has_buffered_data(self) -> bool: + """ + Check if there is buffered data for streaming DataFrame interface. + :return: True if there is buffered data, False otherwise + """ + return self.__df_buffer is not None and len(self.__df_buffer) > 0 + + def next_dataframe(self) -> Optional[pd.DataFrame]: """ Get the next DataFrame from the result set with exactly fetch_size rows. The last DataFrame may have fewer rows. From 798176948bb8ae6ef997af96c7ff76911986efb1 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 18 Jan 2026 14:00:15 +0800 Subject: [PATCH 3/6] format code --- iotdb-client/client-py/iotdb/utils/SessionDataSet.py | 2 +- .../client-py/iotdb/utils/iotdb_rpc_dataset.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index de4ef31dfa1a..32198150f329 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -152,7 +152,7 @@ def has_next_df(self) -> bool: rpc_ds = self.iotdb_rpc_data_set return rpc_ds._has_buffered_data() or rpc_ds._has_next_result_set() - def next_df(self) -> "pd.DataFrame | None": + def next_df(self) -> Optional[pd.DataFrame]: """ Get the next DataFrame from the result set. 
Each returned DataFrame contains exactly fetch_size rows, diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index 7c3831aca7f1..0edc76f68fd6 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -274,7 +274,9 @@ def next_dataframe(self) -> Optional[pd.DataFrame]: if self.__df_buffer is None: self.__df_buffer = new_df else: - self.__df_buffer = pd.concat([self.__df_buffer, new_df], ignore_index=True) + self.__df_buffer = pd.concat( + [self.__df_buffer, new_df], ignore_index=True + ) if self.__df_buffer is None or len(self.__df_buffer) == 0: return None @@ -286,8 +288,12 @@ def next_dataframe(self) -> Optional[pd.DataFrame]: return result_df else: # Slice off fetch_size rows - result_df = self.__df_buffer.iloc[:self.__fetch_size].reset_index(drop=True) - self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size:].reset_index(drop=True) + result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index( + drop=True + ) + self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index( + drop=True + ) return result_df def result_set_to_pandas(self): From b3b7714f411655777006b4e281ad6ade989af607 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Sun, 18 Jan 2026 17:37:27 +0800 Subject: [PATCH 4/6] Bug fix "optional" --- iotdb-client/client-py/iotdb/utils/SessionDataSet.py | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 32198150f329..9c5b3ec06ddc 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -16,6 +16,7 @@ # under the License. 
# import logging +from typing import Optional from iotdb.utils.Field import Field From cf2c37544fbbf854fbf69b738b3bb8ea5c07204e Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 19 Jan 2026 10:14:34 +0800 Subject: [PATCH 5/6] add IT --- .../iotdb/tsfile/utils/tsblock_serde.py | 1 - .../tests/integration/test_dataframe.py | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py index 937ca9ca3261..cc5577e885c6 100644 --- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py +++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py @@ -18,7 +18,6 @@ import numpy as np - # Serialized tsBlock: # +-------------+---------------+---------+------------+-----------+----------+ # | val col cnt | val col types | pos cnt | encodings | time col | val col | diff --git a/iotdb-client/client-py/tests/integration/test_dataframe.py b/iotdb-client/client-py/tests/integration/test_dataframe.py index f314fbac1843..5f2d0c4839a3 100644 --- a/iotdb-client/client-py/tests/integration/test_dataframe.py +++ b/iotdb-client/client-py/tests/integration/test_dataframe.py @@ -42,6 +42,32 @@ def test_simple_query(): assert_array_equal(df.values, [[123.0, 15.0]]) +def test_stream_query(): + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + session = Session( + db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1 + ) + session.open(False) + session.execute_non_query_statement("CREATE DATABASE root.device0") + + # Write data + session.insert_str_record("root.device0", 123, "pressure", "15.0") + session.insert_str_record("root.device0", 124, "pressure", "15.0") + session.insert_str_record("root.device0", 125, "pressure", "15.0") + + # Read + session_data_set = session.execute_query_statement("SELECT * FROM root.device0") + index = 0 + while session_data_set.has_next_df(): + df = session_data_set.next_df() + assert list(df.columns) == ["Time", "root.device0.pressure"] + assert_array_equal(df.values, [[123.0 + index, 15.0]]) + index += 1 + session.close() + assert index == 3 + + def test_non_time_query(): with IoTDBContainer("iotdb:dev") as db: db: IoTDBContainer From a453860068eff4e9ceed9ddb23500fccdea74fc1 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 19 Jan 2026 11:42:42 +0800 Subject: [PATCH 6/6] fix illegal fetch_size --- iotdb-client/client-py/iotdb/Session.py | 8 ++++++- .../tests/integration/test_dataframe.py | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py index 3f44bd41584d..a3d89d717464 100644 --- a/iotdb-client/client-py/iotdb/Session.py +++ b/iotdb-client/client-py/iotdb/Session.py @@ -95,7 +95,13 @@ def __init__( self.__default_endpoint = TEndPoint(self.__host, self.__port) self.__user = user self.__password = password - self.__fetch_size = fetch_size + if fetch_size > 0: + self.__fetch_size = fetch_size + else: + logger.warning( + f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}" + ) + self.__fetch_size = self.DEFAULT_FETCH_SIZE self.__is_close = True self.__client = None self.__default_connection = None diff --git a/iotdb-client/client-py/tests/integration/test_dataframe.py b/iotdb-client/client-py/tests/integration/test_dataframe.py index 5f2d0c4839a3..61a243a23f48 100644 --- a/iotdb-client/client-py/tests/integration/test_dataframe.py +++ 
b/iotdb-client/client-py/tests/integration/test_dataframe.py @@ -68,6 +68,30 @@ def test_stream_query(): assert index == 3 +def test_stream_query_with_illegal_fetch_size(): + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + session = Session( + db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1 + ) + session.open(False) + session.execute_non_query_statement("CREATE DATABASE root.device0") + + # Write data + session.insert_str_record("root.device0", 123, "pressure", "15.0") + session.insert_str_record("root.device0", 124, "pressure", "15.0") + session.insert_str_record("root.device0", 125, "pressure", "15.0") + + # Read + session_data_set = session.execute_query_statement("SELECT * FROM root.device0") + + while session_data_set.has_next_df(): + df = session_data_set.next_df() + assert list(df.columns) == ["Time", "root.device0.pressure"] + assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]]) + session.close() + + def test_non_time_query(): with IoTDBContainer("iotdb:dev") as db: db: IoTDBContainer
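
Usage sketch: the snippet below shows how the streaming DataFrame interface added by this patch series is meant to be consumed. It mirrors the examples added to session_example.py and tests/integration/test_dataframe.py; the host, port, fetch_size value, and query path are illustrative placeholders rather than values mandated by the patches.

from iotdb.Session import Session

# A small fetch_size makes the chunking visible; any positive value works.
# After patch 6, a non-positive fetch_size is rejected with a warning and
# the session falls back to its default fetch size.
session = Session("127.0.0.1", 6667, fetch_size=1024)
session.open(False)

with session.execute_query_statement(
    "select s_01, s_02, s_03, s_04 from root.sg_test_01.d_04"
) as dataset:
    # has_next_df() stays True while buffered rows or unfetched result sets
    # remain; next_df() returns a DataFrame of exactly fetch_size rows
    # (except possibly the last chunk, which may be shorter), or None once
    # the result set is exhausted.
    while dataset.has_next_df():
        chunk = dataset.next_df()
        print(chunk.to_string())

session.close()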