Skip to content

Commit a582a07

Browse files
committed
Added compression policies
1 parent 240deae commit a582a07

18 files changed

Lines changed: 443 additions & 232 deletions

src/timescaledb/compression/add.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,44 @@
44
from sqlmodel import Session, SQLModel
55

66
from timescaledb.compression import sql_statements as sql
7+
from timescaledb.compression.validators import (
8+
validate_compress_orderby_field,
9+
validate_compress_segmentby_field,
10+
validate_unique_segmentby_and_orderby_fields,
11+
)
712

813

9-
def add_compression_policy(session: Session, model: Type[SQLModel]) -> None:
14+
def add_compression_policy(
15+
session: Session, model: Type[SQLModel], commit: bool = True
16+
) -> None:
1017
"""
1118
Enable compression for a hypertable
1219
"""
13-
enable_compression = model.__enable_compression__
14-
if not enable_compression:
20+
enable_compression = getattr(model, "__enable_compression__", False)
21+
enable_compression_bool = str(enable_compression).lower() == "true"
22+
if not enable_compression_bool:
1523
return
24+
compress_orderby = getattr(model, "__compress_orderby__", None)
25+
valid_orderby = validate_compress_orderby_field(model, compress_orderby)
26+
compress_segmentby = getattr(model, "__compress_segmentby__", None)
27+
valid_segmentby = validate_compress_segmentby_field(model, compress_segmentby)
28+
validate_unique_segmentby_and_orderby_fields(
29+
model, compress_segmentby, compress_orderby
30+
)
31+
params = {}
32+
has_orderby = valid_orderby and compress_orderby is not None
33+
has_segmentby = valid_segmentby and compress_segmentby is not None
34+
if has_orderby:
35+
params["compress_orderby"] = compress_orderby
36+
if has_segmentby:
37+
params["compress_segmentby"] = compress_segmentby
1638

17-
# Build compression parameters dict with optional parameters
18-
compression_params = {
19-
"table_name": model.__tablename__,
20-
"compress": model.__enable_compression__,
21-
"compress_orderby": getattr(model, "__compress_orderby__", None),
22-
"compress_segmentby": getattr(model, "__compress_segmentby__", None),
23-
"compress_chunk_time_interval": getattr(
24-
model, "__compress_chunk_time_interval__", None
25-
),
26-
}
27-
# Use the SQL statement from sql_statements.py
28-
query = sqlalchemy.text(sql.ALTER_COMPRESSION_POLICY_SQL).bindparams(
29-
**compression_params
39+
sql_template = sql.get_alter_compression_policy_sql(
40+
model.__tablename__, with_orderby=has_orderby, with_segmentby=has_segmentby
3041
)
42+
43+
query = sqlalchemy.text(sql_template).bindparams(**params)
3144
compiled_query = str(query.compile(compile_kwargs={"literal_binds": True}))
3245
session.execute(sqlalchemy.text(compiled_query))
33-
session.commit()
46+
if commit:
47+
session.commit()
Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,25 @@
1-
ALTER_COMPRESSION_POLICY_SQL = """
2-
ALTER TABLE :table_name SET (
3-
timescaledb.compress = :compress,
4-
timescaledb.compress_orderby = NULLIF(:compress_orderby, NULL),
5-
timescaledb.compress_segmentby = NULLIF(:compress_segmentby, NULL),
6-
timescaledb.compress_chunk_time_interval = NULLIF(:compress_chunk_time_interval, NULL)
1+
from sqlalchemy.sql import quoted_name
2+
3+
4+
def get_alter_compression_policy_sql(
5+
table_name: str, with_orderby: bool = True, with_segmentby: bool = True
6+
):
7+
safe_table_name = quoted_name(table_name, True)
8+
9+
clauses = []
10+
if with_orderby:
11+
clauses.append("timescaledb.compress_orderby = :compress_orderby")
12+
if with_segmentby:
13+
clauses.append("timescaledb.compress_segmentby = :compress_segmentby")
14+
15+
compress_clause = "timescaledb.compress"
16+
if len(clauses) > 0:
17+
compress_clause = "timescaledb.compress,"
18+
# Create the SQL with the safely quoted table name
19+
sql = f"""
20+
ALTER TABLE {safe_table_name} SET (
21+
{compress_clause}
22+
{", ".join(clauses)}
723
);
8-
"""
24+
"""
25+
return sql

src/timescaledb/compression/sync.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ def sync_compression_policies(session: Session, *models: Type[SQLModel]) -> None
1919
if getattr(model, "__table__", None) is not None
2020
]
2121
for model in model_list:
22-
add_compression_policy(session, model)
2322
compress_enabled = model.__enable_compression__
2423
if not compress_enabled:
2524
continue
25+
add_compression_policy(session, model, commit=False)
26+
session.commit()
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
from datetime import timedelta
2+
from typing import Type
3+
4+
import sqlalchemy
5+
from sqlmodel import SQLModel
6+
from sqlmodel.sql.sqltypes import AutoString
7+
8+
from timescaledb import exceptions
9+
10+
11+
def validate_compress_segmentby_field(
12+
model: Type[SQLModel], segmentby_field: str = None
13+
) -> bool:
14+
"""
15+
Verify if the specified field is a valid segmentby field.
16+
Valid types include String, Integer, Boolean, and other scalar types.
17+
Arrays and JSON types are not supported for segmentby.
18+
19+
Column list on which to key the compressed segments.
20+
An identifier representing the source of the data such as device_id or tags_id is usually a good candidate.
21+
The default is no segment by columns.
22+
"""
23+
if segmentby_field is None:
24+
return True
25+
column = model.__table__.columns.get(segmentby_field)
26+
if column is None:
27+
raise exceptions.InvalidSegmentByField(
28+
f"Field '{segmentby_field}' not found in model {model.__name__}"
29+
)
30+
31+
# Types that are valid for segmentby
32+
valid_types = (
33+
AutoString,
34+
sqlalchemy.String,
35+
sqlalchemy.Integer,
36+
sqlalchemy.SmallInteger,
37+
sqlalchemy.BigInteger,
38+
sqlalchemy.Boolean,
39+
sqlalchemy.Date,
40+
sqlalchemy.DateTime,
41+
sqlalchemy.Enum,
42+
sqlalchemy.Float,
43+
sqlalchemy.Numeric,
44+
)
45+
46+
column_type = type(column.type)
47+
if not issubclass(column_type, valid_types):
48+
raise exceptions.InvalidSegmentByField(
49+
f"Field '{segmentby_field}' in model {model.__name__} has invalid type {column_type.__name__}. "
50+
f"Must be one of: {', '.join(t.__name__ for t in valid_types)}"
51+
)
52+
53+
return True
54+
55+
56+
def validate_compress_orderby_field(
57+
model: Type[SQLModel], orderby_field: str = None
58+
) -> bool:
59+
"""
60+
Order used by compression, specified in the same way as the ORDER BY clause in a SELECT query.
61+
The default is the descending order of the hypertable's time column.
62+
63+
orderby_field: format is '<column_name> [ASC | DESC] [ NULLS { FIRST | LAST } ] [, ...]',
64+
"""
65+
if orderby_field is None:
66+
return True
67+
# Split on commas to handle multiple orderby fields
68+
for field_spec in orderby_field.split(","):
69+
field_spec = field_spec.strip()
70+
orderby_parts = field_spec.split()
71+
72+
if not orderby_parts:
73+
raise exceptions.InvalidOrderByField("Empty orderby field specification")
74+
75+
orderby_column_name = orderby_parts[0]
76+
column = model.__table__.columns.get(orderby_column_name)
77+
if column is None:
78+
raise exceptions.InvalidOrderByField(
79+
f"Field '{orderby_column_name}' not found in model {model.__name__}"
80+
)
81+
82+
# Types that are not valid for orderby
83+
invalid_types = (
84+
sqlalchemy.JSON,
85+
sqlalchemy.ARRAY,
86+
sqlalchemy.PickleType,
87+
)
88+
89+
column_type = type(column.type)
90+
if issubclass(column_type, invalid_types):
91+
raise exceptions.InvalidOrderByField(
92+
f"Field '{orderby_column_name}' in model {model.__name__} has invalid type {column_type.__name__}. "
93+
"JSON, ARRAY, and PickleType are not supported for orderby fields"
94+
)
95+
96+
# Validate direction if specified
97+
if len(orderby_parts) > 1:
98+
direction = orderby_parts[1].upper()
99+
if direction not in ("ASC", "DESC"):
100+
raise exceptions.InvalidOrderByField(
101+
f"Invalid direction '{direction}' in orderby field '{field_spec}'. "
102+
"Must be one of: ASC, DESC"
103+
)
104+
105+
# Validate NULLS FIRST/LAST if specified
106+
if len(orderby_parts) > 2:
107+
if len(orderby_parts) < 4:
108+
raise exceptions.InvalidOrderByField(
109+
f"Invalid NULLS specification in '{field_spec}'. "
110+
"Must be 'NULLS FIRST' or 'NULLS LAST'"
111+
)
112+
nulls_keyword = orderby_parts[2].upper()
113+
nulls_position = orderby_parts[3].upper()
114+
if nulls_keyword != "NULLS" or nulls_position not in ("FIRST", "LAST"):
115+
raise exceptions.InvalidOrderByField(
116+
f"Invalid NULLS specification in '{field_spec}'. "
117+
"Must be 'NULLS FIRST' or 'NULLS LAST'"
118+
)
119+
120+
return True
121+
122+
123+
def validate_unique_segmentby_and_orderby_fields(
124+
model: Type[SQLModel], segmentby_field: str = None, orderby_field: str = None
125+
) -> bool:
126+
"""
127+
Validate that the segmentby and orderby fields are unique.
128+
"""
129+
if segmentby_field is None or orderby_field is None:
130+
return True
131+
orderby_fields = orderby_field.split(" ")
132+
orderby_column = orderby_fields[0]
133+
if orderby_column == segmentby_field:
134+
raise exceptions.InvalidCompressionFields(
135+
"Segmentby and orderby fields must be different"
136+
)
137+
return True

src/timescaledb/defaults.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
TIME_COLUMN = "time"
2-
CHUNK_TIME_INTERVAL = "7 days"
2+
CHUNK_TIME_INTERVAL = "INTERVAL 7 days"
33
COMPRESS_SEGMENTBY = "identifier"
4-
COMPRESS_AFTER = "7 days"
5-
COMPRESS_ORDERBY = "created_at DESC"
6-
DROP_AFTER = "3 months"
4+
COMPRESS_AFTER = "INTERVAL 7 days"
5+
COMPRESS_ORDERBY = "time DESC"
6+
DROP_AFTER = "INTERVAL 3 months"

src/timescaledb/exceptions.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class InvalidTimeColumnType(Exception):
1414
pass
1515

1616

17-
class InvalidTimeInterval(Exception):
17+
class InvalidChunkTimeInterval(Exception):
1818
"""
19-
Exception raised when the time interval is invalid
19+
Exception raised when the chunk time interval is invalid
2020
"""
2121

2222
pass
@@ -44,3 +44,9 @@ class InvalidOrderByField(Exception):
4444
"""
4545

4646
pass
47+
48+
49+
class InvalidCompressionFields(Exception):
50+
"""
51+
Exception raised when the compression fields are invalid
52+
"""

src/timescaledb/hypertables/create.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
from typing import Type
23

34
import sqlalchemy
@@ -6,6 +7,11 @@
67
from timescaledb.hypertables import sql_statements as sql
78
from timescaledb.hypertables import validators
89

10+
HYPERTABLE_INTERVAL_TYPE_SQL = {
11+
"INTERVAL": sql.CREATE_HYPERTABLE_SQL_VIA_INTERVAL,
12+
"TIMESTAMP": sql.CREATE_HYPERTABLE_SQL_VIA_TIMESTAMP,
13+
}
14+
915

1016
def create_hypertable(
1117
session: Session,
@@ -17,20 +23,42 @@ def create_hypertable(
1723
"""
1824
Create a hypertable from a SQLModel class
1925
"""
20-
time_field = model.__time_column__
21-
validators.validate_time_column(model, time_field)
22-
time_interval = model.__chunk_time_interval__
23-
validators.validate_time_interval(model, time_interval)
26+
time_column = getattr(model, "__time_column__", None)
27+
validators.validate_time_column(model, time_column)
28+
interval = getattr(model, "__chunk_time_interval__", None)
29+
validators.validate_chunk_time_interval(model, time_column, interval)
30+
sql_template = None
31+
cleaned_interval = None
32+
if isinstance(interval, timedelta):
33+
# Convert to microseconds
34+
cleaned_interval = int(interval.total_seconds())
35+
sql_template = HYPERTABLE_INTERVAL_TYPE_SQL["TIMESTAMP"]
36+
elif isinstance(interval, int):
37+
# Microseconds
38+
cleaned_interval = interval
39+
sql_template = HYPERTABLE_INTERVAL_TYPE_SQL["TIMESTAMP"]
40+
elif isinstance(interval, str):
41+
# Such as INTERVAL 1 day
42+
# or INTERVAL '2 weeks'
43+
# pop the term "INTERVAL"
44+
cleaned_interval = interval.replace("INTERVAL", "").strip()
45+
# remove any extra quotes
46+
cleaned_interval = cleaned_interval.replace("'", "").replace('"', "")
47+
sql_template = HYPERTABLE_INTERVAL_TYPE_SQL["INTERVAL"]
48+
else:
49+
raise ValueError("Invalid interval type")
50+
if sql_template is None or cleaned_interval is None:
51+
raise ValueError("Invalid interval type")
2452

25-
table_name = model.__tablename__
53+
table_name = getattr(model, "__tablename__", None)
2654
params = {
2755
"table_name": table_name,
28-
"time_field": time_field,
29-
"time_interval": time_interval,
56+
"time_column": time_column,
57+
"chunk_time_interval": cleaned_interval,
3058
"if_not_exists": "true" if if_not_exists else "false",
3159
"migrate_data": "true" if migrate_data else "false",
3260
}
33-
query = sqlalchemy.text(sql.CREATE_HYPERTABLE_SQL).bindparams(**params)
61+
query = sqlalchemy.text(sql_template).bindparams(**params)
3462
compiled_query = str(query.compile(compile_kwargs={"literal_binds": True}))
3563
session.execute(sqlalchemy.text(compiled_query))
3664
if commit:

src/timescaledb/hypertables/list.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ def list_hypertables(session: Session) -> List[HyperTableSchema]:
1717
rows = session.execute(
1818
sqlalchemy.text(sql.LIST_AVAILABLE_HYPERTABLES_SQL)
1919
).fetchall()
20-
return [HyperTableSchema(**dict(row._mapping)) for row in rows]
20+
return [HyperTableSchema(**dict(row._mapping)) for row in rows]
Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
CREATE_HYPERTABLE_SQL = """
1+
CREATE_HYPERTABLE_SQL_VIA_INTERVAL = """
22
SELECT create_hypertable(
33
:table_name,
4-
by_range(:time_field, INTERVAL :time_interval),
4+
by_range(:time_column, INTERVAL :chunk_time_interval),
55
if_not_exists => :if_not_exists,
66
migrate_data => :migrate_data
77
);
88
"""
99

10+
11+
CREATE_HYPERTABLE_SQL_VIA_TIMESTAMP = """
12+
SELECT create_hypertable(
13+
:table_name,
14+
by_range(:time_column, :chunk_time_interval),
15+
if_not_exists => :if_not_exists,
16+
migrate_data => :migrate_data
17+
);
18+
"""
19+
20+
1021
LIST_AVAILABLE_HYPERTABLES_SQL = """
1122
SELECT * FROM timescaledb_information.hypertables;
1223
"""

src/timescaledb/hypertables/sync.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,11 @@ def sync_all_hypertables(session: Session, *models: Type[SQLModel]) -> None:
2525
if getattr(model, "__table__", None) is not None
2626
]
2727
for model in model_list:
28-
create_hypertable(session, model, if_not_exists=True, migrate_data=True)
28+
create_hypertable(
29+
session,
30+
model,
31+
if_not_exists=True,
32+
migrate_data=True,
33+
commit=False,
34+
)
35+
session.commit()

0 commit comments

Comments
 (0)