diff --git a/field_synthesis/field_synthesis.py b/field_synthesis/field_synthesis.py new file mode 100644 index 0000000..e305180 --- /dev/null +++ b/field_synthesis/field_synthesis.py @@ -0,0 +1,129 @@ +import numpy as np +import math +from scipy.stats.qmc import PoissonDisk +from scipy.spatial import cKDTree + + +class FieldSynthesis(): + + def __init__(self, area_size: float, count_points: int, num_source: int, dimension: int = 2, seed: int = 42): + self.area_size = area_size + self.count_points = count_points + self.num_source = num_source + self.dimension = dimension + self.seed = seed + self.rng = np.random.default_rng(seed) + + # Hodnoty, které se dopočítají/vygenerují + self.min_distance = self.calc_distance(self) + self.anchor_points = None + self.field_indices = None + + def calc_distance(self, free_space_ratio: float = 0.4) -> float: + """ + Odhad distance D pro zadaný počet bodů a stranu krychle. + """ + if self.count_points <= 0 or self.area_size <= 0: + return 0.0 + + total_vol = self.area_size ** self.dimension + occupied_ratio = 1.0 - free_space_ratio + vol_per_point = (total_vol * occupied_ratio) / self.count_points + + if self.dimension == 2: + return math.sqrt(vol_per_point) + else: + return vol_per_point ** (1/self.dimension) + + + + def generate_points(self): + """ + Funkce vygeneruje pole nahodnych bodu. + + :param count_points: pocet nahodnych bodu. + :param min_distance: minimalni vzdalenost mezi bodami. + :param area_size: velokost matici. + :return: vrati pole koordinat [[x1, y1], [x2, y2], ...] + """ + if self.count_points <= 0 or self.area_size <= 0: + return np.zeros((0, 2)) + + if self.min_distance < 0: + self.min_distance = 0 + + radius = self.min_distance/self.area_size + + if radius > 1: + radius = 0.99 + try: + engine = PoissonDisk(d=2, radius=radius, seed=42) + + #ten algorytm je nakladny na cas a resurs pocitace + # points = engine.fill_space() * area_size + + # if (len(points) > count_points): + # return points[:count_points] + + self.anchor_points = engine.random(self.count_points) * self.area_size + return self.anchor_points + except Exception: + self.anchor_points = np.zeros((0, 2)) + return self.anchor_points + + def assign_source_fields(self): + """ + Každému bodu přiřadíme náhodně jedno ze zdrojových N polí. + """ + if self.anchor_points is None or len(self.anchor_points) == 0: + self.field_indices = np.array([], dtype=int) + else: + self.field_indices = self.rng.integers(0, self.num_source, size=len(self.anchor_points)) + + return self.field_indices + + + def spatial_points(self, target_points, k_neighbors=5): + """ + Vektorizované vyhledání sousedů pomocí cKDTree. + """ + if self.anchor_points is None: + self.generate_points() + + tree = cKDTree(self.anchor_points) + R_LIMIT = 2 * self.min_distance + + # Vektorizovaný dotaz + distances, indices = tree.query(target_points, k=k_neighbors) + + # Vektorizovaná maska vzdálenosti + valid_neighbor_mask = distances <= R_LIMIT + + final_result_indices = [] + for i in range(len(target_points)): + # Výběr platných indexů pro každý cílový bod + current_valid = indices[i][valid_neighbor_mask[i]] + final_result_indices.append(current_valid) + + return final_result_indices + + + def mix_fields(self, target_points): + """ + Finální míchání polí (průměrování). + """ + + neighbor_indices_list = self.spatial_points(target_points) + + mixed_results = [] + for neighbors in neighbor_indices_list: + if len(neighbors) == 0: + mixed_results.append(np.nan) + elif len(neighbors) == 1: + mixed_results.append(self.field_indices[neighbors[0]]) + else: + # "Provedeme průměr, pokud jich zbyde více + values = self.field_indices[neighbors] + mixed_results.append(np.mean(values)) + + return np.array(mixed_results) \ No newline at end of file diff --git a/field_synthesis/functions/calc_distance.py b/field_synthesis/functions/calc_distance.py index a68ec57..02cbd00 100644 --- a/field_synthesis/functions/calc_distance.py +++ b/field_synthesis/functions/calc_distance.py @@ -1,19 +1,19 @@ import math -def calc_distance(count_points: int, area_size: float, free_space_ratio: float = 0.4) -> float: +def calc_distance(count_points: int, area_size: float, rank: int, free_space_ratio: float = 0.4) -> float: """ - Odhadne optimální minimální vzdálenost bodů ve 2D prostoru + Odhadne optimální minimální vzdálenost bodů ve prostoru s ohledem na procento požadovaného volného prostoru. """ if count_points <= 0 or area_size <= 0: return 0.0 - total_area = area_size ** 2 + total_area = area_size ** rank occupied_ratio = 1.0 - free_space_ratio occupied_area = total_area * occupied_ratio area_per_point = occupied_area / count_points - min_distance = math.sqrt(area_per_point) + min_distance = area_per_point ** (1/rank) return min_distance \ No newline at end of file diff --git a/field_synthesis/functions/spatial_points.py b/field_synthesis/functions/spatial_points.py new file mode 100644 index 0000000..9808ed2 --- /dev/null +++ b/field_synthesis/functions/spatial_points.py @@ -0,0 +1,26 @@ +import numpy as np +from scipy.spatial import cKDTree + +def spatial_points(points_k, points_m, min_distance): + tree = cKDTree(points_m) + result = [] + + __, idx = tree.query(points_k, k=5) + + for i in range(len(points_k)): + current_neighbor_indices = idx[i] + current_neighbor_points = points_m[current_neighbor_indices] + + valid_indices = [] + for __, p_idx in enumerate(current_neighbor_indices): + p_coord = points_m[p_idx] + + diffs = current_neighbor_points - p_coord + dist_to_others = np.linalg.norm(diffs, axis=1) + + if np.all(dist_to_others <= min_distance): + valid_indices.append(p_idx) + + result.append(valid_indices) + + return result \ No newline at end of file diff --git a/field_synthesis/tests/test_distance.py b/field_synthesis/tests/test_distance.py index 317ed49..40cf0a7 100644 --- a/field_synthesis/tests/test_distance.py +++ b/field_synthesis/tests/test_distance.py @@ -3,22 +3,24 @@ from field_synthesis.functions.calc_distance import calc_distance def test_estimate_distance_standard(): - """Testuje standardní výpočet se 40% volným prostorem.""" - # 50 bodů, plocha 100x100 (10000), 60% obsazeno = 6000. 6000/50 = 120. Odmocnina z 120 je cca 10.954 - result = calc_distance(50, 100.0, 0.4) + """Testuje standardní výpočet se 40% volným prostorem v 2D.""" + # count_points=50, area_size=100.0, prostor=2, free_space_ratio=0.4 + # Výpočet: (100^2 * 0.6) / 50 = 120 -> sqrt(120) = 10.95445 + result = calc_distance(50, 100.0, 2, 0.4) assert math.isclose(result, 10.95445, rel_tol=1e-4) -@pytest.mark.parametrize("points, area, free_space, expected", [ - (10, 50.0, 0.5, 11.1803), - (100, 10.0, 0.1, 0.9486) +@pytest.mark.parametrize("points, area, prostor, free_space, expected", [ + (10, 50.0, 2, 0.5, 11.1803), # (2500 * 0.5) / 10 = 125 -> sqrt(125) = 11.1803 + (100, 10.0, 2, 0.1, 0.9486) # (100 * 0.9) / 100 = 0.9 -> sqrt(0.9) = 0.9486 ]) -def test_estimate_distance_parametrized(points, area, free_space, expected): - """Testuje různé poměry a velikosti.""" - result = calc_distance(points, area, free_space) +def test_estimate_distance_parametrized(points, area, prostor, free_space, expected): + """Testuje různé poměry a velikosti v n-rozměrném prostoru.""" + result = calc_distance(points, area, prostor, free_space) assert math.isclose(result, expected, rel_tol=1e-4) def test_estimate_distance_invalid_inputs(): - """Testuje chování při neplatných vstupech (nula nebo záporná čísla).""" - assert calc_distance(0, 100.0, 0.4) == 0.0 - assert calc_distance(10, 0.0, 0.4) == 0.0 - assert calc_distance(-5, 100.0, 0.4) == 0.0 \ No newline at end of file + """Testuje chování při neplatných vstupech.""" + # Přidán parametr '2' pro prostor, aby sedělo pořadí argumentů + assert calc_distance(0, 100.0, 2, 0.4) == 0.0 + assert calc_distance(10, 0.0, 2, 0.4) == 0.0 + assert calc_distance(-5, 100.0, 2, 0.4) == 0.0 \ No newline at end of file diff --git a/field_synthesis/tests/test_spatial_points.py b/field_synthesis/tests/test_spatial_points.py new file mode 100644 index 0000000..16289d9 --- /dev/null +++ b/field_synthesis/tests/test_spatial_points.py @@ -0,0 +1,34 @@ +import numpy as np +from field_synthesis.functions.spatial_points import spatial_points + + +def test_spatial_points_filtering(): + # Vytvoříme body K (cíle) a M (zdroje) + points_k = np.array([[10, 10]]) # Jeden bod uprostřed + + # M body: vytvoříme shluk 5 bodů velmi blízko sebe + points_m = np.array([ + [10.1, 10.1], + [10.2, 10.2], + [10.1, 10.2], + [10.2, 10.1], + [10.15, 10.15], + [50.0, 50.0] # Jeden bod hodně daleko + ]) + + # Pokud nastavíme min_distance dostatečně velkou, + # mělo by nám to vrátit indexy těch 5 blízkých bodů. + # Vzdálenost mezi nimi je cca 0.14, takže limit 1.0 je v pohodě. + result = spatial_points(points_k, points_m, min_distance=1.0) + + assert len(result) == 1 + assert len(result[0]) == 5 # Našlo to všech 5 v clusteru + assert 5 not in result[0] # Bod na [50, 50] tam nesmí být + +def test_spatial_points_too_far(): + points_k = np.array([[0, 0]]) + points_m = np.array([[10, 10], [11, 11], [12, 12], [13, 13], [14, 14]]) + + # Pokud je limit 0.1, ale body jsou od sebe 1.4, nikdo neprojde filtrem + result = spatial_points(points_k, points_m, min_distance=0.1) + assert result == [[]] \ No newline at end of file