Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cli_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def tune_hyperparameters(input_dir, output_dir, num_epochs=10):
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print("Best Hyperparameters:", best_hps.values)
best_model.save(os.path.join(output_dir, 'best_model'))
best_model.save_weights(os.path.join(output_dir, 'best_model.weights.h5'))

def main():
parser = argparse.ArgumentParser(description="Train a point cloud compression model with hyperparameter tuning.")
Expand All @@ -94,7 +94,7 @@ def main():
model.compile(optimizer='adam', loss='mean_squared_error')
dataset = load_and_preprocess_data(args.input_dir, args.batch_size)
model.fit(dataset, epochs=args.num_epochs)
model.save(os.path.join(args.output_dir, 'trained_model'))
model.save_weights(os.path.join(args.output_dir, 'trained_model.weights.h5'))

if __name__ == "__main__":
main()
44 changes: 36 additions & 8 deletions src/compress_octree.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,47 @@ def _save_debug_info(self, stage: str, data: Dict[str, Any]) -> None:
os.makedirs(debug_dir, exist_ok=True)

for name, array in data.items():
if isinstance(array, (np.ndarray, dict)):
if isinstance(array, np.ndarray):
np.save(os.path.join(debug_dir, f"{name}.npy"), array)

def save_compressed(self, grid: np.ndarray, metadata: Dict[str, Any], filename: str) -> None:
    """Save a compressed occupancy grid with a JSON metadata sidecar.

    The grid (a plain bool ndarray) is written pickle-free to ``filename``
    (``.npz``); metadata goes to ``filename + '.meta.json'`` so loading never
    needs ``allow_pickle=True`` (avoids arbitrary code execution on load).
    NaN/Inf scalar values are mapped to ``null`` to keep the JSON strict.

    Args:
        grid: Occupancy grid array (no object dtype).
        metadata: Mapping of metadata fields; ndarrays become lists,
            numpy scalars become Python scalars, non-finite floats become None.
        filename: Destination ``.npz`` path; parent directories are created.
    """
    import json
    import math

    os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
    # Grid only — never pickle the metadata dict into the archive.
    np.savez_compressed(filename, grid=grid)

    meta_path = filename + '.meta.json'
    serializable = {}
    for k, v in metadata.items():
        if isinstance(v, np.ndarray):
            serializable[k] = v.tolist()
        elif isinstance(v, (np.floating, np.integer)):
            val = v.item()
            # json.dump would emit non-standard NaN/Infinity tokens.
            if isinstance(val, float) and (math.isnan(val) or math.isinf(val)):
                serializable[k] = None
            else:
                serializable[k] = val
        elif isinstance(v, float) and (math.isnan(v) or math.isinf(v)):
            serializable[k] = None
        else:
            serializable[k] = v
    with open(meta_path, 'w') as f:
        json.dump(serializable, f)

def load_compressed(self, filename: str) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Load a compressed grid plus its JSON metadata sidecar.

    Args:
        filename: Path to the ``.npz`` written by :meth:`save_compressed`;
            the sidecar is expected at ``filename + '.meta.json'``.

    Returns:
        Tuple of (grid, metadata); known array-valued fields in the metadata
        are converted back to ``np.ndarray``.

    Raises:
        FileNotFoundError: if the ``.npz`` or the ``.meta.json`` is missing.
    """
    import json

    # Pickle stays disabled: the archive contains only the plain grid array.
    data = np.load(filename, allow_pickle=False)
    grid = data['grid']

    meta_path = filename + '.meta.json'
    with open(meta_path, 'r') as f:
        metadata = json.load(f)
    # JSON stored arrays as lists; restore the known array fields.
    for key in ('min_bounds', 'max_bounds', 'ranges', 'normal_grid'):
        if key in metadata:
            metadata[key] = np.array(metadata[key])
    return grid, metadata
5 changes: 4 additions & 1 deletion src/evaluation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def _load_model(self) -> DeepCompressModel:
# Load weights if checkpoint provided
checkpoint_path = self.config.get('checkpoint_path')
if checkpoint_path:
model.load_weights(checkpoint_path)
resolved = Path(checkpoint_path).resolve()
if not resolved.exists():
raise FileNotFoundError(f"Checkpoint not found: {resolved}")
model.load_weights(str(resolved))

return model

Expand Down
27 changes: 15 additions & 12 deletions src/training_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,29 @@ def save_checkpoint(self, name: str):

for opt_name, optimizer in self.optimizers.items():
if optimizer.variables:
opt_weights = [v.numpy() for v in optimizer.variables]
np.save(
str(checkpoint_path / f'{opt_name}_optimizer.npy'),
np.array(opt_weights, dtype=object),
allow_pickle=True,
)
opt_dir = checkpoint_path / f'{opt_name}_optimizer'
opt_dir.mkdir(parents=True, exist_ok=True)
for i, v in enumerate(optimizer.variables):
np.save(str(opt_dir / f'{i}.npy'), v.numpy())

self.logger.info(f"Saved checkpoint: {name}")

def load_checkpoint(self, name: str):
    """Restore model, entropy model, and optimizer state from a checkpoint.

    Args:
        name: Checkpoint directory name under ``self.checkpoint_dir``.

    Raises:
        ValueError: if ``name`` resolves outside the checkpoint directory
            (path-traversal guard for e.g. ``../`` components).
    """
    checkpoint_path = (self.checkpoint_dir / name).resolve()
    try:
        checkpoint_path.relative_to(self.checkpoint_dir.resolve())
    except ValueError:
        raise ValueError(f"Checkpoint path escapes checkpoint directory: {name}")

    self.model.load_weights(str(checkpoint_path / 'model.weights.h5'))
    self.entropy_model.load_weights(str(checkpoint_path / 'entropy.weights.h5'))

    # Optimizer state is one .npy file per variable — loadable without pickle.
    for opt_name, optimizer in self.optimizers.items():
        opt_dir = checkpoint_path / f'{opt_name}_optimizer'
        if opt_dir.exists() and optimizer.variables:
            for i, var in enumerate(optimizer.variables):
                path = opt_dir / f'{i}.npy'
                if path.exists():
                    var.assign(np.load(str(path), allow_pickle=False))

    self.logger.info(f"Loaded checkpoint: {name}")

Expand Down
221 changes: 220 additions & 1 deletion tests/test_compress_octree.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def test_octree_partitioning(self):
def test_save_and_load(self):
"""Test saving and loading functionality."""
save_path = Path(self.test_env['tmp_path']) / "test_compressed.npz"
meta_path = Path(str(save_path) + '.meta.json')

# Compress and save
grid, metadata = self.compressor.compress(
Expand All @@ -124,8 +125,9 @@ def test_save_and_load(self):
)
self.compressor.save_compressed(grid, metadata, str(save_path))

# Verify file exists
# Verify both files exist
self.assertTrue(save_path.exists())
self.assertTrue(meta_path.exists())

# Load and verify
loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path))
Expand All @@ -137,6 +139,10 @@ def test_save_and_load(self):
for key in ['min_bounds', 'max_bounds', 'ranges', 'has_normals']:
self.assertIn(key, loaded_metadata)

# Check array fields are numpy arrays after load
for key in ['min_bounds', 'max_bounds', 'ranges']:
self.assertIsInstance(loaded_metadata[key], np.ndarray)

def test_error_handling(self):
"""Test error handling."""
# Test empty point cloud
Expand All @@ -156,5 +162,218 @@ def test_error_handling(self):
with self.assertRaisesRegex(ValueError, "shape must match"):
self.compressor.compress(self.point_cloud, normals=wrong_shape_normals)

# --- NaN / Inf / degenerate value tests ---

def test_save_load_metadata_with_nan_and_inf(self):
    """Scalar NaN/Inf metadata entries round-trip as None."""
    out_file = Path(self.test_env['tmp_path']) / "special_values.npz"
    voxels = np.zeros((64, 64, 64), dtype=bool)
    voxels[0, 0, 0] = True
    meta = {
        'min_bounds': np.array([0.0, 0.0, 0.0]),
        'max_bounds': np.array([1.0, 1.0, 1.0]),
        'ranges': np.array([1.0, 1.0, 1.0]),
        'has_normals': False,
        'nan_value': float('nan'),
        'inf_value': float('inf'),
        'neg_inf_value': float('-inf'),
    }
    self.compressor.save_compressed(voxels, meta, str(out_file))
    _, restored = self.compressor.load_compressed(str(out_file))
    # Every non-finite scalar must have been sanitized to None.
    for field in ('nan_value', 'inf_value', 'neg_inf_value'):
        self.assertIsNone(restored[field])

def test_save_load_metadata_with_numpy_nan(self):
    """A numpy floating NaN scalar also becomes None on load."""
    target = Path(self.test_env['tmp_path']) / "np_nan.npz"
    occupancy = np.zeros((64, 64, 64), dtype=bool)
    occupancy[0, 0, 0] = True
    meta = dict(
        min_bounds=np.array([0.0, 0.0, 0.0]),
        max_bounds=np.array([1.0, 1.0, 1.0]),
        ranges=np.array([1.0, 1.0, 1.0]),
        has_normals=False,
        compression_error=np.float64('nan'),
    )
    self.compressor.save_compressed(occupancy, meta, str(target))
    _, restored = self.compressor.load_compressed(str(target))
    self.assertIsNone(restored['compression_error'])

def test_compress_all_points_same_voxel(self):
    """A cloud of identical points yields exactly one occupied voxel."""
    degenerate_cloud = np.full((100, 3), 5.0, dtype=np.float32)
    occupancy, meta = self.compressor.compress(degenerate_cloud, validate=False)
    self.assertEqual(np.sum(occupancy), 1)
    # Degenerate extent is clamped to a tiny epsilon range per axis.
    np.testing.assert_allclose(meta['ranges'], [1e-6, 1e-6, 1e-6])

# --- Zero / empty / boundary tests ---

def test_save_load_empty_grid(self):
    """A grid with no occupied voxels survives a save/load cycle."""
    target = Path(self.test_env['tmp_path']) / "empty_grid.npz"
    vacant = np.zeros((64, 64, 64), dtype=bool)
    meta = {
        'min_bounds': np.array([0.0, 0.0, 0.0]),
        'max_bounds': np.array([1.0, 1.0, 1.0]),
        'ranges': np.array([1.0, 1.0, 1.0]),
        'has_normals': False,
    }
    self.compressor.save_compressed(vacant, meta, str(target))
    restored_grid, restored_meta = self.compressor.load_compressed(str(target))
    self.assertEqual(np.sum(restored_grid), 0)
    self.assertFalse(restored_meta['has_normals'])

def test_save_load_without_normals(self):
    """Metadata lacking a normal_grid entry round-trips unchanged."""
    target = Path(self.test_env['tmp_path']) / "no_normals.npz"
    occupancy, meta = self.compressor.compress(self.point_cloud, validate=False)
    self.assertFalse(meta['has_normals'])
    self.assertNotIn('normal_grid', meta)

    self.compressor.save_compressed(occupancy, meta, str(target))
    restored_grid, restored_meta = self.compressor.load_compressed(str(target))
    np.testing.assert_array_equal(occupancy, restored_grid)
    self.assertFalse(restored_meta['has_normals'])
    self.assertNotIn('normal_grid', restored_meta)

# --- Negative / error path tests ---

def test_load_compressed_missing_metadata_file(self):
    """Deleting the .meta.json sidecar makes load raise FileNotFoundError."""
    target = Path(self.test_env['tmp_path']) / "partial_write.npz"
    occupancy = np.zeros((64, 64, 64), dtype=bool)
    meta = {
        'min_bounds': np.array([0.0, 0.0, 0.0]),
        'max_bounds': np.array([1.0, 1.0, 1.0]),
        'ranges': np.array([1.0, 1.0, 1.0]),
        'has_normals': False,
    }
    self.compressor.save_compressed(occupancy, meta, str(target))

    # Simulate a partial write by removing only the sidecar file.
    Path(str(target) + '.meta.json').unlink()

    with self.assertRaises(FileNotFoundError):
        self.compressor.load_compressed(str(target))

def test_load_compressed_missing_grid_file(self):
    """Loading a path that was never written raises FileNotFoundError."""
    absent = Path(self.test_env['tmp_path']) / "nonexistent.npz"
    with self.assertRaises(FileNotFoundError):
        self.compressor.load_compressed(str(absent))

# --- Debug output security test ---

def test_debug_info_does_not_pickle_dicts(self):
    """Debug dumps contain only ndarray .npy files, never pickled dicts."""
    self.compressor.compress(self.point_cloud, validate=False)

    dump_dir = Path(self.test_env['tmp_path']) / 'debug' / 'grid_creation'
    self.assertTrue(dump_dir.exists())

    # The dict-valued 'metadata' entry must be skipped entirely.
    self.assertFalse((dump_dir / 'metadata.npy').exists())

    # Array-valued entries are written out.
    self.assertTrue((dump_dir / 'grid.npy').exists())
    self.assertTrue((dump_dir / 'scaled_points.npy').exists())

    # Everything written must be loadable with pickle disabled.
    for dumped in dump_dir.glob('*.npy'):
        np.load(str(dumped), allow_pickle=False)

# --- Regression / format fidelity tests ---

def test_save_load_metadata_values_roundtrip(self):
    """Numeric metadata survives the JSON round-trip within tolerance."""
    target = Path(self.test_env['tmp_path']) / "fidelity.npz"
    occupancy, meta = self.compressor.compress(self.point_cloud)
    self.compressor.save_compressed(occupancy, meta, str(target))
    _, restored = self.compressor.load_compressed(str(target))

    # Array-valued bounds come back numerically identical (to rtol).
    for field in ('min_bounds', 'max_bounds', 'ranges'):
        np.testing.assert_allclose(restored[field], meta[field], rtol=1e-6)
    self.assertAlmostEqual(
        restored['compression_error'], meta['compression_error'], places=6
    )

def test_save_load_numpy_scalar_metadata(self):
    """numpy float64/int32 scalars are preserved through type conversion."""
    target = Path(self.test_env['tmp_path']) / "scalar_types.npz"
    occupancy = np.zeros((64, 64, 64), dtype=bool)
    occupancy[0, 0, 0] = True
    meta = dict(
        min_bounds=np.array([0.0, 0.0, 0.0]),
        max_bounds=np.array([1.0, 1.0, 1.0]),
        ranges=np.array([1.0, 1.0, 1.0]),
        has_normals=False,
        float_scalar=np.float64(3.14),
        int_scalar=np.int32(42),
    )
    self.compressor.save_compressed(occupancy, meta, str(target))
    _, restored = self.compressor.load_compressed(str(target))
    self.assertAlmostEqual(restored['float_scalar'], 3.14, places=10)
    self.assertEqual(restored['int_scalar'], 42)

def test_save_load_dtype_after_roundtrip(self):
    """Documents that float32 metadata arrays come back as float64."""
    target = Path(self.test_env['tmp_path']) / "dtype_test.npz"
    occupancy, meta = self.compressor.compress(self.point_cloud, validate=False)
    # Bounds computed from float32 input stay float32 in memory.
    self.assertEqual(meta['min_bounds'].dtype, np.float32)

    self.compressor.save_compressed(occupancy, meta, str(target))
    _, restored = self.compressor.load_compressed(str(target))
    # JSON lists reload through np.array(), which defaults to float64.
    self.assertEqual(restored['min_bounds'].dtype, np.float64)

def test_decompress_after_save_load_matches_direct(self):
    """Decompression gives the same points whether metadata was reloaded."""
    target = Path(self.test_env['tmp_path']) / "roundtrip_quality.npz"
    occupancy, meta = self.compressor.compress(self.point_cloud, validate=False)

    # Reference result straight from the in-memory metadata.
    reference_points, _ = self.compressor.decompress(occupancy, meta)

    # Same pipeline, but via disk.
    self.compressor.save_compressed(occupancy, meta, str(target))
    restored_grid, restored_meta = self.compressor.load_compressed(str(target))
    reloaded_points, _ = self.compressor.decompress(restored_grid, restored_meta)

    # dtype widens to float64 on reload, so compare with a tolerance.
    np.testing.assert_allclose(
        reloaded_points, reference_points.astype(np.float64), rtol=1e-5
    )

# --- E2E test ---

@pytest.mark.e2e
def test_compress_save_load_decompress_quality(self):
    """End-to-end: compress -> save -> load -> decompress, checking quality."""
    target = Path(self.test_env['tmp_path']) / "e2e.npz"

    occupancy, meta = self.compressor.compress(self.point_cloud)
    reference_error = meta['compression_error']
    self.compressor.save_compressed(occupancy, meta, str(target))

    restored_grid, restored_meta = self.compressor.load_compressed(str(target))
    reconstructed, _ = self.compressor.decompress(restored_grid, restored_meta)

    # Some points must be reconstructed...
    self.assertGreater(len(reconstructed), 0)
    # ...and the stored error must match what compression reported.
    self.assertAlmostEqual(
        restored_meta['compression_error'], reference_error, places=6
    )

# Allow running this test module directly via TensorFlow's test runner.
if __name__ == "__main__":
    tf.test.main()
Loading