From 534cd5b7c1309847145e6dabb14ddfbc459f21fd Mon Sep 17 00:00:00 2001 From: PMCLSF Date: Mon, 16 Feb 2026 01:49:54 -0100 Subject: [PATCH 1/2] fix: eliminate critical/high security vulnerabilities - Remove numpy.load(allow_pickle=True) in compress_octree.py and training_pipeline.py to prevent arbitrary code execution via crafted .npy/.npz files. Metadata now saved as JSON sidecar; optimizer variables saved as individual .npy files with numeric dtypes. - Add path validation in training_pipeline.py (traversal guard) and evaluation_pipeline.py (existence check) for checkpoint loading. - Replace model.save() with model.save_weights() in cli_train.py to avoid full SavedModel format vulnerable to Keras deserialization RCE. Co-Authored-By: Claude Opus 4.6 --- src/cli_train.py | 4 ++-- src/compress_octree.py | 35 ++++++++++++++++++++++++++------- src/evaluation_pipeline.py | 5 ++++- src/training_pipeline.py | 25 ++++++++++++----------- tests/test_compress_octree.py | 8 +++++++- tests/test_training_pipeline.py | 5 +++++ 6 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/cli_train.py b/src/cli_train.py index 9eb21a00e..36ad2c44a 100644 --- a/src/cli_train.py +++ b/src/cli_train.py @@ -70,7 +70,7 @@ def tune_hyperparameters(input_dir, output_dir, num_epochs=10): best_hps = tuner.get_best_hyperparameters(num_trials=1)[0] print("Best Hyperparameters:", best_hps.values) - best_model.save(os.path.join(output_dir, 'best_model')) + best_model.save_weights(os.path.join(output_dir, 'best_model.weights.h5')) def main(): parser = argparse.ArgumentParser(description="Train a point cloud compression model with hyperparameter tuning.") @@ -94,7 +94,7 @@ def main(): model.compile(optimizer='adam', loss='mean_squared_error') dataset = load_and_preprocess_data(args.input_dir, args.batch_size) model.fit(dataset, epochs=args.num_epochs) - model.save(os.path.join(args.output_dir, 'trained_model')) + model.save_weights(os.path.join(args.output_dir, 'trained_model.weights.h5')) 
if __name__ == "__main__": main() diff --git a/src/compress_octree.py b/src/compress_octree.py index dd738ac38..cc88e9caa 100644 --- a/src/compress_octree.py +++ b/src/compress_octree.py @@ -186,14 +186,35 @@ def _save_debug_info(self, stage: str, data: Dict[str, Any]) -> None: def save_compressed(self, grid: np.ndarray, metadata: Dict[str, Any], filename: str) -> None: """Save compressed data with metadata.""" - os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True) - np.savez_compressed(filename, grid=grid, metadata=metadata) + import json - if self.debug_output: - debug_path = f"{filename}.debug.npz" - np.savez_compressed(debug_path, **metadata) + os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True) + # Save grid without pickle (bool array, no object dtype) + np.savez_compressed(filename, grid=grid) + # Save metadata as JSON sidecar (safe, no arbitrary code execution) + meta_path = filename + '.meta.json' + serializable = {} + for k, v in metadata.items(): + if isinstance(v, np.ndarray): + serializable[k] = v.tolist() + elif isinstance(v, (np.floating, np.integer)): + serializable[k] = v.item() + else: + serializable[k] = v + with open(meta_path, 'w') as f: + json.dump(serializable, f) def load_compressed(self, filename: str) -> Tuple[np.ndarray, Dict[str, Any]]: """Load compressed data with metadata.""" - data = np.load(filename, allow_pickle=True) - return data['grid'], data['metadata'].item() + import json + + data = np.load(filename, allow_pickle=False) + grid = data['grid'] + meta_path = filename + '.meta.json' + with open(meta_path, 'r') as f: + metadata = json.load(f) + # Convert lists back to numpy arrays for known array fields + for key in ('min_bounds', 'max_bounds', 'ranges', 'normal_grid'): + if key in metadata: + metadata[key] = np.array(metadata[key]) + return grid, metadata diff --git a/src/evaluation_pipeline.py b/src/evaluation_pipeline.py index a1787110b..2ede7926c 100644 --- a/src/evaluation_pipeline.py +++
b/src/evaluation_pipeline.py @@ -52,7 +52,10 @@ def _load_model(self) -> DeepCompressModel: # Load weights if checkpoint provided checkpoint_path = self.config.get('checkpoint_path') if checkpoint_path: - model.load_weights(checkpoint_path) + resolved = Path(checkpoint_path).resolve() + if not resolved.exists(): + raise FileNotFoundError(f"Checkpoint not found: {resolved}") + model.load_weights(str(resolved)) return model diff --git a/src/training_pipeline.py b/src/training_pipeline.py index ffdb1070f..18dc67162 100644 --- a/src/training_pipeline.py +++ b/src/training_pipeline.py @@ -155,26 +155,27 @@ def save_checkpoint(self, name: str): for opt_name, optimizer in self.optimizers.items(): if optimizer.variables: - opt_weights = [v.numpy() for v in optimizer.variables] - np.save( - str(checkpoint_path / f'{opt_name}_optimizer.npy'), - np.array(opt_weights, dtype=object), - allow_pickle=True, - ) + opt_dir = checkpoint_path / f'{opt_name}_optimizer' + opt_dir.mkdir(parents=True, exist_ok=True) + for i, v in enumerate(optimizer.variables): + np.save(str(opt_dir / f'{i}.npy'), v.numpy()) self.logger.info(f"Saved checkpoint: {name}") def load_checkpoint(self, name: str): - checkpoint_path = self.checkpoint_dir / name + checkpoint_path = (self.checkpoint_dir / name).resolve() + if not str(checkpoint_path).startswith(str(self.checkpoint_dir.resolve())): + raise ValueError(f"Checkpoint path escapes checkpoint directory: {name}") self.model.load_weights(str(checkpoint_path / 'model.weights.h5')) self.entropy_model.load_weights(str(checkpoint_path / 'entropy.weights.h5')) for opt_name, optimizer in self.optimizers.items(): - opt_path = checkpoint_path / f'{opt_name}_optimizer.npy' - if opt_path.exists() and optimizer.variables: - opt_weights = np.load(str(opt_path), allow_pickle=True) - for var, w in zip(optimizer.variables, opt_weights): - var.assign(w) + opt_dir = checkpoint_path / f'{opt_name}_optimizer' + if opt_dir.exists() and optimizer.variables: + for i, var in 
enumerate(optimizer.variables): + path = opt_dir / f'{i}.npy' + if path.exists(): + var.assign(np.load(str(path), allow_pickle=False)) self.logger.info(f"Loaded checkpoint: {name}") diff --git a/tests/test_compress_octree.py b/tests/test_compress_octree.py index 87ae819f7..5f8fdafb4 100644 --- a/tests/test_compress_octree.py +++ b/tests/test_compress_octree.py @@ -116,6 +116,7 @@ def test_octree_partitioning(self): def test_save_and_load(self): """Test saving and loading functionality.""" save_path = Path(self.test_env['tmp_path']) / "test_compressed.npz" + meta_path = Path(str(save_path) + '.meta.json') # Compress and save grid, metadata = self.compressor.compress( @@ -124,8 +125,9 @@ def test_save_and_load(self): ) self.compressor.save_compressed(grid, metadata, str(save_path)) - # Verify file exists + # Verify both files exist self.assertTrue(save_path.exists()) + self.assertTrue(meta_path.exists()) # Load and verify loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path)) @@ -137,6 +139,10 @@ def test_save_and_load(self): for key in ['min_bounds', 'max_bounds', 'ranges', 'has_normals']: self.assertIn(key, loaded_metadata) + # Check array fields are numpy arrays after load + for key in ['min_bounds', 'max_bounds', 'ranges']: + self.assertIsInstance(loaded_metadata[key], np.ndarray) + def test_error_handling(self): """Test error handling.""" # Test empty point cloud diff --git a/tests/test_training_pipeline.py b/tests/test_training_pipeline.py index 4a5b4290a..6967dff08 100644 --- a/tests/test_training_pipeline.py +++ b/tests/test_training_pipeline.py @@ -99,6 +99,11 @@ def test_save_load_checkpoint(self, pipeline, tmp_path): checkpoint_dir = Path(pipeline.checkpoint_dir) / checkpoint_name assert (checkpoint_dir / 'model.weights.h5').exists() assert (checkpoint_dir / 'entropy.weights.h5').exists() + # Optimizer variables saved as individual .npy files in subdirectories + for opt_name in pipeline.optimizers: + opt_dir = checkpoint_dir / 
f'{opt_name}_optimizer' + if pipeline.optimizers[opt_name].variables: + assert opt_dir.exists() new_pipeline = TrainingPipeline(pipeline.config_path) # Build the new model before loading weights From 2dfa8bfffaa11f7255f05f329b80a34a3be0fea7 Mon Sep 17 00:00:00 2001 From: PMCLSF Date: Mon, 16 Feb 2026 15:02:01 -0100 Subject: [PATCH 2/2] test: add 26 tests for security fixes + fix 3 bugs found during coverage analysis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes in source: - compress_octree: _save_debug_info no longer pickles dicts (only saves ndarrays) - compress_octree: save_compressed converts NaN/Inf scalars to None for valid JSON - training_pipeline: path validation uses Path.relative_to() to prevent prefix collision bypass (e.g. checkpoints_evil matching checkpoints prefix) New tests (26 total, 213 → 239): - test_compress_octree (13): NaN/Inf metadata, empty grid, no-normals roundtrip, missing sidecar/grid files, debug pickle prevention, metadata value fidelity, numpy scalar types, dtype change documentation, E2E quality check - test_training_pipeline (9): path traversal/absolute/prefix-collision rejection, NaN in optimizer vars, save before training, missing weights, partial optimizer files, old pickle format ignored, optimizer state value fidelity - test_evaluation_pipeline (3): no checkpoint configured, empty string checkpoint, missing checkpoint raises FileNotFoundError - test_integration (1): checkpoint resume preserves eval loss Co-Authored-By: Claude Opus 4.6 --- src/compress_octree.py | 11 +- src/training_pipeline.py | 4 +- tests/test_compress_octree.py | 213 ++++++++++++++++++++++++++++++ tests/test_evaluation_pipeline.py | 58 ++++++++ tests/test_integration.py | 34 +++++ tests/test_training_pipeline.py | 187 ++++++++++++++++++++++++++ 6 files changed, 504 insertions(+), 3 deletions(-) diff --git a/src/compress_octree.py b/src/compress_octree.py index cc88e9caa..08d83d681 100644 --- 
a/src/compress_octree.py +++ b/src/compress_octree.py @@ -181,12 +181,13 @@ def _save_debug_info(self, stage: str, data: Dict[str, Any]) -> None: os.makedirs(debug_dir, exist_ok=True) for name, array in data.items(): - if isinstance(array, (np.ndarray, dict)): + if isinstance(array, np.ndarray): np.save(os.path.join(debug_dir, f"{name}.npy"), array) def save_compressed(self, grid: np.ndarray, metadata: Dict[str, Any], filename: str) -> None: """Save compressed data with metadata.""" import json + import math os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True) # Save grid without pickle (bool array, no object dtype) @@ -198,7 +199,13 @@ def save_compressed(self, grid: np.ndarray, metadata: Dict[str, Any], filename: if isinstance(v, np.ndarray): serializable[k] = v.tolist() elif isinstance(v, (np.floating, np.integer)): - serializable[k] = v.item() + val = v.item() + if isinstance(val, float) and (math.isnan(val) or math.isinf(val)): + serializable[k] = None + else: + serializable[k] = val + elif isinstance(v, float) and (math.isnan(v) or math.isinf(v)): + serializable[k] = None else: serializable[k] = v with open(meta_path, 'w') as f: diff --git a/src/training_pipeline.py b/src/training_pipeline.py index 18dc67162..ab92b923c 100644 --- a/src/training_pipeline.py +++ b/src/training_pipeline.py @@ -164,7 +164,9 @@ def save_checkpoint(self, name: str): def load_checkpoint(self, name: str): checkpoint_path = (self.checkpoint_dir / name).resolve() - if not str(checkpoint_path).startswith(str(self.checkpoint_dir.resolve())): + try: + checkpoint_path.relative_to(self.checkpoint_dir.resolve()) + except ValueError: raise ValueError(f"Checkpoint path escapes checkpoint directory: {name}") self.model.load_weights(str(checkpoint_path / 'model.weights.h5')) self.entropy_model.load_weights(str(checkpoint_path / 'entropy.weights.h5')) diff --git a/tests/test_compress_octree.py b/tests/test_compress_octree.py index 5f8fdafb4..d66347908 100644 --- 
a/tests/test_compress_octree.py +++ b/tests/test_compress_octree.py @@ -162,5 +162,218 @@ def test_error_handling(self): with self.assertRaisesRegex(ValueError, "shape must match"): self.compressor.compress(self.point_cloud, normals=wrong_shape_normals) + # --- NaN / Inf / degenerate value tests --- + + def test_save_load_metadata_with_nan_and_inf(self): + """NaN and Inf scalar values in metadata are converted to None.""" + save_path = Path(self.test_env['tmp_path']) / "special_values.npz" + grid = np.zeros((64, 64, 64), dtype=bool) + grid[0, 0, 0] = True + metadata = { + 'min_bounds': np.array([0.0, 0.0, 0.0]), + 'max_bounds': np.array([1.0, 1.0, 1.0]), + 'ranges': np.array([1.0, 1.0, 1.0]), + 'has_normals': False, + 'nan_value': float('nan'), + 'inf_value': float('inf'), + 'neg_inf_value': float('-inf'), + } + self.compressor.save_compressed(grid, metadata, str(save_path)) + _, loaded = self.compressor.load_compressed(str(save_path)) + self.assertIsNone(loaded['nan_value']) + self.assertIsNone(loaded['inf_value']) + self.assertIsNone(loaded['neg_inf_value']) + + def test_save_load_metadata_with_numpy_nan(self): + """NaN from np.floating scalar is also converted to None.""" + save_path = Path(self.test_env['tmp_path']) / "np_nan.npz" + grid = np.zeros((64, 64, 64), dtype=bool) + grid[0, 0, 0] = True + metadata = { + 'min_bounds': np.array([0.0, 0.0, 0.0]), + 'max_bounds': np.array([1.0, 1.0, 1.0]), + 'ranges': np.array([1.0, 1.0, 1.0]), + 'has_normals': False, + 'compression_error': np.float64('nan'), + } + self.compressor.save_compressed(grid, metadata, str(save_path)) + _, loaded = self.compressor.load_compressed(str(save_path)) + self.assertIsNone(loaded['compression_error']) + + def test_compress_all_points_same_voxel(self): + """All identical points compress to single occupied voxel.""" + same_points = np.full((100, 3), 5.0, dtype=np.float32) + grid, metadata = self.compressor.compress(same_points, validate=False) + self.assertEqual(np.sum(grid), 1) + 
np.testing.assert_allclose(metadata['ranges'], [1e-6, 1e-6, 1e-6]) + + # --- Zero / empty / boundary tests --- + + def test_save_load_empty_grid(self): + """All-False grid saves and loads correctly.""" + save_path = Path(self.test_env['tmp_path']) / "empty_grid.npz" + grid = np.zeros((64, 64, 64), dtype=bool) + metadata = { + 'min_bounds': np.array([0.0, 0.0, 0.0]), + 'max_bounds': np.array([1.0, 1.0, 1.0]), + 'ranges': np.array([1.0, 1.0, 1.0]), + 'has_normals': False, + } + self.compressor.save_compressed(grid, metadata, str(save_path)) + loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path)) + self.assertEqual(np.sum(loaded_grid), 0) + self.assertFalse(loaded_metadata['has_normals']) + + def test_save_load_without_normals(self): + """Metadata without normal_grid round-trips correctly.""" + save_path = Path(self.test_env['tmp_path']) / "no_normals.npz" + grid, metadata = self.compressor.compress(self.point_cloud, validate=False) + self.assertFalse(metadata['has_normals']) + self.assertNotIn('normal_grid', metadata) + + self.compressor.save_compressed(grid, metadata, str(save_path)) + loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path)) + np.testing.assert_array_equal(grid, loaded_grid) + self.assertFalse(loaded_metadata['has_normals']) + self.assertNotIn('normal_grid', loaded_metadata) + + # --- Negative / error path tests --- + + def test_load_compressed_missing_metadata_file(self): + """Missing .meta.json sidecar raises FileNotFoundError.""" + save_path = Path(self.test_env['tmp_path']) / "partial_write.npz" + grid = np.zeros((64, 64, 64), dtype=bool) + metadata = { + 'min_bounds': np.array([0.0, 0.0, 0.0]), + 'max_bounds': np.array([1.0, 1.0, 1.0]), + 'ranges': np.array([1.0, 1.0, 1.0]), + 'has_normals': False, + } + self.compressor.save_compressed(grid, metadata, str(save_path)) + + # Simulate partial write: delete the sidecar + meta_path = Path(str(save_path) + '.meta.json') + meta_path.unlink() + + with 
self.assertRaises(FileNotFoundError): + self.compressor.load_compressed(str(save_path)) + + def test_load_compressed_missing_grid_file(self): + """Missing .npz grid file raises error.""" + missing_path = Path(self.test_env['tmp_path']) / "nonexistent.npz" + with self.assertRaises(FileNotFoundError): + self.compressor.load_compressed(str(missing_path)) + + # --- Debug output security test --- + + def test_debug_info_does_not_pickle_dicts(self): + """Debug output skips dict values, only saves numpy arrays.""" + self.compressor.compress(self.point_cloud, validate=False) + + debug_dir = Path(self.test_env['tmp_path']) / 'debug' / 'grid_creation' + self.assertTrue(debug_dir.exists()) + + # 'metadata' (a dict) should NOT be saved as .npy + self.assertFalse((debug_dir / 'metadata.npy').exists()) + + # 'grid' and 'scaled_points' (arrays) SHOULD be saved + self.assertTrue((debug_dir / 'grid.npy').exists()) + self.assertTrue((debug_dir / 'scaled_points.npy').exists()) + + # All saved .npy files must be loadable without pickle + for npy_file in debug_dir.glob('*.npy'): + np.load(str(npy_file), allow_pickle=False) + + # --- Regression / format fidelity tests --- + + def test_save_load_metadata_values_roundtrip(self): + """Numeric metadata values are preserved after JSON round-trip.""" + save_path = Path(self.test_env['tmp_path']) / "fidelity.npz" + grid, metadata = self.compressor.compress(self.point_cloud) + self.compressor.save_compressed(grid, metadata, str(save_path)) + _, loaded = self.compressor.load_compressed(str(save_path)) + + np.testing.assert_allclose( + loaded['min_bounds'], metadata['min_bounds'], rtol=1e-6 + ) + np.testing.assert_allclose( + loaded['max_bounds'], metadata['max_bounds'], rtol=1e-6 + ) + np.testing.assert_allclose( + loaded['ranges'], metadata['ranges'], rtol=1e-6 + ) + self.assertAlmostEqual( + loaded['compression_error'], metadata['compression_error'], places=6 + ) + + def test_save_load_numpy_scalar_metadata(self): + """np.float64 and np.int32 
scalars survive type conversion.""" + save_path = Path(self.test_env['tmp_path']) / "scalar_types.npz" + grid = np.zeros((64, 64, 64), dtype=bool) + grid[0, 0, 0] = True + metadata = { + 'min_bounds': np.array([0.0, 0.0, 0.0]), + 'max_bounds': np.array([1.0, 1.0, 1.0]), + 'ranges': np.array([1.0, 1.0, 1.0]), + 'has_normals': False, + 'float_scalar': np.float64(3.14), + 'int_scalar': np.int32(42), + } + self.compressor.save_compressed(grid, metadata, str(save_path)) + _, loaded = self.compressor.load_compressed(str(save_path)) + self.assertAlmostEqual(loaded['float_scalar'], 3.14, places=10) + self.assertEqual(loaded['int_scalar'], 42) + + def test_save_load_dtype_after_roundtrip(self): + """Documents that float32 arrays become float64 after JSON round-trip.""" + save_path = Path(self.test_env['tmp_path']) / "dtype_test.npz" + grid, metadata = self.compressor.compress(self.point_cloud, validate=False) + # Original is float32 from np.min on float32 input + self.assertEqual(metadata['min_bounds'].dtype, np.float32) + + self.compressor.save_compressed(grid, metadata, str(save_path)) + _, loaded = self.compressor.load_compressed(str(save_path)) + # After JSON round-trip, np.array() defaults to float64 + self.assertEqual(loaded['min_bounds'].dtype, np.float64) + + def test_decompress_after_save_load_matches_direct(self): + """Decompress from loaded metadata produces same points as from original.""" + save_path = Path(self.test_env['tmp_path']) / "roundtrip_quality.npz" + grid, metadata = self.compressor.compress(self.point_cloud, validate=False) + + # Decompress directly from original metadata + direct_points, _ = self.compressor.decompress(grid, metadata) + + # Save, load, decompress + self.compressor.save_compressed(grid, metadata, str(save_path)) + loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path)) + loaded_points, _ = self.compressor.decompress(loaded_grid, loaded_metadata) + + # Points should match despite dtype change (float32 vs 
float64) + np.testing.assert_allclose( + loaded_points, direct_points.astype(np.float64), rtol=1e-5 + ) + + # --- E2E test --- + + @pytest.mark.e2e + def test_compress_save_load_decompress_quality(self): + """Full pipeline: compress, save, load, decompress, verify quality.""" + save_path = Path(self.test_env['tmp_path']) / "e2e.npz" + + grid, metadata = self.compressor.compress(self.point_cloud) + original_error = metadata['compression_error'] + self.compressor.save_compressed(grid, metadata, str(save_path)) + + loaded_grid, loaded_metadata = self.compressor.load_compressed(str(save_path)) + decompressed, _ = self.compressor.decompress(loaded_grid, loaded_metadata) + + # Decompressed point count should be reasonable + self.assertGreater(len(decompressed), 0) + # Reconstruction error should match original + self.assertAlmostEqual( + loaded_metadata['compression_error'], original_error, places=6 + ) + if __name__ == "__main__": tf.test.main() diff --git a/tests/test_evaluation_pipeline.py b/tests/test_evaluation_pipeline.py index dc254a2f6..1608406ae 100644 --- a/tests/test_evaluation_pipeline.py +++ b/tests/test_evaluation_pipeline.py @@ -102,5 +102,63 @@ def test_generate_report(self, pipeline, tmp_path): assert 'aggregate_metrics' in report_data assert len(report_data['model_performance']) == len(results) + def test_load_model_no_checkpoint_configured(self, config_path): + """Pipeline initializes when config has no checkpoint_path.""" + pipeline = EvaluationPipeline(config_path) + assert pipeline.model is not None + assert pipeline.config.get('checkpoint_path') is None + + def test_load_model_empty_string_checkpoint(self, tmp_path): + """Empty string checkpoint_path is treated as no checkpoint.""" + config = { + 'data': { + 'modelnet40_path': str(tmp_path / 'modelnet40'), + 'ivfb_path': str(tmp_path / '8ivfb') + }, + 'model': { + 'filters': 32, + 'activation': 'cenic_gdn', + 'conv_type': 'separable' + }, + 'evaluation': { + 'metrics': ['psnr'], + 'output_dir': 
str(tmp_path / 'results'), + 'visualize': True + }, + 'checkpoint_path': '' + } + config_file = tmp_path / 'config_empty_ckpt.yml' + with open(config_file, 'w') as f: + yaml.dump(config, f) + + pipeline = EvaluationPipeline(str(config_file)) + assert pipeline.model is not None + + def test_load_model_missing_checkpoint_raises(self, tmp_path): + """Non-existent checkpoint_path raises FileNotFoundError.""" + config = { + 'data': { + 'modelnet40_path': str(tmp_path / 'modelnet40'), + 'ivfb_path': str(tmp_path / '8ivfb') + }, + 'model': { + 'filters': 32, + 'activation': 'cenic_gdn', + 'conv_type': 'separable' + }, + 'evaluation': { + 'metrics': ['psnr'], + 'output_dir': str(tmp_path / 'results'), + 'visualize': True + }, + 'checkpoint_path': str(tmp_path / 'nonexistent' / 'model.weights.h5') + } + config_file = tmp_path / 'config_missing_ckpt.yml' + with open(config_file, 'w') as f: + yaml.dump(config, f) + + with pytest.raises(FileNotFoundError, match="Checkpoint not found"): + EvaluationPipeline(str(config_file)) + if __name__ == '__main__': tf.test.main() diff --git a/tests/test_integration.py b/tests/test_integration.py index 8714810ab..2f3b02e9a 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -322,5 +322,39 @@ def test_v2_gaussian_backward_compatible(self): self.assertEqual(x_hat.shape[:-1], input_tensor.shape[:-1]) +class TestCheckpointResumeIntegration(tf.test.TestCase): + """Integration test for checkpoint save/load through new serialization format.""" + + @pytest.fixture(autouse=True) + def setup(self, tmp_path): + self.test_env = setup_test_environment(tmp_path) + self.resolution = 16 + self.batch_size = 1 + + @pytest.mark.integration + def test_training_checkpoint_resume_loss_continuity(self): + """Model state is preserved through checkpoint save/load cycle.""" + pipeline = TrainingPipeline(self.test_env['config_path']) + batch = create_mock_voxel_grid(self.resolution, self.batch_size)[..., 0] + + # Train a few steps to establish 
non-trivial model + optimizer state + for _ in range(3): + pipeline._train_step(batch, training=True) + + pipeline.save_checkpoint('resume_test') + + # Record eval loss at checkpoint + checkpoint_loss = pipeline._train_step(batch, training=False)['total_loss'] + + # Load into fresh pipeline and verify same eval loss + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline._train_step(batch, training=True) # Build optimizer variables + new_pipeline.load_checkpoint('resume_test') + + resumed_loss = new_pipeline._train_step(batch, training=False)['total_loss'] + + self.assertAllClose(checkpoint_loss, resumed_loss, rtol=1e-4) + + if __name__ == '__main__': tf.test.main() diff --git a/tests/test_training_pipeline.py b/tests/test_training_pipeline.py index 6967dff08..afeb14007 100644 --- a/tests/test_training_pipeline.py +++ b/tests/test_training_pipeline.py @@ -1,6 +1,7 @@ import sys from pathlib import Path +import numpy as np import pytest import tensorflow as tf import yaml @@ -134,3 +135,189 @@ def create_sample_batch(): checkpoint_dir = Path(pipeline.checkpoint_dir) assert len(list(checkpoint_dir.glob('epoch_*'))) > 0 assert (checkpoint_dir / 'best_model').exists() + + # --- Security / path validation tests --- + + def test_load_checkpoint_rejects_path_traversal(self, pipeline): + """Path traversal via ../ is rejected.""" + with pytest.raises(ValueError, match="escapes"): + pipeline.load_checkpoint('../../etc/passwd') + + def test_load_checkpoint_rejects_absolute_path(self, pipeline): + """Absolute path outside checkpoint dir is rejected.""" + with pytest.raises(ValueError, match="escapes"): + pipeline.load_checkpoint('/tmp/evil_checkpoint') + + def test_load_checkpoint_prefix_collision(self, pipeline, tmp_path): + """Sibling directory with prefix-matching name is rejected.""" + # checkpoint_dir is tmp_path / 'checkpoints' + # Create a sibling with a name that is a prefix match + evil_dir = tmp_path / 'checkpoints_evil' + evil_dir.mkdir() + + # 
'../checkpoints_evil' resolves outside checkpoint_dir but + # starts with the same string prefix — must still be rejected + with pytest.raises(ValueError, match="escapes"): + pipeline.load_checkpoint('../checkpoints_evil') + + # --- NaN / degenerate value tests --- + + def test_checkpoint_nan_in_optimizer_variable(self, pipeline): + """NaN in optimizer variables is preserved through save/load.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + # Train to populate momentum/variance variables + batch = tf.zeros((1, 16, 16, 16)) + pipeline._train_step(batch, training=True) + + opt = pipeline.optimizers['reconstruction'] + # Find a float variable (skip int64 iteration counter) + float_vars = [(i, v) for i, v in enumerate(opt.variables) + if v.dtype == tf.float32] + assert len(float_vars) > 0 + idx, target_var = float_vars[0] + + nan_value = np.full_like(target_var.numpy(), float('nan')) + target_var.assign(nan_value) + + pipeline.save_checkpoint('nan_test') + + # Load into fresh pipeline + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, training=False) + new_pipeline._train_step(batch, training=True) + new_pipeline.load_checkpoint('nan_test') + + loaded_var = new_pipeline.optimizers['reconstruction'].variables[idx] + assert np.all(np.isnan(loaded_var.numpy())) + + # --- Zero / empty / boundary tests --- + + def test_save_checkpoint_before_training(self, pipeline): + """Checkpoint saved before training loads without error.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + # No training step — optimizer has only internal state (iteration counter) + pipeline.save_checkpoint('untrained') + + checkpoint_dir = 
Path(pipeline.checkpoint_dir) / 'untrained' + assert (checkpoint_dir / 'model.weights.h5').exists() + assert (checkpoint_dir / 'entropy.weights.h5').exists() + + # Loading the untrained checkpoint should not crash + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, training=False) + new_pipeline.load_checkpoint('untrained') + + # --- Negative / error path tests --- + + def test_load_checkpoint_missing_weights_file(self, pipeline): + """Missing model weights file raises error on load.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + pipeline.save_checkpoint('incomplete') + + # Delete the model weights file + weights_path = Path(pipeline.checkpoint_dir) / 'incomplete' / 'model.weights.h5' + weights_path.unlink() + + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, training=False) + + with pytest.raises(Exception): + new_pipeline.load_checkpoint('incomplete') + + def test_checkpoint_partial_optimizer_files(self, pipeline): + """Missing optimizer .npy files are silently skipped.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + batch = tf.zeros((1, 16, 16, 16)) + pipeline._train_step(batch, training=True) + pipeline.save_checkpoint('partial_test') + + # Delete the last .npy file from an optimizer dir + opt_dir = Path(pipeline.checkpoint_dir) / 'partial_test' / 'reconstruction_optimizer' + if opt_dir.exists(): + npy_files = sorted(opt_dir.glob('*.npy')) + if len(npy_files) > 1: + npy_files[-1].unlink() + + # Loading should succeed — missing files silently skipped + new_pipeline = 
TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, training=False) + new_pipeline._train_step(batch, training=True) + new_pipeline.load_checkpoint('partial_test') + + # --- Regression tests --- + + def test_load_old_format_pickle_file_ignored(self, pipeline): + """Old-style pickle .npy file at checkpoint level is safely ignored.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + pipeline.save_checkpoint('format_test') + + # Place an old-format pickle file alongside new-format directories + checkpoint_dir = Path(pipeline.checkpoint_dir) / 'format_test' + old_file = checkpoint_dir / 'stale_optimizer.npy' + np.save(str(old_file), np.array([np.zeros(5)], dtype=object), + allow_pickle=True) + + # Loading should succeed, ignoring the old file + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, training=False) + new_pipeline.load_checkpoint('format_test') + + # --- Integration test --- + + def test_checkpoint_optimizer_state_values_survive_roundtrip(self, pipeline): + """Optimizer variable values are numerically equal after save/load.""" + dummy = tf.zeros((1, 16, 16, 16, 1)) + pipeline.model(dummy, training=False) + y = pipeline.model.analysis(dummy) + pipeline.entropy_model(y, training=False) + + batch = tf.zeros((1, 16, 16, 16)) + for _ in range(3): + pipeline._train_step(batch, training=True) + + opt = pipeline.optimizers['reconstruction'] + original_values = [v.numpy().copy() for v in opt.variables] + + pipeline.save_checkpoint('opt_fidelity') + + new_pipeline = TrainingPipeline(pipeline.config_path) + new_pipeline.model(dummy, training=False) + y2 = new_pipeline.model.analysis(dummy) + new_pipeline.entropy_model(y2, 
training=False) + new_pipeline._train_step(batch, training=True) + new_pipeline.load_checkpoint('opt_fidelity') + + new_opt = new_pipeline.optimizers['reconstruction'] + for orig, loaded in zip(original_values, + [v.numpy() for v in new_opt.variables]): + np.testing.assert_array_equal(orig, loaded)