From 3d3d801414c8e3bbecf3b16450275150cfe476bd Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Fri, 3 Jan 2025 15:47:24 +0100 Subject: [PATCH 01/12] Implement a custom SELECT with SAMPLE BY support TODO: - tests, including both sqlalchemy 1.4 and 2.0 and in combination with other clauses (GROUP BY etc) - documentation Example usage: ```python from sqlalchemy import create_engine, MetaData, Table, Column from questdb_connect import ( Timestamp, Double, Symbol, ) from sqlalchemy import func from questdb_connect import select engine = create_engine('questdb://admin:quest@localhost:8812/main') metadata = MetaData() # Define a table for sensor readings sensors = Table( 'sensors', metadata, Column('ts', Timestamp), Column('temperature', Double), Column('humidity', Double), Column('location', Symbol), ) def main(): metadata.create_all(engine) location_samples = select( sensors.c.ts, func.avg(sensors.c.temperature).label('avg_temp'), func.min(sensors.c.temperature).label('min_temp'), func.max(sensors.c.temperature).label('max_temp') ).where( sensors.c.location == 'warehouse' ).sample_by(1, 'd'); with engine.connect() as conn: for row in conn.execute(location_samples).fetchall(): print(f"Time: {row.ts}, Average Temp: {row.avg_temp}, Minimal Temp: {row.min_temp}, Maximal Temp: {row.max_temp}") if __name__ == '__main__': main() ``` --- src/questdb_connect/__init__.py | 6 ++ src/questdb_connect/compilers.py | 33 ++++++++++ src/questdb_connect/dml.py | 105 +++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 src/questdb_connect/dml.py diff --git a/src/questdb_connect/__init__.py b/src/questdb_connect/__init__.py index 82042ef..7d5b886 100644 --- a/src/questdb_connect/__init__.py +++ b/src/questdb_connect/__init__.py @@ -11,6 +11,7 @@ create_engine, create_superset_engine, ) +from questdb_connect.dml import select, QDBSelect from questdb_connect.identifier_preparer import QDBIdentifierPreparer from questdb_connect.inspector import QDBInspector from questdb_connect.keywords_functions import get_functions_list, get_keywords_list @@ -51,6 +52,11 @@ threadsafety = 2 paramstyle = "pyformat" +__all__ = ( + "select", + "QDBSelect", +) + class Error(Exception): pass diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index e45430e..053ff5b 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -33,6 +33,39 @@ class QDBSQLCompiler(sqlalchemy.sql.compiler.SQLCompiler, abc.ABC): # Maximum value for 64-bit signed integer (2^63 - 1) BIGINT_MAX = 9223372036854775807 + def visit_sample_by(self, sample_by, **kw): + """Compile a SAMPLE BY clause.""" + if sample_by.unit: + return f"SAMPLE BY {sample_by.value}{sample_by.unit}" + return f"SAMPLE BY {sample_by.value}" + + def visit_select(self, select, **kw): + """Add SAMPLE BY support to the standard SELECT compilation.""" + + text = super().visit_select(select, **kw) + + # TODO: The exact positioning is a big funky, fix it + if hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None: + # Add SAMPLE BY before ORDER BY and LIMIT + sample_text = self.process(select._sample_by_clause, **kw) + + # Find positions of ORDER BY and LIMIT + order_by_pos = text.find("ORDER BY") + limit_pos = text.find("LIMIT") + + # Determine where to insert SAMPLE BY + if order_by_pos >= 0: + # Insert before ORDER BY + text = text[:order_by_pos] + sample_text + " " + text[order_by_pos:] + elif limit_pos >= 0: + # Insert before LIMIT + text = text[:limit_pos] + sample_text + " " + text[limit_pos:] + else: + # Append at the end + text += " " + sample_text + + return text + def _is_safe_for_fast_insert_values_helper(self): return True diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py new file mode 100644 index 0000000..91e0fde --- /dev/null +++ b/src/questdb_connect/dml.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from typing import Any, Optional, Union, Sequence + +from sqlalchemy.sql import Select as StandardSelect +from sqlalchemy.sql import ClauseElement +from sqlalchemy import select as sa_select +from sqlalchemy.sql.visitors import Visitable + + +class SampleByClause(ClauseElement): + """Represents the QuestDB SAMPLE BY clause.""" + + __visit_name__ = "sample_by" + stringify_dialect = "questdb" + + def __init__( + self, + value: Union[int, float], + unit: Optional[str] = None + ): + self.value = value + self.unit = unit.lower() if unit else None + + def __str__(self) -> str: + if self.unit: + return f"SAMPLE BY {self.value}{self.unit}" + return f"SAMPLE BY {self.value}" + + def get_children(self, **kwargs: Any) -> Sequence[Visitable]: + return [] + + +class QDBSelect(StandardSelect): + """QuestDB-specific implementation of SELECT. + + Adds methods for QuestDB-specific syntaxes such as SAMPLE BY. + + The :class:`_questdb.QDBSelect` object is created using the + :func:`sqlalchemy.dialects.questdb.select` function. + """ + + stringify_dialect = "questdb" + _sample_by_clause: Optional[SampleByClause] = None + + def get_children(self, **kwargs: Any) -> Sequence[Visitable]: + children = super().get_children(**kwargs) + if self._sample_by_clause is not None: + children = children + [self._sample_by_clause] + return children + + def sample_by( + self, + value: Union[int, float], + unit: Optional[str] = None + ) -> QDBSelect: + """Add a SAMPLE BY clause to the select statement. + + The SAMPLE BY clause allows time-based sampling of data. + + :param value: + For time-based sampling: the time interval + + + :param unit: + Time unit for sampling: + - 's': seconds + - 'm': minutes + - 'h': hours + - 'd': days + + Example time-based sampling:: + + select([table.c.value]).sample_by(1, 'h') # sample every hour + select([table.c.value]).sample_by(30, 'm') # sample every 30 minutes + + """ + # Create a copy of our object with _generative + s = self.__class__.__new__(self.__class__) + s.__dict__ = self.__dict__.copy() + + # Set the sample by clause + s._sample_by_clause = SampleByClause(value, unit) + return s + + +def select(*entities: Any, **kwargs: Any) -> QDBSelect: + """Construct a QuestDB-specific variant :class:`_questdb.Select` construct. + + .. container:: inherited_member + + The :func:`sqlalchemy.dialects.questdb.select` function creates + a :class:`sqlalchemy.dialects.questdb.Select`. This class is based + on the dialect-agnostic :class:`_sql.Select` construct which may + be constructed using the :func:`_sql.select` function in + SQLAlchemy Core. + + The :class:`_questdb.Select` construct includes additional method + :meth:`_questdb.Select.sample_by` for QuestDB's SAMPLE BY clause. + """ + stmt = sa_select(*entities, **kwargs) + # Convert the SQLAlchemy Select into our QDBSelect + qdbs = QDBSelect.__new__(QDBSelect) + qdbs.__dict__ = stmt.__dict__.copy() + return qdbs \ No newline at end of file From 92b4cf1ebfcacd65531c37ee4ee9849c040e8cb3 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 13 Jan 2025 15:35:14 +0100 Subject: [PATCH 02/12] better SAMPLE BY injection this impl no longer depends on SQL Text post-processing and uses clause rendering instead. this makes it more robust. --- src/questdb_connect/compilers.py | 46 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index 053ff5b..0102726 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -1,6 +1,7 @@ import abc import sqlalchemy +from sqlalchemy.sql.base import elements from .common import quote_identifier, remove_public_schema from .types import QDBTypeMixin @@ -39,31 +40,34 @@ def visit_sample_by(self, sample_by, **kw): return f"SAMPLE BY {sample_by.value}{sample_by.unit}" return f"SAMPLE BY {sample_by.value}" + def group_by_clause(self, select, **kw): + """Customize GROUP BY to also render SAMPLE BY.""" + text = "" + + # Add SAMPLE BY first if present + if select._sample_by_clause is not None: + text += " " + self.process(select._sample_by_clause, **kw) + + # Use parent's GROUP BY implementation + group_by_text = super().group_by_clause(select, **kw) + if group_by_text: + text += group_by_text + + return text + def visit_select(self, select, **kw): """Add SAMPLE BY support to the standard SELECT compilation.""" - text = super().visit_select(select, **kw) - - # TODO: The exact positioning is a big funky, fix it - if hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None: - # Add SAMPLE BY before ORDER BY and LIMIT - sample_text = self.process(select._sample_by_clause, **kw) - - # Find positions of ORDER BY and LIMIT - order_by_pos = text.find("ORDER BY") - limit_pos = text.find("LIMIT") - - # Determine where to insert SAMPLE BY - if order_by_pos >= 0: - # Insert before ORDER BY - text = text[:order_by_pos] + sample_text + " " + text[order_by_pos:] - elif limit_pos >= 0: - # Insert before LIMIT - text = text[:limit_pos] + sample_text + " " + text[limit_pos:] - else: - # Append at the end - text += " " + sample_text + # If we have SAMPLE BY but no GROUP BY, + # add a dummy GROUP BY clause to trigger the rendering + if ( + select._sample_by_clause is not None + and not select._group_by_clauses + ): + select = select._clone() + select._group_by_clauses = [elements.TextClause("")] + text = super().visit_select(select, **kw) return text def _is_safe_for_fast_insert_values_helper(self): From 92a90e443a42805dd9d512be6baf46d3a2944dd8 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Fri, 3 Jan 2025 16:52:53 +0100 Subject: [PATCH 03/12] style fixes --- src/questdb_connect/__init__.py | 2 +- src/questdb_connect/dml.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/questdb_connect/__init__.py b/src/questdb_connect/__init__.py index 7d5b886..305254b 100644 --- a/src/questdb_connect/__init__.py +++ b/src/questdb_connect/__init__.py @@ -11,7 +11,7 @@ create_engine, create_superset_engine, ) -from questdb_connect.dml import select, QDBSelect +from questdb_connect.dml import QDBSelect, select from questdb_connect.identifier_preparer import QDBIdentifierPreparer from questdb_connect.inspector import QDBInspector from questdb_connect.keywords_functions import get_functions_list, get_keywords_list diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py index 91e0fde..b64b1a3 100644 --- a/src/questdb_connect/dml.py +++ b/src/questdb_connect/dml.py @@ -1,11 +1,13 @@ from __future__ import annotations -from typing import Any, Optional, Union, Sequence +from typing import TYPE_CHECKING, Any, Optional, Sequence, Union -from sqlalchemy.sql import Select as StandardSelect -from sqlalchemy.sql import ClauseElement from sqlalchemy import select as sa_select -from sqlalchemy.sql.visitors import Visitable +from sqlalchemy.sql import ClauseElement +from sqlalchemy.sql import Select as StandardSelect + +if TYPE_CHECKING: + from sqlalchemy.sql.visitors import Visitable class SampleByClause(ClauseElement): @@ -46,7 +48,7 @@ class QDBSelect(StandardSelect): def get_children(self, **kwargs: Any) -> Sequence[Visitable]: children = super().get_children(**kwargs) if self._sample_by_clause is not None: - children = children + [self._sample_by_clause] + children = [*children, self._sample_by_clause] return children def sample_by( From ab1b354ac1a9831c36481c18fe67f13b56adcbe1 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 13 Jan 2025 18:14:30 +0100 Subject: [PATCH 04/12] happy path tests --- tests/conftest.py | 26 +++++++++++++ tests/test_dialect.py | 88 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 222925f..b12c2d4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -126,6 +126,32 @@ def collect_select_all(session, expected_rows) -> str: if rs.rowcount == expected_rows: return '\n'.join(str(row) for row in rs) +def wait_until_table_is_ready(test_engine, table_name, expected_rows, timeout=10): + """ + Wait until a table has the expected number of rows, with timeout. + Args: + test_engine: SQLAlchemy engine + table_name: Name of the table to check + expected_rows: Expected number of rows + timeout: Maximum time to wait in seconds (default: 10 seconds) + Returns: + bool: True if table is ready, False if timeout occurred + Raises: + sqlalchemy.exc.SQLAlchemyError: If there's a database error + """ + start_time = time.time() + + while time.time() - start_time < timeout: + with test_engine.connect() as conn: + result = conn.execute(text(f'SELECT count(*) FROM {table_name}')) + row = result.fetchone() + if row and row[0] == expected_rows: + return True + + print(f'Waiting for table {table_name} to have {expected_rows} rows, current: {row[0] if row else 0}') + time.sleep(0.01) # Wait 10ms between checks + return False + def wait_until_table_is_ready(test_engine, table_name, expected_rows, timeout=10): """ diff --git a/tests/test_dialect.py b/tests/test_dialect.py index 26aa19f..cf4f6a0 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -1,5 +1,6 @@ import datetime +import questdb_connect import questdb_connect as qdbc import sqlalchemy as sqla from sqlalchemy.orm import Session @@ -13,6 +14,93 @@ ) +def test_sample_by_clause(test_engine, test_model): + """Test SAMPLE BY clause functionality.""" + base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) + session = Session(test_engine) + try: + # Insert test data - one row every minute for 2 hours + num_rows = 120 # 2 hours * 60 minutes + models = [ + test_model( + col_boolean=True, + col_byte=8, + col_short=12, + col_int=idx, + col_long=14, + col_float=15.234, + col_double=16.88993244, + col_symbol='coconut', + col_string='banana', + col_char='C', + col_uuid='6d5eb038-63d1-4971-8484-30c16e13de5b', + col_date=base_ts.date(), + # Add idx minutes to base timestamp + col_ts=base_ts + datetime.timedelta(minutes=idx), + col_geohash='dfvgsj2vptwu', + col_long256='0xa3b400fcf6ed707d710d5d4e672305203ed3cc6254d1cefe313e4a465861f42a', + col_varchar='pineapple' + ) for idx in range(num_rows) + ] + session.bulk_save_objects(models) + session.commit() + + metadata = sqla.MetaData() + table = sqla.Table(ALL_TYPES_TABLE_NAME, metadata, autoload_with=test_engine) + wait_until_table_is_ready(test_engine, ALL_TYPES_TABLE_NAME, num_rows) + + with test_engine.connect() as conn: + # Simple SAMPLE BY + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(30, 'm') # 30 minute samples + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 4 # 2 hours should give us 4 30-minute samples + + # Verify sample averages + # First 30 min should average 0-29, second 30-59, etc. + expected_averages = [14.5, 44.5, 74.5, 104.5] # (min+max)/2 for each 30-min period + for row, expected_avg in zip(rows, expected_averages): + assert abs(row.avg_int - expected_avg) < 0.1 + + # SAMPLE BY with ORDER BY + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(1, 'h') # 1 hour samples + .order_by(sqla.desc('avg_int')) + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 2 # 2 one-hour samples + assert rows[0].avg_int > rows[1].avg_int # Descending order + + # SAMPLE BY with WHERE clause + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .where(table.c.col_int > 30) + .sample_by(1, 'h') + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 2 + assert all(row.avg_int > 30 for row in rows) + + # SAMPLE BY with LIMIT + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(15, 'm') # 15 minute samples + .limit(3) + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 3 # Should limit to first 3 samples + + finally: + if session: + session.close() + def test_insert(test_engine, test_model): with test_engine.connect() as conn: assert test_engine.dialect.has_table(conn, ALL_TYPES_TABLE_NAME) From c55ee58e5d5d7ef77a43982c59af6b76339fcc15 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 11:44:27 +0100 Subject: [PATCH 05/12] test with GROUP BY --- tests/test_dialect.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test_dialect.py b/tests/test_dialect.py index cf4f6a0..bc064a8 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -97,6 +97,31 @@ def test_sample_by_clause(test_engine, test_model): rows = result.fetchall() assert len(rows) == 3 # Should limit to first 3 samples + # SAMPLE BY with GROUP BY, ORDER BY and LIMIT + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(30, 'm') # 15 minute samples + .group_by(table.c.col_int) + .order_by(table.c.col_int.desc()) + .limit(3) + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 3 # Should limit to first 3 samples + # expected results: + # 2023-04-12T01:30:00.000000Z, 119 + # 2023-04-12T01:30:00.000000Z, 118 + # 2023-04-12T01:30:00.000000Z, 117 + assert rows[0].col_ts == datetime.datetime(2023, 4, 12, 1, 30) + assert rows[0].avg_int == 119 + + assert rows[1].col_ts == datetime.datetime(2023, 4, 12, 1, 30) + assert rows[1].avg_int == 118 + + assert rows[2].col_ts == datetime.datetime(2023, 4, 12, 1, 30) + assert rows[2].avg_int == 117 + + finally: if session: session.close() From 879e8115bced8429efaffbd347650f020283f70b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 11:54:54 +0100 Subject: [PATCH 06/12] test SAMPLE BY with subqueries --- tests/test_dialect.py | 85 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 2 deletions(-) diff --git a/tests/test_dialect.py b/tests/test_dialect.py index bc064a8..35f2ba2 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -1,5 +1,7 @@ import datetime +from sqlglot import subquery + import questdb_connect import questdb_connect as qdbc import sqlalchemy as sqla @@ -14,6 +16,87 @@ ) +def test_sample_by_in_subquery(test_engine, test_model): + """Test SAMPLE BY usage within subqueries.""" + base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) + session = Session(test_engine) + try: + # Insert test data - one row every minute for 2 hours + num_rows = 120 # 2 hours * 60 minutes + models = [ + test_model( + col_boolean=True, + col_byte=8, + col_short=12, + col_int=idx, + col_long=14, + col_float=15.234, + col_double=16.88993244, + col_symbol='coconut', + col_string='banana', + col_char='C', + col_uuid='6d5eb038-63d1-4971-8484-30c16e13de5b', + col_date=base_ts.date(), + col_ts=base_ts + datetime.timedelta(minutes=idx), + col_geohash='dfvgsj2vptwu', + col_long256='0xa3b400fcf6ed707d710d5d4e672305203ed3cc6254d1cefe313e4a465861f42a', + col_varchar='pineapple' + ) for idx in range(num_rows) + ] + session.bulk_save_objects(models) + session.commit() + + metadata = sqla.MetaData() + table = sqla.Table(ALL_TYPES_TABLE_NAME, metadata, autoload_with=test_engine) + wait_until_table_is_ready(test_engine, ALL_TYPES_TABLE_NAME, num_rows) + + with test_engine.connect() as conn: + # Subquery with SAMPLE BY + subq = ( + questdb_connect.select( + table.c.col_ts, + sqla.func.avg(table.c.col_int).label('avg_int') + ) + .sample_by(30, 'm') # 30 minute samples in subquery + .subquery() + ) + + # Main query selecting from subquery with extra conditions + query = ( + questdb_connect.select( + subq.c.col_ts, + subq.c.avg_int + ) + .where(subq.c.avg_int > 30) + .order_by(subq.c.col_ts) + ) + + result = conn.execute(query) + rows = result.fetchall() + + # Should only get samples from second half of the data + # where averages are > 30 + assert len(rows) == 3 # expecting 2 30-min samples > 30 + assert all(row.avg_int > 30 for row in rows) + assert rows[0].avg_int < rows[1].avg_int # ordered by timestamp + + # Test nested aggregation + outer_query = ( + questdb_connect.select( + sqla.func.sum(subq.c.avg_int).label('total_avg') + ) + .select_from(subq) + ) + + result = conn.execute(outer_query) + row = result.fetchone() + # Sum of all 30-min sample averages + assert row.total_avg == 238; + + finally: + if session: + session.close() + def test_sample_by_clause(test_engine, test_model): """Test SAMPLE BY clause functionality.""" base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) @@ -120,8 +203,6 @@ def test_sample_by_clause(test_engine, test_model): assert rows[2].col_ts == datetime.datetime(2023, 4, 12, 1, 30) assert rows[2].avg_int == 117 - - finally: if session: session.close() From 0e6e8c7cafcab23c6d13bc6c90d885e5a68706fe Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 12:21:09 +0100 Subject: [PATCH 07/12] SAMPLE BY options --- src/questdb_connect/compilers.py | 28 +++++++++- src/questdb_connect/dml.py | 49 +++++++++-------- tests/test_dialect.py | 91 ++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 25 deletions(-) diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index 0102726..ef002ac 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -36,9 +36,33 @@ class QDBSQLCompiler(sqlalchemy.sql.compiler.SQLCompiler, abc.ABC): def visit_sample_by(self, sample_by, **kw): """Compile a SAMPLE BY clause.""" + text = "" + + # Basic SAMPLE BY if sample_by.unit: - return f"SAMPLE BY {sample_by.value}{sample_by.unit}" - return f"SAMPLE BY {sample_by.value}" + text = f"SAMPLE BY {sample_by.value}{sample_by.unit}" + else: + text = f"SAMPLE BY {sample_by.value}" + + # Add FILL if specified + if sample_by.fill is not None: + if isinstance(sample_by.fill, str): + text += f" FILL({sample_by.fill})" + else: + text += f" FILL({sample_by.fill:g})" + + # Add ALIGN TO clause + text += f" ALIGN TO {sample_by.align_to}" + + # Add TIME ZONE if specified + if sample_by.timezone: + text += f" TIME ZONE '{sample_by.timezone}'" + + # Add WITH OFFSET if specified + if sample_by.offset: + text += f" WITH OFFSET '{sample_by.offset}'" + + return text def group_by_clause(self, select, **kw): """Customize GROUP BY to also render SAMPLE BY.""" diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py index b64b1a3..000652e 100644 --- a/src/questdb_connect/dml.py +++ b/src/questdb_connect/dml.py @@ -19,10 +19,18 @@ class SampleByClause(ClauseElement): def __init__( self, value: Union[int, float], - unit: Optional[str] = None + unit: Optional[str] = None, + fill: Optional[Union[str, float]] = None, + align_to: str = "CALENDAR", # default per docs + timezone: Optional[str] = None, + offset: Optional[str] = None, ): self.value = value self.unit = unit.lower() if unit else None + self.fill = fill + self.align_to = align_to.upper() + self.timezone = timezone + self.offset = offset def __str__(self) -> str: if self.unit: @@ -54,35 +62,30 @@ def get_children(self, **kwargs: Any) -> Sequence[Visitable]: def sample_by( self, value: Union[int, float], - unit: Optional[str] = None + unit: Optional[str] = None, + fill: Optional[Union[str, float]] = None, + align_to: str = "CALENDAR", + timezone: Optional[str] = None, + offset: Optional[str] = None, ) -> QDBSelect: - """Add a SAMPLE BY clause to the select statement. - - The SAMPLE BY clause allows time-based sampling of data. - - :param value: - For time-based sampling: the time interval - - - :param unit: - Time unit for sampling: - - 's': seconds - - 'm': minutes - - 'h': hours - - 'd': days - - Example time-based sampling:: - - select([table.c.value]).sample_by(1, 'h') # sample every hour - select([table.c.value]).sample_by(30, 'm') # sample every 30 minutes - + """Add a SAMPLE BY clause. + + :param value: time interval value + :param unit: 's' for seconds, 'm' for minutes, 'h' for hours, etc. + :param fill: fill strategy - NONE, NULL, PREV, LINEAR, or constant value + :param align_to: CALENDAR or FIRST OBSERVATION + :param timezone: Optional timezone for calendar alignment + :param offset: Optional offset in format '+/-HH:mm' """ + # Create a copy of our object with _generative s = self.__class__.__new__(self.__class__) s.__dict__ = self.__dict__.copy() # Set the sample by clause - s._sample_by_clause = SampleByClause(value, unit) + s._sample_by_clause = SampleByClause( + value, unit, fill, align_to, timezone, offset + ) return s diff --git a/tests/test_dialect.py b/tests/test_dialect.py index 35f2ba2..388a11d 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -416,6 +416,97 @@ def test_bulk_insert(test_engine, test_model): assert collect_select_all_raw_connection(test_engine, expected_rows=num_rows) == expected +def test_sample_by_options(test_engine, test_model): + """Test SAMPLE BY with ALIGN TO and FILL options.""" + base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) + session = Session(test_engine) + try: + # Insert test data - one row every hour for a day + num_rows = 24 + models = [ + test_model( + col_int=idx, + col_ts=base_ts + datetime.timedelta(hours=idx), + ) for idx in range(num_rows) + ] + # Add some gaps by removing every 3rd record + models = [m for i, m in enumerate(models) if i % 3 != 0] + + session.bulk_save_objects(models) + session.commit() + + metadata = sqla.MetaData() + table = sqla.Table(ALL_TYPES_TABLE_NAME, metadata, autoload_with=test_engine) + wait_until_table_is_ready(test_engine, ALL_TYPES_TABLE_NAME, len(models)) + + with test_engine.connect() as conn: + # Test FILL(NULL) + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(15, 'm', fill="NULL") + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 89 + # Should have NULLs for missing data points + assert any(row.avg_int is None for row in rows) + + + # Test FILL(PREV) + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(15, 'm', fill="PREV") + ) + result = conn.execute(query) + rows = result.fetchall() + assert all(row.avg_int is not None for row in rows) + + # Test FILL with constant + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(15, 'm', fill=999.99) + .limit(10) + ) + result = conn.execute(query) + rows = result.fetchall() + assert any(row.avg_int == 999.99 for row in rows) + + # Test ALIGN TO FIRST OBSERVATION + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(15, 'm', align_to="FIRST OBSERVATION") + .limit(10) + ) + result = conn.execute(query) + first_row = result.fetchone() + # First timestamp should match our first data point + assert first_row.col_ts == models[0].col_ts + + # Test with timezone + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(1, 'd', align_to="CALENDAR", timezone="Europe/Prague") + ) + result = conn.execute(query) + rows = result.fetchall() + # First row should be at midnight Prague time, that is 22:00 UTC the previous day + assert rows[0].col_ts.hour == 22 + assert rows[1].col_ts.hour == 22 + + # Test with offset + query = ( + questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) + .sample_by(1, 'd', align_to="CALENDAR", offset="02:00") + ) + result = conn.execute(query) + rows = result.fetchall() + # First row should start at 02:00 + assert rows[0].col_ts.hour == 2 + + finally: + if session: + session.close() + def test_dialect_get_schema_names(test_engine): dialect = qdbc.QuestDBDialect() with test_engine.connect() as conn: From 9044847d0a7544f555e50e17fe5697e0294986e7 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 12:37:39 +0100 Subject: [PATCH 08/12] implement FROM-TO --- src/questdb_connect/compilers.py | 6 ++ src/questdb_connect/dml.py | 9 ++- tests/test_dialect.py | 102 +++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index ef002ac..c428163 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -44,6 +44,12 @@ def visit_sample_by(self, sample_by, **kw): else: text = f"SAMPLE BY {sample_by.value}" + if sample_by.from_timestamp: + # Format datetime to ISO format that QuestDB expects + text += f" FROM '{sample_by.from_timestamp.isoformat()}'" + if sample_by.to_timestamp: + text += f" TO '{sample_by.to_timestamp.isoformat()}'" + # Add FILL if specified if sample_by.fill is not None: if isinstance(sample_by.fill, str): diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py index 000652e..9a54207 100644 --- a/src/questdb_connect/dml.py +++ b/src/questdb_connect/dml.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, date from typing import TYPE_CHECKING, Any, Optional, Sequence, Union from sqlalchemy import select as sa_select @@ -24,6 +25,8 @@ def __init__( align_to: str = "CALENDAR", # default per docs timezone: Optional[str] = None, offset: Optional[str] = None, + from_timestamp: Optional[Union[datetime, date]] = None, + to_timestamp: Optional[Union[datetime, date]] = None ): self.value = value self.unit = unit.lower() if unit else None @@ -31,6 +34,8 @@ def __init__( self.align_to = align_to.upper() self.timezone = timezone self.offset = offset + self.from_timestamp = from_timestamp + self.to_timestamp = to_timestamp def __str__(self) -> str: if self.unit: @@ -67,6 +72,8 @@ def sample_by( align_to: str = "CALENDAR", timezone: Optional[str] = None, offset: Optional[str] = None, + from_timestamp: Optional[Union[datetime, date]] = None, + to_timestamp: Optional[Union[datetime, date]] = None, ) -> QDBSelect: """Add a SAMPLE BY clause. @@ -84,7 +91,7 @@ def sample_by( # Set the sample by clause s._sample_by_clause = SampleByClause( - value, unit, fill, align_to, timezone, offset + value, unit, fill, align_to, timezone, offset, from_timestamp, to_timestamp ) return s diff --git a/tests/test_dialect.py b/tests/test_dialect.py index 388a11d..b68219e 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -416,6 +416,108 @@ def test_bulk_insert(test_engine, test_model): assert collect_select_all_raw_connection(test_engine, expected_rows=num_rows) == expected +def test_sample_by_from_to(test_engine, test_model): + """Test SAMPLE BY with FROM-TO extension.""" + base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) + day_before = base_ts - datetime.timedelta(days=1) + day_after = base_ts + datetime.timedelta(days=1) + session = Session(test_engine) + try: + num_rows = 6 # 6 hours only + models = [ + test_model( + col_int=idx, + col_ts=base_ts + datetime.timedelta(hours=idx), + ) for idx in range(num_rows) + ] + + session.bulk_save_objects(models) + session.commit() + + metadata = sqla.MetaData() + table = sqla.Table(ALL_TYPES_TABLE_NAME, metadata, autoload_with=test_engine) + wait_until_table_is_ready(test_engine, ALL_TYPES_TABLE_NAME, len(models)) + + with test_engine.connect() as conn: + # Test FROM-TO with FILL + query = ( + questdb_connect.select( + table.c.col_ts, + sqla.func.avg(table.c.col_int).label('avg_int') + ) + .sample_by( + 1, 'h', + fill="NULL", + from_timestamp=day_before, # day before data starts + to_timestamp=day_after # day after data ends + ) + ) + result = conn.execute(query) + rows = result.fetchall() + + assert len(rows) == 48 # 48 hours in total + + # First rows should be NULL (before our data starts) + assert rows[0].avg_int is None + assert rows[1].avg_int is None + assert rows[2].avg_int is None + assert rows[3].avg_int is None + + # Middle rows should have data + assert any(row.avg_int is not None for row in rows[4:-4]) + + # Last rows should be NULL (after our data ends) + assert rows[-4].avg_int is None + assert rows[-3].avg_int is None + assert rows[-2].avg_int is None + assert rows[-1].avg_int is None + + # Test FROM only + query = ( + questdb_connect.select( + table.c.col_ts, + sqla.func.avg(table.c.col_int).label('avg_int') + ) + .sample_by( + 1, 'h', + fill="NULL", + from_timestamp=day_before # day before data starts + ) + ) + result = conn.execute(query) + rows = result.fetchall() + + # First rows should be NULL + assert rows[0].avg_int is None + assert rows[1].avg_int is None + assert rows[2].avg_int is None + assert rows[3].avg_int is None + + # Test TO only + query = ( + questdb_connect.select( + table.c.col_ts, + sqla.func.avg(table.c.col_int).label('avg_int') + ) + .sample_by( + 1, 'h', + fill="NULL", + to_timestamp=day_after # day after data ends + ) + ) + result = conn.execute(query) + rows = result.fetchall() + + # Last rows should be NULL + assert rows[-4].avg_int is None + assert rows[-3].avg_int is None + assert rows[-2].avg_int is None + assert rows[-1].avg_int is None + + finally: + if session: + session.close() + def test_sample_by_options(test_engine, test_model): """Test SAMPLE BY with ALIGN TO and FILL options.""" base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) From 9edd4616e7f2f54fa267c9f770fb3a2e13a6e8aa Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 12:43:03 +0100 Subject: [PATCH 09/12] forgotten doc --- src/questdb_connect/dml.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py index 9a54207..6dfe660 100644 --- a/src/questdb_connect/dml.py +++ b/src/questdb_connect/dml.py @@ -83,6 +83,8 @@ def sample_by( :param align_to: CALENDAR or FIRST OBSERVATION :param timezone: Optional timezone for calendar alignment :param offset: Optional offset in format '+/-HH:mm' + :param from_timestamp: Optional start timestamp for the sample + :param to_timestamp: Optional end timestamp for the sample """ # Create a copy of our object with _generative From 81b99c1d0c81c8397dc3c949841fb82a36d3d3d2 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 14:07:15 +0100 Subject: [PATCH 10/12] styles --- src/questdb_connect/dml.py | 3 ++- tests/test_dialect.py | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/questdb_connect/dml.py b/src/questdb_connect/dml.py index 6dfe660..506bb00 100644 --- a/src/questdb_connect/dml.py +++ b/src/questdb_connect/dml.py @@ -1,6 +1,5 @@ from __future__ import annotations -from datetime import datetime, date from typing import TYPE_CHECKING, Any, Optional, Sequence, Union from sqlalchemy import select as sa_select @@ -8,6 +7,8 @@ from sqlalchemy.sql import Select as StandardSelect if TYPE_CHECKING: + from datetime import date, datetime + from sqlalchemy.sql.visitors import Visitable diff --git a/tests/test_dialect.py b/tests/test_dialect.py index b68219e..c7a6183 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -1,7 +1,5 @@ import datetime -from sqlglot import subquery - import questdb_connect import questdb_connect as qdbc import sqlalchemy as sqla From 3f85db118fef0fe8c8958747613eb577facf0d51 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 14 Jan 2025 15:41:30 +0100 Subject: [PATCH 11/12] fix core api with plain old select --- src/questdb_connect/compilers.py | 7 +++++-- tests/test_dialect.py | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/questdb_connect/compilers.py b/src/questdb_connect/compilers.py index c428163..7d5de56 100644 --- a/src/questdb_connect/compilers.py +++ b/src/questdb_connect/compilers.py @@ -75,7 +75,7 @@ def group_by_clause(self, select, **kw): text = "" # Add SAMPLE BY first if present - if select._sample_by_clause is not None: + if _has_sample_by(select): text += " " + self.process(select._sample_by_clause, **kw) # Use parent's GROUP BY implementation @@ -91,7 +91,7 @@ def visit_select(self, select, **kw): # If we have SAMPLE BY but no GROUP BY, # add a dummy GROUP BY clause to trigger the rendering if ( - select._sample_by_clause is not None + _has_sample_by(select) and not select._group_by_clauses ): select = select._clone() @@ -136,3 +136,6 @@ def limit_clause(self, select, **kw): text += f"{self.process(offset, **kw)},{self.BIGINT_MAX}" return text + +def _has_sample_by(select): + return hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None \ No newline at end of file diff --git a/tests/test_dialect.py b/tests/test_dialect.py index c7a6183..aaf9ce5 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -516,6 +516,40 @@ def test_sample_by_from_to(test_engine, test_model): if session: session.close() +def test_plain_select_core_api(test_engine, test_model): + """ + Test plain select with core API. Plain select means select implementation from sqlalchemy.sql.selectable, + not from questdb_connect. + """ + + session = Session(test_engine) + try: + num_rows = 3 + models = [ + test_model( + col_int=idx, + col_ts=datetime.datetime(2023, 4, 12, 0, 0, 0) + datetime.timedelta(hours=idx), + ) for idx in range(num_rows) + ] + session.bulk_save_objects(models) + session.commit() + + metadata = sqla.MetaData() + table = sqla.Table(ALL_TYPES_TABLE_NAME, metadata, autoload_with=test_engine) + wait_until_table_is_ready(test_engine, ALL_TYPES_TABLE_NAME, len(models)) + + with test_engine.connect() as conn: + query = ( + # important: use sqla.select, not questdb_connect.select! + sqla.select(table.c.col_ts, table.c.col_int) + ) + result = conn.execute(query) + rows = result.fetchall() + assert len(rows) == 3 + finally: + if session: + session.close() + def test_sample_by_options(test_engine, test_model): """Test SAMPLE BY with ALIGN TO and FILL options.""" base_ts = datetime.datetime(2023, 4, 12, 0, 0, 0) From 26b266b20affcdca5bce5cf2600ec63fe60b8f92 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 12 May 2025 14:28:58 +0200 Subject: [PATCH 12/12] cannot combine GROUP BY and SAMPLE BY --- tests/test_dialect.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/test_dialect.py b/tests/test_dialect.py index aaf9ce5..d1df8a4 100644 --- a/tests/test_dialect.py +++ b/tests/test_dialect.py @@ -177,30 +177,6 @@ def test_sample_by_clause(test_engine, test_model): result = conn.execute(query) rows = result.fetchall() assert len(rows) == 3 # Should limit to first 3 samples - - # SAMPLE BY with GROUP BY, ORDER BY and LIMIT - query = ( - questdb_connect.select(table.c.col_ts, sqla.func.avg(table.c.col_int).label('avg_int')) - .sample_by(30, 'm') # 15 minute samples - .group_by(table.c.col_int) - .order_by(table.c.col_int.desc()) - .limit(3) - ) - result = conn.execute(query) - rows = result.fetchall() - assert len(rows) == 3 # Should limit to first 3 samples - # expected results: - # 2023-04-12T01:30:00.000000Z, 119 - # 2023-04-12T01:30:00.000000Z, 118 - # 2023-04-12T01:30:00.000000Z, 117 - assert rows[0].col_ts == datetime.datetime(2023, 4, 12, 1, 30) - assert rows[0].avg_int == 119 - - assert rows[1].col_ts == datetime.datetime(2023, 4, 12, 1, 30) - assert rows[1].avg_int == 118 - - assert rows[2].col_ts == datetime.datetime(2023, 4, 12, 1, 30) - assert rows[2].avg_int == 117 finally: if session: session.close()