Merged
105 changes: 71 additions & 34 deletions ch_backup/backup/deduplication.py
@@ -5,7 +5,7 @@
 from collections import defaultdict
 from copy import copy
 from datetime import timedelta
-from typing import Dict, List, Sequence, Set
+from typing import Dict, List, Optional, Sequence, Set

 from ch_backup import logging
 from ch_backup.backup.layout import BackupLayout
@@ -33,6 +33,8 @@ class PartDedupInfo(Slotted):
"disk_name",
"verified",
"encrypted",
"hash_of_all_files",
"partition_id",
)

# pylint: disable=too-many-arguments,too-many-positional-arguments
Expand All @@ -49,6 +51,8 @@ def __init__(
         disk_name: str,
         verified: bool,
         encrypted: bool,
+        hash_of_all_files: str,
+        partition_id: str,
     ) -> None:
         self.database = database
         self.table = table
@@ -61,13 +65,15 @@ def __init__(
         self.disk_name = disk_name
         self.verified = verified
         self.encrypted = encrypted
+        self.hash_of_all_files = hash_of_all_files
+        self.partition_id = partition_id

     def to_sql(self):
         """
         Convert to string to use it in insert query
         """
         files_array = "[" + ",".join(f"'{file}'" for file in self.files) + "]"
-        return f"('{self.database}','{self.table}','{self.name}','{self.backup_path}','{self.checksum}',{self.size},{files_array},{int(self.tarball)},'{self.disk_name}',{int(self.verified)}, {int(self.encrypted)})"
+        return f"('{self.database}','{self.table}','{self.name}','{self.backup_path}','{self.checksum}',{self.size},{files_array},{int(self.tarball)},'{self.disk_name}',{int(self.verified)}, {int(self.encrypted)}, '{self.hash_of_all_files}', '{self.partition_id}')"


 TableDedupReferences = Set[str]
@@ -209,6 +215,8 @@ def _populate_dedup_info(
                 disk_name=part.disk_name,
                 verified=verified,
                 encrypted=part.encrypted,
+                hash_of_all_files=part.hash_of_all_files,
+                partition_id=part.partition_id,
             )

             table_dedup_info.add(part.name)
@@ -225,6 +233,64 @@ def _populate_dedup_info(
             break


+def _deduplication_process_raw_parts_info(
+    layout: BackupLayout,
+    database: str,
+    table: str,
+    parts_raw: List[Dict],
+    ignore_invalid: bool,
+) -> Optional[Dict[str, PartMetadata]]:
+    parts_metadata: Dict[str, PartMetadata] = {}
+    for part in parts_raw:
+        part_metadata = PartMetadata(
+            database=database,
+            table=table,
+            name=part["name"],
+            checksum=part["checksum"],
+            size=int(part["size"]),
+            link=part["backup_path"],
+            files=part["files"],
+            tarball=part["tarball"],
+            disk_name=part["disk_name"],
+            encrypted=part.get("encrypted", True),
+            hash_of_all_files=part.get("hash_of_all_files", ""),
+            partition_id=part.get("partition_id", ""),
+        )
+        if not part["verified"]:
+            if not layout.check_data_part(part["backup_path"], part_metadata):
+                logging.debug(
+                    'Part "{}" found in "{}", but it\'s invalid, skipping',
+                    part_metadata.name,
Comment on lines +261 to +263 (Contributor):

issue (bug_risk): logging.debug uses {} placeholders instead of %s, which will raise formatting errors at runtime.

Python's logging API expects %-style placeholders, but these calls use {} with extra positional arguments:

    logging.debug(
        'Part "{}" found in "{}", but it\'s invalid, skipping',
        part_metadata.name,
        part["backup_path"],
    )
    logging.debug("deduplicated parts {}", parts_metadata)

This will raise a TypeError at runtime. Please switch to %s placeholders (or build the string beforehand and pass a single argument), e.g.:

    logging.debug('Part "%s" found in "%s", but it\'s invalid, skipping', part_metadata.name, part["backup_path"])
    logging.debug("deduplicated parts %s", parts_metadata)
part["backup_path"],
)
if not ignore_invalid:
return None
continue

parts_metadata[part_metadata.name] = part_metadata
logging.debug(
'Part "{}" found in "{}", reusing', part_metadata.name, part["backup_path"]
)

return parts_metadata


+def collect_verified_parts_by_partition(
+    context: BackupContext, database: str, table: str, partition_id: str
+) -> Optional[List[PartMetadata]]:
+    """
+    Deduplicate parts by partition.
+    """
+    parts = context.ch_ctl.get_deduplication_info_by_partition(
+        database, table, partition_id
+    )
+    layout = context.backup_layout
+    result = _deduplication_process_raw_parts_info(
+        layout, database, table, parts, False
+    )
+    return list(result.values()) if result else None


 def deduplicate_parts(
     context: BackupContext,
     database: str,
@@ -239,38 +305,9 @@ def deduplicate_parts(
     existing_parts = context.ch_ctl.get_deduplication_info(
         database, table, frozen_parts
     )
-    deduplicated_parts: Dict[str, PartMetadata] = {}
-
-    for existing_part in existing_parts:
-        part = PartMetadata(
-            database=database,
-            table=table,
-            name=existing_part["name"],
-            checksum=existing_part["checksum"],
-            size=int(existing_part["size"]),
-            link=existing_part["backup_path"],
-            files=existing_part["files"],
-            tarball=existing_part["tarball"],
-            disk_name=existing_part["disk_name"],
-            encrypted=existing_part.get("encrypted", True),
-        )
-
-        if not existing_part["verified"]:
-            if not layout.check_data_part(existing_part["backup_path"], part):
-                logging.debug(
-                    'Part "{}" found in "{}", but it\'s invalid, skipping',
-                    part.name,
-                    existing_part["backup_path"],
-                )
-                continue
-
-        deduplicated_parts[part.name] = part
-
-        logging.debug(
-            'Part "{}" found in "{}", reusing', part.name, existing_part["backup_path"]
-        )
-
-    return deduplicated_parts
+    return _deduplication_process_raw_parts_info(  # type: ignore
+        layout, database, table, existing_parts, True
+    )


 def collect_dedup_references_for_batch_backup_deletion(
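The two call sites now share one loop, and the only behavioral switch is `ignore_invalid`: `deduplicate_parts` passes `True` and silently skips a part that fails verification, while `collect_verified_parts_by_partition` passes `False` and gives up on the whole partition. A minimal, self-contained sketch of that skip-vs-abort contract (stand-in dicts and a boolean storage check instead of the real `BackupLayout`):

```python
from typing import Dict, List, Optional


def process_parts(parts_raw: List[Dict], ignore_invalid: bool) -> Optional[Dict[str, str]]:
    """Stand-in for _deduplication_process_raw_parts_info's control flow."""
    reusable: Dict[str, str] = {}
    for part in parts_raw:
        # An unverified part that also fails the storage check is "invalid".
        if not part["verified"] and not part["valid_on_storage"]:
            if not ignore_invalid:
                return None  # partition-level caller: abort, reuse nothing
            continue  # part-level caller: just skip this one part
        reusable[part["name"]] = part["backup_path"]
    return reusable


parts = [
    {"name": "202401_1_1_0", "backup_path": "backup_1", "verified": True, "valid_on_storage": True},
    {"name": "202401_2_2_0", "backup_path": "backup_1", "verified": False, "valid_on_storage": False},
]
print(process_parts(parts, ignore_invalid=True))   # {'202401_1_1_0': 'backup_1'}
print(process_parts(parts, ignore_invalid=False))  # None
```

In the real code, `collect_verified_parts_by_partition` additionally collapses an empty dict to `None` via `list(result.values()) if result else None`.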
15 changes: 15 additions & 0 deletions ch_backup/backup/metadata/backup_metadata.py
@@ -348,6 +348,21 @@ def add_part(self, part: PartMetadata) -> None:
         if not part.link:
             self.real_size += part.size

+    def add_part_dedup_filter(
+        self,
+        database: str,
+        table: str,
+        name: str,
+        hash_of_all_files: str,
+        partition_id: str,
+    ) -> None:
+        """
+        Populate part with deduplication filters.
+        """
+        self.get_table(database, table).add_part_dedup_filter(
+            name, partition_id, hash_of_all_files
+        )
+
     def remove_parts(self, table: TableMetadata, parts: List[PartMetadata]) -> None:
         """
         Remove data parts from backup metadata.
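A small pitfall worth flagging for callers: this backup-level method takes `hash_of_all_files` before `partition_id`, while the table-level method it delegates to takes `partition_id` first, so keyword arguments are the safer calling style. A hypothetical usage sketch (the stand-in class and all values are made up; only the signature comes from this diff):

```python
from typing import Dict


class BackupMetadataStandIn:
    """Stand-in mimicking only the delegation shown in this diff."""

    def __init__(self) -> None:
        self.parts: Dict[str, Dict] = {"202401_1_1_0": {}}

    def add_part_dedup_filter(self, database: str, table: str, name: str,
                              hash_of_all_files: str, partition_id: str) -> None:
        # Real code: self.get_table(database, table).add_part_dedup_filter(...)
        self.parts[name]["hash_of_all_files"] = hash_of_all_files
        self.parts[name]["partition_id"] = partition_id


backup_meta = BackupMetadataStandIn()
backup_meta.add_part_dedup_filter(
    database="db1",
    table="events",
    name="202401_1_1_0",
    hash_of_all_files="deadbeef",  # made-up value
    partition_id="202401",         # made-up value
)
print(backup_meta.parts)
```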
44 changes: 42 additions & 2 deletions ch_backup/backup/metadata/part_metadata.py
@@ -13,7 +13,17 @@ class RawMetadata(Slotted):
     Raw metadata for ClickHouse data part.
     """

-    __slots__ = "checksum", "size", "files", "tarball", "link", "disk_name", "encrypted"
+    __slots__ = (
+        "checksum",
+        "size",
+        "files",
+        "tarball",
+        "link",
+        "disk_name",
+        "encrypted",
+        "hash_of_all_files",
+        "partition_id",
+    )

     # pylint: disable=too-many-positional-arguments
     def __init__(
@@ -25,6 +35,8 @@ def __init__(
         link: str = None,
         disk_name: str = None,
         encrypted: bool = True,
+        hash_of_all_files: str = "",
+        partition_id: str = "",
     ) -> None:
         self.checksum = checksum
         self.size = size
@@ -33,6 +45,8 @@ def __init__(
         self.link = link
         self.disk_name = disk_name
         self.encrypted = encrypted
+        self.hash_of_all_files = hash_of_all_files
+        self.partition_id = partition_id


 class PartMetadata(Slotted):
@@ -55,12 +69,22 @@ def __init__(
         link: str = None,
         disk_name: str = None,
         encrypted: bool = True,
+        hash_of_all_files: str = "",
+        partition_id: str = "",
     ) -> None:
         self.database: str = database
         self.table: str = table
         self.name: str = name
         self.raw_metadata: RawMetadata = RawMetadata(
-            checksum, size, files, tarball, link, disk_name, encrypted
+            checksum,
+            size,
+            files,
+            tarball,
+            link,
+            disk_name,
+            encrypted,
+            hash_of_all_files,
+            partition_id,
         )

     @property
@@ -112,6 +136,20 @@ def tarball(self) -> bool:
"""
return self.raw_metadata.tarball

@property
def hash_of_all_files(self) -> str:
"""
Returns hash_of_all_files of the part.
"""
return self.raw_metadata.hash_of_all_files

@property
def partition_id(self) -> str:
"""
Returns partition_id of the part.
"""
return self.raw_metadata.partition_id

@classmethod
def load(
cls, db_name: str, table_name: str, part_name: str, raw_metadata: dict
Expand All @@ -130,6 +168,8 @@ def load(
             link=raw_metadata["link"],
             disk_name=raw_metadata.get("disk_name", "default"),
             encrypted=raw_metadata.get("encrypted", True),
+            hash_of_all_files=raw_metadata.get("hash_of_all_files", ""),
+            partition_id=raw_metadata.get("partition_id", ""),
         )

     @classmethod
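Since `load()` reads the two new fields with `.get(..., "")`, metadata written by older ch_backup versions, which has no such keys, still loads; those parts simply carry empty filters and can never match a hash-based dedup lookup. A tiny sketch of exactly that fallback (abbreviated dict, not a full metadata record):

```python
# Metadata as an older version might have stored it: no new keys present.
old_raw_metadata = {"link": None, "disk_name": "default"}  # other keys omitted

hash_of_all_files = old_raw_metadata.get("hash_of_all_files", "")
partition_id = old_raw_metadata.get("partition_id", "")
assert (hash_of_all_files, partition_id) == ("", "")
```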
14 changes: 14 additions & 0 deletions ch_backup/backup/metadata/table_metadata.py
@@ -103,8 +103,22 @@ def add_part(self, part: PartMetadata) -> None:
"tarball": part.tarball,
"disk_name": part.disk_name,
"encrypted": part.encrypted,
"hash_of_all_files": part.hash_of_all_files,
"partition_id": part.partition_id,
}

def add_part_dedup_filter(
self, part_name: str, partition_id: str, hash_of_all_files: str
) -> None:
"""
Populate data part with deduplication filters.
"""
if part := self.raw_metadata["parts"].get(part_name):
if part.get("hash_of_all_files", "") != "":
return
part["hash_of_all_files"] = hash_of_all_files
part["partition_id"] = partition_id
Comment thread
sourcery-ai[bot] marked this conversation as resolved.

@classmethod
def load(cls, database: str, name: str, raw_metadata: dict) -> "TableMetadata":
"""
Expand Down
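The table-level filter is write-once: if `hash_of_all_files` is already non-empty the call returns without touching the part, so the first recorded hash wins, and unknown part names are silently ignored. A self-contained sketch of the guard, with a plain dict standing in for `raw_metadata["parts"]`:

```python
from typing import Dict

parts: Dict[str, Dict] = {"202401_1_1_0": {"checksum": "abc123"}}


def add_part_dedup_filter(part_name: str, partition_id: str, hash_of_all_files: str) -> None:
    if part := parts.get(part_name):  # unknown part names are ignored, as in the diff
        if part.get("hash_of_all_files", "") != "":
            return  # already populated: keep the first value
        part["hash_of_all_files"] = hash_of_all_files
        part["partition_id"] = partition_id


add_part_dedup_filter("202401_1_1_0", "202401", "deadbeef")
add_part_dedup_filter("202401_1_1_0", "202401", "cafebabe")  # no-op
assert parts["202401_1_1_0"]["hash_of_all_files"] == "deadbeef"
```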