Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions field_synthesis/field_synthesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import numpy as np
import math
from scipy.stats.qmc import PoissonDisk
from scipy.spatial import cKDTree


class FieldSynthesis():

def __init__(self, area_size: float, count_points: int, num_source: int, dimension: int = 2, seed: int = 42):
self.area_size = area_size
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dokumentovat atributy.

self.count_points = count_points
self.num_source = num_source
self.dimension = dimension
self.seed = seed
self.rng = np.random.default_rng(seed)

# Hodnoty, které se dopočítají/vygenerují
self.min_distance = self.calc_distance(self)
self.anchor_points = None
self.field_indices = None

def calc_distance(self, free_space_ratio: float = 0.4) -> float:
"""
Odhad distance D pro zadaný počet bodů a stranu krychle.
"""
if self.count_points <= 0 or self.area_size <= 0:
return 0.0

total_vol = self.area_size ** self.dimension
occupied_ratio = 1.0 - free_space_ratio
vol_per_point = (total_vol * occupied_ratio) / self.count_points

if self.dimension == 2:
return math.sqrt(vol_per_point)
else:
return vol_per_point ** (1/self.dimension)



def generate_points(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor as @cached_property:

@cached_property
def anchor_points(self):
...

"""
Funkce vygeneruje pole nahodnych bodu.

:param count_points: pocet nahodnych bodu.
:param min_distance: minimalni vzdalenost mezi bodami.
:param area_size: velokost matici.
:return: vrati pole koordinat [[x1, y1], [x2, y2], ...]
"""
if self.count_points <= 0 or self.area_size <= 0:
return np.zeros((0, 2))

if self.min_distance < 0:
self.min_distance = 0

radius = self.min_distance/self.area_size

if radius > 1:
radius = 0.99
try:
engine = PoissonDisk(d=2, radius=radius, seed=42)

#ten algorytm je nakladny na cas a resurs pocitace
# points = engine.fill_space() * area_size

# if (len(points) > count_points):
# return points[:count_points]

self.anchor_points = engine.random(self.count_points) * self.area_size
return self.anchor_points
except Exception:
self.anchor_points = np.zeros((0, 2))
return self.anchor_points

def assign_source_fields(self):
"""
Každému bodu přiřadíme náhodně jedno ze zdrojových N polí.
"""
if self.anchor_points is None or len(self.anchor_points) == 0:
self.field_indices = np.array([], dtype=int)
else:
self.field_indices = self.rng.integers(0, self.num_source, size=len(self.anchor_points))

return self.field_indices


def spatial_points(self, target_points, k_neighbors=5):
"""
Vektorizované vyhledání sousedů pomocí cKDTree.
"""
if self.anchor_points is None:
self.generate_points()

tree = cKDTree(self.anchor_points)
R_LIMIT = 2 * self.min_distance

# Vektorizovaný dotaz
distances, indices = tree.query(target_points, k=k_neighbors)

# Vektorizovaná maska vzdálenosti
valid_neighbor_mask = distances <= R_LIMIT

final_result_indices = []
for i in range(len(target_points)):
# Výběr platných indexů pro každý cílový bod
current_valid = indices[i][valid_neighbor_mask[i]]
final_result_indices.append(current_valid)

return final_result_indices


def mix_fields(self, target_points):
"""
Finální míchání polí (průměrování).
"""

neighbor_indices_list = self.spatial_points(target_points)

mixed_results = []
for neighbors in neighbor_indices_list:
if len(neighbors) == 0:
mixed_results.append(np.nan)
elif len(neighbors) == 1:
mixed_results.append(self.field_indices[neighbors[0]])
else:
# "Provedeme průměr, pokud jich zbyde více
values = self.field_indices[neighbors]
mixed_results.append(np.mean(values))

return np.array(mixed_results)
8 changes: 4 additions & 4 deletions field_synthesis/functions/calc_distance.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import math

def calc_distance(count_points: int, area_size: float, free_space_ratio: float = 0.4) -> float:
def calc_distance(count_points: int, area_size: float, rank: int, free_space_ratio: float = 0.4) -> float:
"""
Odhadne optimální minimální vzdálenost bodů ve 2D prostoru
Odhadne optimální minimální vzdálenost bodů ve prostoru
s ohledem na procento požadovaného volného prostoru.
"""
if count_points <= 0 or area_size <= 0:
return 0.0

total_area = area_size ** 2
total_area = area_size ** rank
occupied_ratio = 1.0 - free_space_ratio
occupied_area = total_area * occupied_ratio

area_per_point = occupied_area / count_points

min_distance = math.sqrt(area_per_point)
min_distance = area_per_point ** (1/rank)

return min_distance
26 changes: 26 additions & 0 deletions field_synthesis/functions/spatial_points.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import numpy as np
from scipy.spatial import cKDTree

def spatial_points(points_k, points_m, min_distance):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

points_k -> target_points
points_m -> mixing_points
... i jinde

tree = cKDTree(points_m)
result = []

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_neighbours = 5

__, idx = tree.query(points_k, k=5)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment shape : (len(points_k), max_neighbours)


for i in range(len(points_k)):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vectorize the loops:

point_k shape = ( K, dim); idx.shape = (5, K)

diff = point_k[None, :, :] - points_m[idx[:, :], :]
dist = np.linalg.norm(diff, axis = -1)

current_neighbor_indices = idx[i]
current_neighbor_points = points_m[current_neighbor_indices]

valid_indices = []
for __, p_idx in enumerate(current_neighbor_indices):
p_coord = points_m[p_idx]

diffs = current_neighbor_points - p_coord
dist_to_others = np.linalg.norm(diffs, axis=1)

if np.all(dist_to_others <= min_distance):
valid_indices.append(p_idx)

result.append(valid_indices)

return result
28 changes: 15 additions & 13 deletions field_synthesis/tests/test_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@
from field_synthesis.functions.calc_distance import calc_distance

def test_estimate_distance_standard():
"""Testuje standardní výpočet se 40% volným prostorem."""
# 50 bodů, plocha 100x100 (10000), 60% obsazeno = 6000. 6000/50 = 120. Odmocnina z 120 je cca 10.954
result = calc_distance(50, 100.0, 0.4)
"""Testuje standardní výpočet se 40% volným prostorem v 2D."""
# count_points=50, area_size=100.0, prostor=2, free_space_ratio=0.4
# Výpočet: (100^2 * 0.6) / 50 = 120 -> sqrt(120) = 10.95445
result = calc_distance(50, 100.0, 2, 0.4)
assert math.isclose(result, 10.95445, rel_tol=1e-4)

@pytest.mark.parametrize("points, area, free_space, expected", [
(10, 50.0, 0.5, 11.1803),
(100, 10.0, 0.1, 0.9486)
@pytest.mark.parametrize("points, area, prostor, free_space, expected", [
(10, 50.0, 2, 0.5, 11.1803), # (2500 * 0.5) / 10 = 125 -> sqrt(125) = 11.1803
(100, 10.0, 2, 0.1, 0.9486) # (100 * 0.9) / 100 = 0.9 -> sqrt(0.9) = 0.9486
])
def test_estimate_distance_parametrized(points, area, free_space, expected):
"""Testuje různé poměry a velikosti."""
result = calc_distance(points, area, free_space)
def test_estimate_distance_parametrized(points, area, prostor, free_space, expected):
"""Testuje různé poměry a velikosti v n-rozměrném prostoru."""
result = calc_distance(points, area, prostor, free_space)
assert math.isclose(result, expected, rel_tol=1e-4)

def test_estimate_distance_invalid_inputs():
"""Testuje chování při neplatných vstupech (nula nebo záporná čísla)."""
assert calc_distance(0, 100.0, 0.4) == 0.0
assert calc_distance(10, 0.0, 0.4) == 0.0
assert calc_distance(-5, 100.0, 0.4) == 0.0
"""Testuje chování při neplatných vstupech."""
# Přidán parametr '2' pro prostor, aby sedělo pořadí argumentů
assert calc_distance(0, 100.0, 2, 0.4) == 0.0
assert calc_distance(10, 0.0, 2, 0.4) == 0.0
assert calc_distance(-5, 100.0, 2, 0.4) == 0.0
34 changes: 34 additions & 0 deletions field_synthesis/tests/test_spatial_points.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import numpy as np
from field_synthesis.functions.spatial_points import spatial_points


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zkusit změřit před a po vektorizaci.

def test_spatial_points_filtering():
# Vytvoříme body K (cíle) a M (zdroje)
points_k = np.array([[10, 10]]) # Jeden bod uprostřed

# M body: vytvoříme shluk 5 bodů velmi blízko sebe
points_m = np.array([
[10.1, 10.1],
[10.2, 10.2],
[10.1, 10.2],
[10.2, 10.1],
[10.15, 10.15],
[50.0, 50.0] # Jeden bod hodně daleko
])

# Pokud nastavíme min_distance dostatečně velkou,
# mělo by nám to vrátit indexy těch 5 blízkých bodů.
# Vzdálenost mezi nimi je cca 0.14, takže limit 1.0 je v pohodě.
result = spatial_points(points_k, points_m, min_distance=1.0)

assert len(result) == 1
assert len(result[0]) == 5 # Našlo to všech 5 v clusteru
assert 5 not in result[0] # Bod na [50, 50] tam nesmí být

def test_spatial_points_too_far():
points_k = np.array([[0, 0]])
points_m = np.array([[10, 10], [11, 11], [12, 12], [13, 13], [14, 14]])

# Pokud je limit 0.1, ale body jsou od sebe 1.4, nikdo neprojde filtrem
result = spatial_points(points_k, points_m, min_distance=0.1)
assert result == [[]]