Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion iotdb-client/client-py/iotdb/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ def __init__(
self.__default_endpoint = TEndPoint(self.__host, self.__port)
self.__user = user
self.__password = password
self.__fetch_size = fetch_size
if fetch_size > 0:
self.__fetch_size = fetch_size
else:
logger.warning(
f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}"
)
self.__fetch_size = self.DEFAULT_FETCH_SIZE
self.__is_close = True
self.__client = None
self.__default_connection = None
Expand Down
1 change: 0 additions & 1 deletion iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import numpy as np


# Serialized tsBlock:
# +-------------+---------------+---------+------------+-----------+----------+
# | val col cnt | val col types | pos cnt | encodings | time col | val col |
Expand Down
19 changes: 19 additions & 0 deletions iotdb-client/client-py/iotdb/utils/SessionDataSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
#
import logging
from typing import Optional

from iotdb.utils.Field import Field

Expand Down Expand Up @@ -143,6 +144,24 @@ def construct_row_record_from_data_frame(self):
def close_operation_handle(self):
    """Release the server-side operation handle backing this data set."""
    data_set = self.iotdb_rpc_data_set
    data_set.close()

def has_next_df(self) -> bool:
    """
    Evaluate if there are more DataFrames to be fetched.
    :return: whether there are more DataFrames to be fetched
    """
    data_set = self.iotdb_rpc_data_set
    # More frames are available if rows remain in the local buffer,
    # or if the server still has result sets to hand out.
    if data_set._has_buffered_data():
        return True
    return data_set._has_next_result_set()

def next_df(self) -> Optional[pd.DataFrame]:
    """
    Get the next DataFrame from the result set.
    Each returned DataFrame contains exactly fetch_size rows,
    except for the last DataFrame which may contain fewer rows.
    :return: the next DataFrame, or None if no more data
    """
    data_set = self.iotdb_rpc_data_set
    frame = data_set.next_dataframe()
    return frame

def todf(self) -> pd.DataFrame:
    """Materialize the entire remaining result set as one pandas DataFrame."""
    full_frame = result_set_to_pandas(self)
    return full_frame

Expand Down
68 changes: 68 additions & 0 deletions iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# for package
import logging
from typing import Optional

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -120,10 +121,12 @@ def __init__(
self.data_frame = None
self.__zone_id = zone_id
self.__time_precision = time_precision
self.__df_buffer = None # Buffer for streaming DataFrames

def close(self):
if self.__is_closed:
return
self.__df_buffer = None # Clean up streaming DataFrame buffer
if self.__client is not None:
try:
status = self.__client.closeOperation(
Expand Down Expand Up @@ -243,11 +246,73 @@ def _has_next_result_set(self):
return True
return False

def _has_buffered_data(self) -> bool:
"""
Check if there is buffered data for streaming DataFrame interface.
:return: True if there is buffered data, False otherwise
"""
return self.__df_buffer is not None and len(self.__df_buffer) > 0

def next_dataframe(self) -> Optional[pd.DataFrame]:
    """
    Get the next DataFrame from the result set with exactly fetch_size rows.
    The last DataFrame may have fewer rows.
    :return: the next DataFrame with fetch_size rows, or None if no more data
    """
    # Collect freshly fetched chunks in a list and concatenate ONCE at the
    # end: the previous approach called pd.concat inside the loop, copying
    # the whole accumulated buffer on every fetch (quadratic behavior when
    # many small server batches are needed to fill one fetch_size window).
    chunks = [] if self.__df_buffer is None else [self.__df_buffer]
    buffered_rows = sum(len(chunk) for chunk in chunks)

    # Keep fetching until we can serve a full window or the server runs dry.
    while buffered_rows < self.__fetch_size and self._has_next_result_set():
        new_chunk = self._build_dataframe(self._process_buffer())
        chunks.append(new_chunk)
        buffered_rows += len(new_chunk)

    if buffered_rows == 0:
        # Nothing buffered and nothing left to fetch.
        self.__df_buffer = None
        return None

    if len(chunks) == 1:
        merged = chunks[0]
    else:
        merged = pd.concat(chunks, ignore_index=True)

    if buffered_rows <= self.__fetch_size:
        # Return all remaining rows; the buffer is now exhausted.
        self.__df_buffer = None
        return merged

    # Slice off exactly fetch_size rows; keep the remainder buffered.
    result_df = merged.iloc[: self.__fetch_size].reset_index(drop=True)
    self.__df_buffer = merged.iloc[self.__fetch_size :].reset_index(drop=True)
    return result_df

def result_set_to_pandas(self):
    """Drain every remaining result set and return one merged DataFrame."""
    column_count = len(self.__column_index_2_tsblock_column_index_list)
    merged = {column: [] for column in range(column_count)}
    while self._has_next_result_set():
        for column, values in self._process_buffer().items():
            merged[column].extend(values)
    return self._build_dataframe(merged)

def _process_buffer(self):
result = {}
for i in range(len(self.__column_index_2_tsblock_column_index_list)):
result[i] = []

while self.__query_result_index < len(self.__query_result):
time_array, column_arrays, null_indicators, array_length = deserialize(
memoryview(self.__query_result[self.__query_result_index])
)
Expand Down Expand Up @@ -339,6 +404,9 @@ def result_set_to_pandas(self):

result[i].append(data_array)

return result

def _build_dataframe(self, result):
for k, v in result.items():
if v is None or len(v) < 1 or v[0] is None:
result[k] = []
Expand Down
6 changes: 6 additions & 0 deletions iotdb-client/client-py/session_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@
df = dataset.todf()
print(df.to_string())

with session.execute_query_statement(
"select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
) as dataset:
while dataset.has_next_df():
print(dataset.next_df())

# delete database
session.delete_storage_group("root.sg_test_01")

Expand Down
4 changes: 4 additions & 0 deletions iotdb-client/client-py/table_model_session_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,9 @@
df = dataset.todf()
print(df)

with session.execute_query_statement("select * from table5 order by time") as dataset:
while dataset.has_next_df():
print(dataset.next_df())

# close session connection.
session.close()
50 changes: 50 additions & 0 deletions iotdb-client/client-py/tests/integration/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,56 @@ def test_simple_query():
assert_array_equal(df.values, [[123.0, 15.0]])


def test_stream_query():
    with IoTDBContainer("iotdb:dev") as db:
        db: IoTDBContainer
        session = Session(
            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
        )
        session.open(False)
        session.execute_non_query_statement("CREATE DATABASE root.device0")

        # Write three rows with consecutive timestamps.
        for ts in (123, 124, 125):
            session.insert_str_record("root.device0", ts, "pressure", "15.0")

        # Read back: with fetch_size=1 each DataFrame holds exactly one row.
        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
        frame_count = 0
        while session_data_set.has_next_df():
            frame = session_data_set.next_df()
            assert list(frame.columns) == ["Time", "root.device0.pressure"]
            assert_array_equal(frame.values, [[123.0 + frame_count, 15.0]])
            frame_count += 1
        session.close()
        assert frame_count == 3


def test_stream_query_with_illegal_fetch_size():
    with IoTDBContainer("iotdb:dev") as db:
        db: IoTDBContainer
        session = Session(
            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
        )
        session.open(False)
        session.execute_non_query_statement("CREATE DATABASE root.device0")

        # Write data
        session.insert_str_record("root.device0", 123, "pressure", "15.0")
        session.insert_str_record("root.device0", 124, "pressure", "15.0")
        session.insert_str_record("root.device0", 125, "pressure", "15.0")

        # Read: the illegal fetch_size falls back to the default, which
        # exceeds the row count, so all rows should arrive in one DataFrame.
        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")

        frame_count = 0
        while session_data_set.has_next_df():
            df = session_data_set.next_df()
            assert list(df.columns) == ["Time", "root.device0.pressure"]
            assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]])
            frame_count += 1
        session.close()
        # Guard against a vacuous pass: the previous version asserted only
        # inside the loop, so producing zero DataFrames would go unnoticed.
        assert frame_count == 1


def test_non_time_query():
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
Expand Down
Loading