-
Notifications
You must be signed in to change notification settings - Fork 0
udelal upravu calc_distance, a napsal funkce spatial_points a pytest … #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
39f8adb
5018d63
228583e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| import numpy as np | ||
| import math | ||
| from scipy.stats.qmc import PoissonDisk | ||
| from scipy.spatial import cKDTree | ||
|
|
||
|
|
||
| class FieldSynthesis(): | ||
|
|
||
| def __init__(self, area_size: float, count_points: int, num_source: int, dimension: int = 2, seed: int = 42): | ||
| self.area_size = area_size | ||
| self.count_points = count_points | ||
| self.num_source = num_source | ||
| self.dimension = dimension | ||
| self.seed = seed | ||
| self.rng = np.random.default_rng(seed) | ||
|
|
||
| # Hodnoty, které se dopočítají/vygenerují | ||
| self.min_distance = self.calc_distance(self) | ||
| self.anchor_points = None | ||
| self.field_indices = None | ||
|
|
||
| def calc_distance(self, free_space_ratio: float = 0.4) -> float: | ||
| """ | ||
| Odhad distance D pro zadaný počet bodů a stranu krychle. | ||
| """ | ||
| if self.count_points <= 0 or self.area_size <= 0: | ||
| return 0.0 | ||
|
|
||
| total_vol = self.area_size ** self.dimension | ||
| occupied_ratio = 1.0 - free_space_ratio | ||
| vol_per_point = (total_vol * occupied_ratio) / self.count_points | ||
|
|
||
| if self.dimension == 2: | ||
| return math.sqrt(vol_per_point) | ||
| else: | ||
| return vol_per_point ** (1/self.dimension) | ||
|
|
||
|
|
||
|
|
||
| def generate_points(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactor as @cached_property |
||
| """ | ||
| Funkce vygeneruje pole nahodnych bodu. | ||
|
|
||
| :param count_points: pocet nahodnych bodu. | ||
| :param min_distance: minimalni vzdalenost mezi bodami. | ||
| :param area_size: velokost matici. | ||
| :return: vrati pole koordinat [[x1, y1], [x2, y2], ...] | ||
| """ | ||
| if self.count_points <= 0 or self.area_size <= 0: | ||
| return np.zeros((0, 2)) | ||
|
|
||
| if self.min_distance < 0: | ||
| self.min_distance = 0 | ||
|
|
||
| radius = self.min_distance/self.area_size | ||
|
|
||
| if radius > 1: | ||
| radius = 0.99 | ||
| try: | ||
| engine = PoissonDisk(d=2, radius=radius, seed=42) | ||
|
|
||
| #ten algorytm je nakladny na cas a resurs pocitace | ||
| # points = engine.fill_space() * area_size | ||
|
|
||
| # if (len(points) > count_points): | ||
| # return points[:count_points] | ||
|
|
||
| self.anchor_points = engine.random(self.count_points) * self.area_size | ||
| return self.anchor_points | ||
| except Exception: | ||
| self.anchor_points = np.zeros((0, 2)) | ||
| return self.anchor_points | ||
|
|
||
| def assign_source_fields(self): | ||
| """ | ||
| Každému bodu přiřadíme náhodně jedno ze zdrojových N polí. | ||
| """ | ||
| if self.anchor_points is None or len(self.anchor_points) == 0: | ||
| self.field_indices = np.array([], dtype=int) | ||
| else: | ||
| self.field_indices = self.rng.integers(0, self.num_source, size=len(self.anchor_points)) | ||
|
|
||
| return self.field_indices | ||
|
|
||
|
|
||
| def spatial_points(self, target_points, k_neighbors=5): | ||
| """ | ||
| Vektorizované vyhledání sousedů pomocí cKDTree. | ||
| """ | ||
| if self.anchor_points is None: | ||
| self.generate_points() | ||
|
|
||
| tree = cKDTree(self.anchor_points) | ||
| R_LIMIT = 2 * self.min_distance | ||
|
|
||
| # Vektorizovaný dotaz | ||
| distances, indices = tree.query(target_points, k=k_neighbors) | ||
|
|
||
| # Vektorizovaná maska vzdálenosti | ||
| valid_neighbor_mask = distances <= R_LIMIT | ||
|
|
||
| final_result_indices = [] | ||
| for i in range(len(target_points)): | ||
| # Výběr platných indexů pro každý cílový bod | ||
| current_valid = indices[i][valid_neighbor_mask[i]] | ||
| final_result_indices.append(current_valid) | ||
|
|
||
| return final_result_indices | ||
|
|
||
|
|
||
| def mix_fields(self, target_points): | ||
| """ | ||
| Finální míchání polí (průměrování). | ||
| """ | ||
|
|
||
| neighbor_indices_list = self.spatial_points(target_points) | ||
|
|
||
| mixed_results = [] | ||
| for neighbors in neighbor_indices_list: | ||
| if len(neighbors) == 0: | ||
| mixed_results.append(np.nan) | ||
| elif len(neighbors) == 1: | ||
| mixed_results.append(self.field_indices[neighbors[0]]) | ||
| else: | ||
| # "Provedeme průměr, pokud jich zbyde více | ||
| values = self.field_indices[neighbors] | ||
| mixed_results.append(np.mean(values)) | ||
|
|
||
| return np.array(mixed_results) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,19 @@ | ||
| import math | ||
|
|
||
| def calc_distance(count_points: int, area_size: float, free_space_ratio: float = 0.4) -> float: | ||
| def calc_distance(count_points: int, area_size: float, rank: int, free_space_ratio: float = 0.4) -> float: | ||
| """ | ||
| Odhadne optimální minimální vzdálenost bodů ve 2D prostoru | ||
| Odhadne optimální minimální vzdálenost bodů ve prostoru | ||
| s ohledem na procento požadovaného volného prostoru. | ||
| """ | ||
| if count_points <= 0 or area_size <= 0: | ||
| return 0.0 | ||
|
|
||
| total_area = area_size ** 2 | ||
| total_area = area_size ** rank | ||
| occupied_ratio = 1.0 - free_space_ratio | ||
| occupied_area = total_area * occupied_ratio | ||
|
|
||
| area_per_point = occupied_area / count_points | ||
|
|
||
| min_distance = math.sqrt(area_per_point) | ||
| min_distance = area_per_point ** (1/rank) | ||
|
|
||
| return min_distance |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| import numpy as np | ||
| from scipy.spatial import cKDTree | ||
|
|
||
| def spatial_points(points_k, points_m, min_distance): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. points_k -> target_points |
||
| tree = cKDTree(points_m) | ||
| result = [] | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. max_neighbours = 5 |
||
| __, idx = tree.query(points_k, k=5) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment shape : (len(points_k), max_neighbours) |
||
|
|
||
| for i in range(len(points_k)): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Vectorize the loops: point_k shape = ( K, dim); idx.shape = (5, K)diff = point_k[None, :, :] - points_m[idx[:, :], :] |
||
| current_neighbor_indices = idx[i] | ||
| current_neighbor_points = points_m[current_neighbor_indices] | ||
|
|
||
| valid_indices = [] | ||
| for __, p_idx in enumerate(current_neighbor_indices): | ||
| p_coord = points_m[p_idx] | ||
|
|
||
| diffs = current_neighbor_points - p_coord | ||
| dist_to_others = np.linalg.norm(diffs, axis=1) | ||
|
|
||
| if np.all(dist_to_others <= min_distance): | ||
| valid_indices.append(p_idx) | ||
|
|
||
| result.append(valid_indices) | ||
|
|
||
| return result | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| import numpy as np | ||
| from field_synthesis.functions.spatial_points import spatial_points | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Zkusit změřit před a po vektorizaci. |
||
| def test_spatial_points_filtering(): | ||
| # Vytvoříme body K (cíle) a M (zdroje) | ||
| points_k = np.array([[10, 10]]) # Jeden bod uprostřed | ||
|
|
||
| # M body: vytvoříme shluk 5 bodů velmi blízko sebe | ||
| points_m = np.array([ | ||
| [10.1, 10.1], | ||
| [10.2, 10.2], | ||
| [10.1, 10.2], | ||
| [10.2, 10.1], | ||
| [10.15, 10.15], | ||
| [50.0, 50.0] # Jeden bod hodně daleko | ||
| ]) | ||
|
|
||
| # Pokud nastavíme min_distance dostatečně velkou, | ||
| # mělo by nám to vrátit indexy těch 5 blízkých bodů. | ||
| # Vzdálenost mezi nimi je cca 0.14, takže limit 1.0 je v pohodě. | ||
| result = spatial_points(points_k, points_m, min_distance=1.0) | ||
|
|
||
| assert len(result) == 1 | ||
| assert len(result[0]) == 5 # Našlo to všech 5 v clusteru | ||
| assert 5 not in result[0] # Bod na [50, 50] tam nesmí být | ||
|
|
||
| def test_spatial_points_too_far(): | ||
| points_k = np.array([[0, 0]]) | ||
| points_m = np.array([[10, 10], [11, 11], [12, 12], [13, 13], [14, 14]]) | ||
|
|
||
| # Pokud je limit 0.1, ale body jsou od sebe 1.4, nikdo neprojde filtrem | ||
| result = spatial_points(points_k, points_m, min_distance=0.1) | ||
| assert result == [[]] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dokumentovat atributy.