Add deduplication filters to selective freeze of partitions #301
MikhailBurdukov wants to merge 1 commit into main from
Conversation
Reviewer's Guide

Implements partition-level deduplication-aware freezing for backups by tracking per-part hash and partition_id, filtering which partitions actually need FREEZE, and reusing unchanged parts; also wires these dedup filters through metadata and adds integration coverage using the ClickHouse query_log.

Sequence diagram for selective partition freezing during backup

sequenceDiagram
participant TB as TableBackup
participant Ctx as BackupContext
participant CHCTL as ClickhouseCTL
participant BL as BackupLayout
participant CH as ClickHouseServer
TB->>Ctx: _backup(...)
TB->>Ctx: use ch_ctl
TB->>CHCTL: _filter_partition_for_freeze(tables)
loop per_table
CHCTL->>CH: FILTER_DEDUP_PARTITION_SQL (get_filtered_dedup_partitions)
CH-->>CHCTL: unchanged_partition_ids
CHCTL->>CH: list_partitions(table)
CH-->>CHCTL: all_partition_ids
CHCTL-->>TB: FilteredPartitionsForFreeze
end
TB->>TB: submit _freeze_table(table, partitions_filters, ...)
loop per_table in ThreadExecPool
TB->>TB: _freeze_table(...)
TB->>TB: partitions_filters.unchanged_partitions
loop per_unchanged_partition
TB->>Ctx: deduplicated_parts_by_partition(database, table, partition_id)
Ctx->>CHCTL: get_deduplication_info_by_partition(...)
CHCTL->>CH: GET_DEDUPLICATED_PARTS_BY_PARTITION_SQL
CH-->>CHCTL: dedup_rows
CHCTL-->>Ctx: rows
Ctx->>BL: check_data_part(backup_path, PartMetadata)
BL-->>Ctx: valid_or_not
alt part_invalid
TB->>TB: partitions_filters.add_partition_to_freeze(partition_id)
else part_valid
TB->>TB: collect unchanged_parts
end
end
alt partitions_filters.need_freeze_all_partitions()
TB->>CHCTL: freeze_table(backup_name, table, ...)
CHCTL->>CH: ALTER TABLE FREEZE (full)
CH-->>CHCTL: ok
else partial_freeze
TB->>CHCTL: freeze_partitions(backup_name, table, partitions_to_freeze, threads)
CHCTL->>CH: ALTER TABLE FREEZE PARTITION ... (per partition)
CH-->>CHCTL: ok
end
TB-->>TB: return (table, unchanged_parts)
end
loop per_freeze_result
TB->>Ctx: _backup_freezed_table(..., unchanged_parts, ...)
TB->>Ctx: backup_meta.add_part(part) for part in unchanged_parts
end
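The per-table freeze decision in the diagram above can be sketched as a small helper. This is a hypothetical illustration, not the PR's exact code: the two callables stand in for `BackupContext.deduplicated_parts_by_partition` and `BackupLayout.check_data_part`.

```python
from typing import Any, Callable, Dict, List, Tuple

def freeze_table_sketch(
    unchanged_partitions: List[str],
    dedup_parts_by_partition: Callable[[str], List[Dict[str, Any]]],
    check_data_part: Callable[[Dict[str, Any]], bool],
    all_partitions: List[str],
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Decide which partitions still need FREEZE and which parts can be reused."""
    # Partitions with no matching dedup info must be frozen unconditionally.
    to_freeze = [p for p in all_partitions if p not in unchanged_partitions]
    reused_parts: List[Dict[str, Any]] = []
    for partition_id in unchanged_partitions:
        parts = dedup_parts_by_partition(partition_id)
        if all(check_data_part(part) for part in parts):
            reused_parts.extend(parts)      # reuse parts from a previous backup
        else:
            to_freeze.append(partition_id)  # invalid on storage: freeze it after all
    return to_freeze, reused_parts
```

The caller would then issue a full `ALTER TABLE ... FREEZE` when every partition lands in `to_freeze`, and per-partition `FREEZE PARTITION` statements otherwise, matching the `alt`/`else` branches in the diagram.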
Sequence diagram for populating deduplication filters into backup metadata

sequenceDiagram
participant TB as TableBackup
participant Ctx as BackupContext
participant CHCTL as ClickhouseCTL
participant BM as BackupMetadata
participant TM as TableMetadata
participant CH as ClickHouseServer
TB->>Ctx: deduplicate_parts_in_batch(...)
TB->>TB: _populate_parts_meta_with_dedup_filters(database, table)
TB->>CHCTL: get_active_parts(database, table)
CHCTL->>CH: GET_ACTIVE_PARTS_FOR_TABLE
CH-->>CHCTL: rows[name, hash_of_all_files, partition_id]
CHCTL-->>TB: active_parts
loop for_each_part
TB->>BM: add_part_dedup_filter(database, table, name, hash_of_all_files, partition_id)
BM->>BM: get_table(database, table)
BM-->>TB: TableMetadata
TB->>TM: add_part_dedup_filter(name, partition_id, hash_of_all_files)
alt part_exists_and_not_populated
TM->>TM: set hash_of_all_files, partition_id
else already_has_filters
TM->>TM: skip_update
end
end
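The `TableMetadata.add_part_dedup_filter` step above can be sketched as follows, using the non-empty-hash guard the reviewer proposes in Comment 4 (this is an illustrative standalone function over the raw-metadata dict, not the PR's method):

```python
def add_part_dedup_filter(
    raw_metadata: dict, part_name: str, partition_id: str, hash_of_all_files: str
) -> None:
    """Populate a part's metadata with dedup filters, once."""
    part = raw_metadata["parts"].get(part_name)
    if part is None:
        return  # unknown part: nothing to populate
    if part.get("hash_of_all_files") not in (None, ""):
        return  # filters already populated; keep the existing hash
    part["hash_of_all_files"] = hash_of_all_files
    part["partition_id"] = partition_id
```

With this guard, a missing key (`None`) is treated the same as an empty string, so new parts do get their filters set on first call and are left untouched on later calls.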
ER diagram for deduplication info with partition and hash filters

erDiagram
system_parts {
String database
String table
String name
String hash_of_all_files
String partition_id
UInt8 active
}
deduplication_info {
String database
String table
String name
String backup_path
String checksum
UInt64 size
Array_String files
UInt8 tarball
String disk_name
UInt8 verified
UInt8 encrypted
String hash_of_all_files
String partition_id
}
system_parts ||--o{ deduplication_info : matches_by_database_table_name_and_hash
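The match relationship above drives the partition filter: a partition can skip FREEZE only when every one of its active parts has a `deduplication_info` row with the same name and `hash_of_all_files`. A Python sketch of that anti-join logic (the review notes the SQL's semantics may be inverted; this sketch assumes the intended "unchanged partitions" meaning):

```python
from typing import Any, Dict, List

def filter_unchanged_partitions(
    active_parts: List[Dict[str, Any]], dedup_info: List[Dict[str, Any]]
) -> List[str]:
    """Partition ids whose every active part matches a dedup row by name and hash."""
    dedup_keys = {(r["name"], r["hash_of_all_files"]) for r in dedup_info}
    by_partition: Dict[str, List[Dict[str, Any]]] = {}
    for part in active_parts:
        by_partition.setdefault(part["partition_id"], []).append(part)
    return sorted(
        pid
        for pid, parts in by_partition.items()
        if all((p["name"], p["hash_of_all_files"]) in dedup_keys for p in parts)
    )
```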
Class diagram for deduplication-aware partition freezing and metadata

classDiagram
class ClickhouseCTL {
+get_active_parts(database: str, table: str) List~dict~
+freeze_partitions(backup_name: str, table: Table, partitions: List~str~, freeze_partition_threads: int) None
+freeze_table(backup_name: str, table: Table, parallelize_freeze_in_ch: bool, freeze_partition_threads: int, freeze_table_query_max_threads: int) None
+get_filtered_dedup_partitions(table: Table) List~str~
+get_deduplication_info_by_partition(database: str, table: str, partition_id: str) List~Dict~
}
class FilteredPartitionsForFreeze {
-partitions_to_freeze: Set~str~
-partitions: Set~str~
+__init__(partitions_to_freeze: List~str~, partitions: List~str~)
+need_freeze_all_partitions() bool
+add_partition_to_freeze(partition: str) None
+unchanged_partitions List~str~
}
class TableBackup {
+_filter_partition_for_freeze(context: BackupContext, tables: List~Table~) List~Tuple~
+_backup(context: BackupContext, db: Database, tables_: List~Table~, backup_name: str, schema_only: bool, multiprocessing_config: dict) None
+_freeze_table(context: BackupContext, db: Database, table: Table, partitions_filters: FilteredPartitionsForFreeze, backup_name: str, schema_only: bool, parallelize_freeze_in_ch: bool, freeze_partition_threads: int, freeze_table_query_max_threads: int) Optional~Tuple~
+_backup_freezed_table(context: BackupContext, db: Database, table: Table, unchanged_parts: List~PartMetadata~, backup_name: str, schema_only: bool, change_times: Dict~str,TableMetadataChangeTime~) None
+_populate_parts_meta_with_dedup_filters(context: BackupContext, database: str, table: str) None
}
class PartDedupInfo {
+database: str
+table: str
+name: str
+backup_path: str
+checksum: str
+size: int
+files: List~str~
+tarball: bool
+disk_name: str
+verified: bool
+encrypted: bool
+hash_of_all_files: str
+partition_id: str
+__init__(database: str, table: str, name: str, backup_path: str, checksum: str, size: int, files: List~str~, tarball: bool, disk_name: str, verified: bool, encrypted: bool, hash_of_all_files: str, partition_id: str)
+to_sql() str
}
class RawMetadata {
+checksum: str
+size: int
+files: List~str~
+tarball: bool
+link: str
+disk_name: str
+encrypted: bool
+hash_of_all_files: str
+partition_id: str
+__init__(checksum: str, size: int, files: List~str~, tarball: bool, link: str, disk_name: str, encrypted: bool, hash_of_all_files: str, partition_id: str)
}
class PartMetadata {
+database: str
+table: str
+name: str
-raw_metadata: RawMetadata
+__init__(database: str, table: str, name: str, checksum: str, size: int, files: List~str~, tarball: bool, link: str, disk_name: str, encrypted: bool, hash_of_all_files: str, partition_id: str)
+checksum str
+size int
+files List~str~
+tarball bool
+hash_of_all_files str
+partition_id str
+load(db_name: str, table_name: str, part_name: str, raw_metadata: dict) PartMetadata
}
class BackupMetadata {
+add_part(part: PartMetadata) None
+add_part_dedup_filter(database: str, table: str, name: str, hash_of_all_files: str, partition_id: str) None
}
class TableMetadata {
+raw_metadata: dict
+add_part(part: PartMetadata) None
+add_part_dedup_filter(part_name: str, partition_id: str, hash_of_all_files: str) None
+load(database: str, name: str, raw_metadata: dict) TableMetadata
}
class BackupContext {
+ch_ctl: ClickhouseCTL
+backup_layout: BackupLayout
+backup_meta: BackupMetadata
}
class BackupLayout {
+check_data_part(path: str, part_metadata: PartMetadata) bool
}
ClickhouseCTL --> PartDedupInfo : inserts_into
TableBackup --> ClickhouseCTL : uses
TableBackup --> FilteredPartitionsForFreeze : creates
TableBackup --> PartMetadata : collects
TableBackup --> BackupContext : uses
PartMetadata --> RawMetadata : composes
BackupMetadata --> TableMetadata : manages
BackupContext --> BackupMetadata : has
BackupContext --> ClickhouseCTL : has
BackupContext --> BackupLayout : has
TableMetadata --> PartMetadata : stores
BackupLayout --> PartMetadata : validates
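Of the classes above, `FilteredPartitionsForFreeze` carries the core bookkeeping. A minimal sketch matching its signatures (internals are assumptions; only the public surface comes from the class diagram):

```python
from typing import List, Set

class FilteredPartitionsForFreeze:
    """Tracks which partitions must be re-frozen vs. reused from a previous backup."""

    def __init__(self, partitions_to_freeze: List[str], partitions: List[str]) -> None:
        self._partitions_to_freeze: Set[str] = set(partitions_to_freeze)
        self._partitions: Set[str] = set(partitions)  # all partitions of the table

    @property
    def unchanged_partitions(self) -> List[str]:
        # Partitions not scheduled for freezing are candidates for dedup reuse.
        return sorted(self._partitions - self._partitions_to_freeze)

    def add_partition_to_freeze(self, partition: str) -> None:
        # Called when a deduplicated part turns out to be invalid on storage.
        self._partitions_to_freeze.add(partition)

    def need_freeze_all_partitions(self) -> bool:
        # Full-table FREEZE is cheaper than per-partition FREEZE for every partition.
        return self._partitions_to_freeze >= self._partitions
```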
Hey - I've found 4 issues, and left some high level feedback:
- In `FILTER_DEDUP_PARTITION_SQL` you interpolate `{database}` and `{table}` without `escape()`, unlike other queries; this can break for databases/tables with special characters and is inconsistent with the rest of the code, so consider escaping those placeholders as well.
- The semantics of `get_filtered_dedup_partitions` and `FilteredPartitionsForFreeze` are confusing: the SQL anti-joins against `_deduplication_info` (which appears to produce partitions that *need* freezing), but the method/docstring say "partitions that remains the same" and `_freeze_table` treats `unchanged_partitions` as the complement; please double-check and either adjust the SQL or rename/flip the logic so the naming matches the actual behavior.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `FILTER_DEDUP_PARTITION_SQL` you interpolate `{database}` and `{table}` without `escape()`, unlike other queries; this can break for databases/tables with special characters and is inconsistent with the rest of the code, so consider escaping those placeholders as well.
- The semantics of `get_filtered_dedup_partitions` and `FilteredPartitionsForFreeze` are confusing: the SQL anti-joins against `_deduplication_info` (which appears to produce partitions that *need* freezing), but the method/docstring say "partitions that remains the same" and `_freeze_table` treats `unchanged_partitions` as the complement; please double-check and either adjust the SQL or rename/flip the logic so the naming matches the actual behavior.
## Individual Comments
### Comment 1
<location path="ch_backup/clickhouse/control.py" line_range="372-373" />
<code_context>
+ ),
+ dedup AS (
+ SELECT database, table, name, hash_of_all_files
+ FROM `{sustem_db}`._deduplication_info
+ WHERE database='{database}' AND table = '{table}'
+ )
+ SELECT partition_id
</code_context>
<issue_to_address>
**🚨 issue (security):** Database/table placeholders in FILTER_DEDUP_PARTITION_SQL are not escaped, which can break the query with unusual names and opens the door to injection if inputs are not fully trusted.
This query interpolates `{database}` and `{table}` directly instead of using `escape()`, unlike `GET_ACTIVE_PARTS_FOR_TABLE` and other queries. That makes it fragile for unusual names (backticks, quotes, other special chars) and increases SQL injection risk if these values ever become user-controlled. Please use `escape(database)` and `escape(table)` in the template and update the placeholders accordingly so it matches the surrounding pattern.
</issue_to_address>
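The escaping pattern the reviewer asks for can be illustrated like this. Both `escape()` and the query template here are simplified stand-ins (the real ch_backup `escape()` may differ); the point is that identifiers pass through the escaper before `str.format`:

```python
def escape(value: str) -> str:
    """Minimal sketch of a string escaper for single-quoted SQL literals."""
    return value.replace("\\", "\\\\").replace("'", "\\'")

# Simplified stand-in for FILTER_DEDUP_PARTITION_SQL.
FILTER_DEDUP_PARTITION_SQL_SKETCH = (
    "SELECT partition_id FROM system.parts "
    "WHERE database = '{database}' AND table = '{table}' AND active"
)

def build_query(database: str, table: str) -> str:
    # Escape before interpolation, matching GET_ACTIVE_PARTS_FOR_TABLE's pattern.
    return FILTER_DEDUP_PARTITION_SQL_SKETCH.format(
        database=escape(database), table=escape(table)
    )
```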
### Comment 2
<location path="ch_backup/clickhouse/control.py" line_range="639-648" />
<code_context>
self._ch_client.query(query_sql)
+ def get_active_parts(self, database: str, table: str) -> List[dict]:
+ """
+ Get list of active parts.
+ """
+ query_sql = GET_ACTIVE_PARTS_FOR_TABLE.format(
+ db_name=escape(database),
+ table_name=escape(table),
+ )
+ # Is it not too much?
+ # Probably no, have tested with the max length of database and table name the size of each row is less than 1KB
+ # In ordinary case the number of dataparts is about 10k-20k, so it is 20MB
+ # In the worst case the number is 200k-300k(numbers based on my experience); it is 200MB
+ # Looks fine.
+ return self._ch_client.query(query_sql)["data"]
+
def kill_old_freeze_queries(self):
</code_context>
<issue_to_address>
**suggestion:** Return type annotation for get_active_parts does not match the JSONCompact result structure.
`GET_ACTIVE_PARTS_FOR_TABLE` uses `FORMAT JSONCompact`, which returns rows as `List[List[Any]]`, but the function is typed as returning `List[dict]`. The caller also treats each row as a positional sequence (`part_name, hash_of_all_files, parition_id = row`), which matches `List[List[Any]]`, not dicts. Please update the return type (e.g., to `List[List[Any]]` or a more specific tuple type) so it matches the actual structure and doesn’t mislead static checkers or future readers.
Suggested implementation:
```python
def get_active_parts(self, database: str, table: str) -> List[List[Any]]:
"""
Get list of active parts as rows (JSONCompact: List[List[Any]]).
"""
```
```python
# Is it not too much?
# Probably no, have tested with the max length of database and table name the size of each row is less than 1KB
# In ordinary case the number of dataparts is about 10k-20k, so it is 20MB
# In the worst case the number is 200k-300k(numbers based on my experience); it is 200MB
# Looks fine.
# JSONCompact returns data as List[List[Any]], where each inner list is a row.
return self._ch_client.query(query_sql)["data"]
```
Ensure `Any` is imported from `typing` at the top of `ch_backup/clickhouse/control.py`, e.g.:
- If you have `from typing import List`, change it to `from typing import Any, List`.
</issue_to_address>
### Comment 3
<location path="ch_backup/backup/deduplication.py" line_range="265-267" />
<code_context>
+ )
+ if not part["verified"]:
+ if not layout.check_data_part(part["backup_path"], part_metadata):
+ logging.debug(
+ 'Part "{}" found in "{}", but it\'s invalid, skipping',
+ part_metadata.name,
+ part["backup_path"],
+ )
</code_context>
<issue_to_address>
**issue (bug_risk):** logging.debug uses `{}` placeholders instead of `%s`, which will raise formatting errors at runtime.
Python’s `logging` API expects `%`-style placeholders, but these calls use `{}` with extra positional arguments:
```python
logging.debug(
'Part "{}" found in "{}", but it\'s invalid, skipping',
part_metadata.name,
part["backup_path"],
)
logging.debug("deduplicated parts {}", parts_metadata)
```
This will raise a `TypeError` at runtime. Please switch to `%s` placeholders (or build the string beforehand and pass a single argument), e.g.:
```python
logging.debug('Part "%s" found in "%s", but it\'s invalid, skipping', part_metadata.name, part["backup_path"])
logging.debug("deduplicated parts %s", parts_metadata)
```
</issue_to_address>
### Comment 4
<location path="ch_backup/backup/metadata/table_metadata.py" line_range="116-120" />
<code_context>
+ """
+ Populate data part with deduplication filters.
+ """
+ if part := self.raw_metadata["parts"].get(part_name):
+ if part.get("hash_of_all_files", None) != "":
+ return
+ part["hash_of_all_files"] = hash_of_all_files
+ part["partition_id"] = partition_id
+
@classmethod
</code_context>
<issue_to_address>
**issue (bug_risk):** add_part_dedup_filter condition prevents dedup filters from ever being set for new parts.
Because `part.get("hash_of_all_files", None) != ""` is true when the key is missing (returns `None`), the function returns before setting any fields, so new parts never get dedup filters. The condition should only return when a non-empty hash already exists, e.g.:
```python
if part.get("hash_of_all_files") not in (None, ""):
return
```
(or equivalently, flip the comparison in your current check).
</issue_to_address>
Summary by Sourcery
Introduce partition-aware deduplication metadata and use it to selectively freeze only changed partitions during backups, while preserving unchanged parts via metadata.
New Features:
Enhancements:
Tests: