From 879a58c382e0d530dd44f3f0e6107f51780fd1a0 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Mon, 9 Mar 2026 22:35:04 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E3=80=90Hackathon=2010th=20Spring=20No.32?= =?UTF-8?q?=E3=80=91Unit=20test=20for=20load=5Fweight=5Futils.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model_executor/test_load_weight_utils.py | 775 ++++++++++++++++++ 1 file changed, 775 insertions(+) create mode 100644 tests/model_executor/test_load_weight_utils.py diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py new file mode 100644 index 00000000000..ab8601b137f --- /dev/null +++ b/tests/model_executor/test_load_weight_utils.py @@ -0,0 +1,775 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +import json +import os +import tempfile +import time +import unittest +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import numpy as np +import paddle + +from fastdeploy.model_executor.load_weight_utils import ( + get_all_weights_file, + get_model_path, + get_weight_iterator, + is_weight_cache_enabled, + kv_cache_scale_iterator, + load_composite_checkpoint, + load_kv_cache_scale, + load_weights_from_cache, + measure_time, + natural_key, + save_model, +) + +# ═══════════════════ Helpers ═══════════════════ + + +def _make_fd_config(**overrides): + """Build a minimal FDConfig-like object for testing.""" + model_cfg = SimpleNamespace( + model="/fake/model", + model_type="ernie", + num_hidden_layers=2, + moe_num_experts=8, + moe_layer_start_index=0, + prefix_layer_name="layers", + max_model_len=2048, + kv_cache_quant_scale_path="/nonexistent/kv_cache_scale.json", + pretrained_config=SimpleNamespace(use_sequence_parallel_moe=False), + ) + parallel_cfg = SimpleNamespace( + tensor_parallel_size=1, + tensor_parallel_rank=0, + expert_parallel_size=1, + num_experts_start_offset=0, + num_experts_per_rank=4, + use_ep=False, + use_sequence_parallel_moe=False, + ) + quant_cfg = SimpleNamespace( + name=lambda: "w8a8", + is_checkpoint_bf16=False, + kv_cache_quant_type="none", + ) + load_cfg = SimpleNamespace(is_pre_sharded=False) + cache_cfg = SimpleNamespace() + speculative_cfg = SimpleNamespace(model_type="main") + cfg = SimpleNamespace( + model_config=model_cfg, + parallel_config=parallel_cfg, + quant_config=quant_cfg, + load_config=load_cfg, + cache_config=cache_cfg, + speculative_config=speculative_cfg, + ) + for k, v in overrides.items(): + setattr(cfg, k, v) + return cfg + + +# ═══════════════════ Tests: natural_key ═══════════════════ + + +class TestNaturalKey(unittest.TestCase): + """Tests for natural_key() string sorting helper.""" + + def test_pure_alpha(self): + result = natural_key("abc") + 
self.assertEqual(result, ["abc"]) + + def test_pure_digits(self): + result = natural_key("123") + self.assertEqual(result, ["", 123, ""]) + + def test_mixed(self): + result = natural_key("layer_12_weight") + self.assertEqual(result, ["layer_", 12, "_weight"]) + + def test_multi_numbers(self): + result = natural_key("model_3_layer_42") + self.assertEqual(result, ["model_", 3, "_layer_", 42, ""]) + + def test_sorting_order(self): + names = ["file2", "file10", "file1", "file20"] + sorted_names = sorted(names, key=natural_key) + self.assertEqual(sorted_names, ["file1", "file2", "file10", "file20"]) + + def test_empty_string(self): + result = natural_key("") + self.assertEqual(result, [""]) + + def test_leading_digit(self): + result = natural_key("0abc") + self.assertEqual(result, ["", 0, "abc"]) + + +# ═══════════════════ Tests: measure_time ═══════════════════ + + +class TestMeasureTime(unittest.TestCase): + """Tests for measure_time() decorator.""" + + def test_basic_timing(self): + @measure_time("Test op") + def slow_func(): + time.sleep(0.01) + return 42 + + result = slow_func() + self.assertEqual(result, 42) + + def test_preserves_return_value(self): + @measure_time("Return test") + def identity(x): + return x * 2 + + self.assertEqual(identity(5), 10) + + def test_preserves_args_kwargs(self): + @measure_time("Args test") + def add(a, b, extra=0): + return a + b + extra + + self.assertEqual(add(1, 2, extra=3), 6) + + def test_custom_prefix(self): + with patch("fastdeploy.model_executor.load_weight_utils.logger") as mock_logger: + + @measure_time("Custom prefix") + def noop(): + pass + + noop() + mock_logger.info.assert_called_once() + call_arg = mock_logger.info.call_args[0][0] + self.assertIn("Custom prefix", call_arg) + + +# ═══════════════════ Tests: get_all_weights_file ═══════════════════ + + +class TestGetAllWeightsFile(unittest.TestCase): + """Tests for get_all_weights_file() weight file discovery.""" + + def test_pdparams_detection(self): + with 
tempfile.TemporaryDirectory() as tmpdir: + # Create .pdparams files (but not scheduler.pdparams) + Path(tmpdir, "model-00001.pdparams").write_bytes(b"") + Path(tmpdir, "model-00002.pdparams").write_bytes(b"") + Path(tmpdir, "scheduler.pdparams").write_bytes(b"") + + files_list, ordered_map, use_safetensors, is_key_ordered = get_all_weights_file(tmpdir) + + self.assertFalse(use_safetensors) + self.assertEqual(len(files_list), 2) + self.assertEqual(ordered_map, {}) + self.assertFalse(is_key_ordered) + # scheduler.pdparams should be excluded + for f in files_list: + self.assertNotIn("scheduler", f) + + def test_single_safetensors(self): + with tempfile.TemporaryDirectory() as tmpdir: + # Create a single model.safetensors file using safetensors library + from safetensors.numpy import save_file + + tensors = {"weight_a": np.zeros((2, 3), dtype=np.float32), "weight_b": np.ones((4,), dtype=np.float32)} + save_file(tensors, os.path.join(tmpdir, "model.safetensors")) + + files_list, ordered_map, use_safetensors, is_key_ordered = get_all_weights_file(tmpdir) + + self.assertTrue(use_safetensors) + self.assertTrue(is_key_ordered) + self.assertEqual(len(files_list), 1) + self.assertIn("model.safetensors", files_list[0]) + self.assertIn("weight_a", ordered_map) + self.assertIn("weight_b", ordered_map) + + def test_sharded_safetensors_with_index(self): + with tempfile.TemporaryDirectory() as tmpdir: + from safetensors.numpy import save_file + + # Create two shard files + save_file({"weight_a": np.zeros((2,), dtype=np.float32)}, os.path.join(tmpdir, "model-00001.safetensors")) + save_file({"weight_b": np.ones((3,), dtype=np.float32)}, os.path.join(tmpdir, "model-00002.safetensors")) + + # Create index file + index = { + "weight_map": { + "weight_a": "model-00001.safetensors", + "weight_b": "model-00002.safetensors", + } + } + with open(os.path.join(tmpdir, "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + + files_list, ordered_map, use_safetensors, is_key_ordered 
= get_all_weights_file(tmpdir) + + self.assertTrue(use_safetensors) + self.assertEqual(len(files_list), 2) + self.assertIn("weight_a", ordered_map) + self.assertIn("weight_b", ordered_map) + + +# ═══════════════════ Tests: kv_cache_scale_iterator ═══════════════════ + + +class TestKvCacheScaleIterator(unittest.TestCase): + """Tests for kv_cache_scale_iterator() JSON scale loading.""" + + def test_basic_iteration(self): + data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(data, f) + f.flush() + path = f.name + + try: + results = dict(kv_cache_scale_iterator(path)) + self.assertIn("layer.0.k_scale", results) + self.assertIn("layer.0.v_scale", results) + # Values should be multiplied by 448.0 + np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) + np.testing.assert_allclose(results["layer.0.v_scale"].numpy(), 0.25 * 448.0, rtol=1e-5) + finally: + os.unlink(path) + + def test_empty_json(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump({}, f) + f.flush() + path = f.name + try: + results = list(kv_cache_scale_iterator(path)) + self.assertEqual(len(results), 0) + finally: + os.unlink(path) + + def test_result_types(self): + data = {"scale_0": 1.0} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(data, f) + f.flush() + path = f.name + try: + for key, tensor in kv_cache_scale_iterator(path): + self.assertIsInstance(key, str) + self.assertIsInstance(tensor, paddle.Tensor) + finally: + os.unlink(path) + + +# ═══════════════════ Tests: get_model_path ═══════════════════ + + +class TestGetModelPath(unittest.TestCase): + """Tests for get_model_path() model directory resolution.""" + + def test_no_rank_dirs(self): + with tempfile.TemporaryDirectory() as tmpdir: + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + result = 
get_model_path(fd_config) + self.assertEqual(result, tmpdir) + + def test_single_rank_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + os.makedirs(os.path.join(tmpdir, "rank0")) + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + result = get_model_path(fd_config) + # Single rank dir should not trigger pre-sharding + self.assertEqual(result, tmpdir) + + def test_multi_rank_dirs_matching_tp(self): + with tempfile.TemporaryDirectory() as tmpdir: + os.makedirs(os.path.join(tmpdir, "rank0")) + os.makedirs(os.path.join(tmpdir, "rank1")) + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + fd_config.parallel_config.tensor_parallel_size = 2 + fd_config.parallel_config.tensor_parallel_rank = 1 + + result = get_model_path(fd_config) + self.assertEqual(result, os.path.join(tmpdir, "rank1")) + self.assertTrue(fd_config.load_config.is_pre_sharded) + + def test_multi_rank_dirs_mismatched_tp(self): + with tempfile.TemporaryDirectory() as tmpdir: + os.makedirs(os.path.join(tmpdir, "rank0")) + os.makedirs(os.path.join(tmpdir, "rank1")) + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + fd_config.parallel_config.tensor_parallel_size = 4 # mismatch + + with self.assertRaises(ValueError) as ctx: + get_model_path(fd_config) + self.assertIn("tp2", str(ctx.exception)) + + +# ═══════════════════ Tests: is_weight_cache_enabled ═══════════════════ + + +class TestIsWeightCacheEnabled(unittest.TestCase): + """Tests for is_weight_cache_enabled() cache detection.""" + + def test_cache_disabled_when_env_off(self): + fd_config = _make_fd_config() + with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False + enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) + self.assertFalse(enable) + self.assertIsNone(cache_dir) + + def test_cache_disabled_when_no_quant_config(self): + fd_config = _make_fd_config() + fd_config.quant_config = None + with 
patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True + enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) + self.assertFalse(enable) + + def test_cache_enabled_when_dir_exists(self): + with tempfile.TemporaryDirectory() as tmpdir: + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + fd_config.quant_config.is_checkpoint_bf16 = False + + with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True + + # First call — no cache dir → disabled + enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) + self.assertFalse(enable) + + # Now create the cache dir + if cache_dir is not None: + os.makedirs(cache_dir, exist_ok=True) + enable2, _, _ = is_weight_cache_enabled(fd_config) + self.assertTrue(enable2) + + def test_cache_dir_uses_hash(self): + with tempfile.TemporaryDirectory() as tmpdir: + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + + with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True + _, cache_dir, _ = is_weight_cache_enabled(fd_config) + if cache_dir is not None: + # Cache dir should contain a hash + self.assertIn(".cache", cache_dir) + cache_subdir = os.path.basename(cache_dir) + self.assertGreater(len(cache_subdir), 0) + + +# ═══════════════════ Tests: load_weights_from_cache ═══════════════════ + + +class TestLoadWeightsFromCache(unittest.TestCase): + """Tests for load_weights_from_cache() parameter loading.""" + + def test_basic_weight_loading(self): + # Create a simple model with named parameters + linear = paddle.nn.Linear(4, 3) + + new_weight = paddle.randn([4, 3]) + weights_iter = iter([("weight", new_weight)]) + + load_weights_from_cache(linear, weights_iter) + np.testing.assert_allclose(linear.weight.numpy(), new_weight.numpy(), rtol=1e-6) + + def test_shape_mismatch_raises(self): + linear = 
paddle.nn.Linear(4, 3) + wrong_shape_weight = paddle.randn([5, 3]) + weights_iter = iter([("weight", wrong_shape_weight)]) + + with self.assertRaises(ValueError) as ctx: + load_weights_from_cache(linear, weights_iter) + self.assertIn("Shape mismatch", str(ctx.exception)) + + def test_missing_weight_skipped(self): + linear = paddle.nn.Linear(4, 3) + old_weight = linear.weight.numpy().copy() + + weights_iter = iter([("nonexistent_param", paddle.randn([2, 2]))]) + # Should not raise, just skip missing params + load_weights_from_cache(linear, weights_iter) + np.testing.assert_allclose(linear.weight.numpy(), old_weight, rtol=1e-6) + + +# ═══════════════════ Tests: get_weight_iterator ═══════════════════ + + +class TestGetWeightIterator(unittest.TestCase): + """Tests for get_weight_iterator() weight loading dispatcher.""" + + def test_safetensors_single_file(self): + from safetensors.numpy import save_file + + with tempfile.TemporaryDirectory() as tmpdir: + tensors = {"param_a": np.random.randn(2, 3).astype(np.float32)} + save_file(tensors, os.path.join(tmpdir, "model.safetensors")) + + results = dict(get_weight_iterator(tmpdir)) + self.assertIn("param_a", results) + np.testing.assert_allclose(results["param_a"].numpy(), tensors["param_a"], rtol=1e-6) + + def test_safetensors_sharded(self): + from safetensors.numpy import save_file + + with tempfile.TemporaryDirectory() as tmpdir: + save_file({"w1": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(tmpdir, "shard-001.safetensors")) + save_file({"w2": np.array([3.0, 4.0], dtype=np.float32)}, os.path.join(tmpdir, "shard-002.safetensors")) + + index = {"weight_map": {"w1": "shard-001.safetensors", "w2": "shard-002.safetensors"}} + with open(os.path.join(tmpdir, "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + + results = dict(get_weight_iterator(tmpdir)) + self.assertIn("w1", results) + self.assertIn("w2", results) + np.testing.assert_allclose(results["w1"].numpy(), [1.0, 2.0], rtol=1e-6) + 
np.testing.assert_allclose(results["w2"].numpy(), [3.0, 4.0], rtol=1e-6) + + def test_kv_cache_scale_included(self): + from safetensors.numpy import save_file + + with tempfile.TemporaryDirectory() as tmpdir: + save_file({"w": np.zeros((1,), dtype=np.float32)}, os.path.join(tmpdir, "model.safetensors")) + + scales = {"k_scale": 0.1} + with open(os.path.join(tmpdir, "kv_cache_scale.json"), "w") as f: + json.dump(scales, f) + + results = dict(get_weight_iterator(tmpdir)) + self.assertIn("w", results) + self.assertIn("k_scale", results) + np.testing.assert_allclose(results["k_scale"].numpy(), 0.1 * 448.0, rtol=1e-5) + + +# ═══════════════════ Tests: load_kv_cache_scale ═══════════════════ + + +class TestLoadKvCacheScale(unittest.TestCase): + """Tests for load_kv_cache_scale() JSON scale loading into state_dict.""" + + def test_loads_scales(self): + with tempfile.TemporaryDirectory() as tmpdir: + scale_path = os.path.join(tmpdir, "kv_cache_scale.json") + scales = { + "ernie.layers.0.self_attn.cachek_matmul.activation_scale": 0.5, + "ernie.layers.0.self_attn.cachev_matmul.activation_scale": 0.25, + "ernie.layers.1.self_attn.cachek_matmul.activation_scale": 0.75, + "ernie.layers.1.self_attn.cachev_matmul.activation_scale": 0.125, + } + with open(scale_path, "w") as f: + json.dump(scales, f) + + fd_config = _make_fd_config() + fd_config.model_config.kv_cache_quant_scale_path = scale_path + fd_config.model_config.prefix_layer_name = "layers" + fd_config.model_config.num_hidden_layers = 2 + + state_dict = {} + load_kv_cache_scale(fd_config, state_dict) + + self.assertEqual(len(state_dict), 4) + np.testing.assert_allclose( + state_dict["ernie.layers.0.self_attn.cachek_matmul.activation_scale"].numpy(), + 0.5 * 448.0, + rtol=1e-5, + ) + + def test_missing_file_warns(self): + fd_config = _make_fd_config() + fd_config.model_config.kv_cache_quant_scale_path = "/nonexistent/path.json" + state_dict = {} + + with patch("fastdeploy.model_executor.load_weight_utils.logger") as 
mock_logger: + load_kv_cache_scale(fd_config, state_dict) + mock_logger.warning.assert_called_once() + + self.assertEqual(len(state_dict), 0) + + +# ═══════════════════ Tests: save_model decorator ═══════════════════ + + +class TestSaveModelDecorator(unittest.TestCase): + """Tests for save_model() decorator factory.""" + + def test_decorator_passes_through(self): + @save_model() + def my_loader(model, fd_config): + return "loaded" + + mock_model = MagicMock() + mock_model.state_dict.return_value = {} + fd_config = _make_fd_config() + + with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False + result = my_loader(mock_model, fd_config) + self.assertEqual(result, "loaded") + + def test_custom_arg_names(self): + @save_model(model_arg_name="m", config_arg_name="cfg") + def my_loader(m, cfg): + return "custom_loaded" + + mock_model = MagicMock() + mock_model.state_dict.return_value = {} + fd_config = _make_fd_config() + + with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: + mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False + result = my_loader(mock_model, fd_config) + self.assertEqual(result, "custom_loaded") + + +# ═══════════════════ Tests: load_composite_checkpoint ═══════════════════ + + +class TestLoadCompositeCheckpoint(unittest.TestCase): + """Tests for load_composite_checkpoint() top-level dispatcher.""" + + def test_tp_single_rank(self): + """Test loading with tensor parallelism (no rank dirs, no EP).""" + from safetensors.numpy import save_file + + with tempfile.TemporaryDirectory() as tmpdir: + tensors = {"weight": np.random.randn(4, 4).astype(np.float32)} + save_file(tensors, os.path.join(tmpdir, "model.safetensors")) + + fd_config = _make_fd_config() + fd_config.model_config.model = tmpdir + fd_config.parallel_config.use_ep = False + fd_config.quant_config.kv_cache_quant_type = "none" + + mock_cls = MagicMock() + with 
patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: + mock_load.return_value = {"weight": np.zeros((4, 4))} + result = load_composite_checkpoint(tmpdir, mock_cls, fd_config, return_numpy=True) + self.assertIn("weight", result) + mock_load.assert_called_once() + + def test_ep_loading(self): + """Test expert parallel loading path.""" + fd_config = _make_fd_config() + fd_config.parallel_config.use_ep = True + + mock_cls = MagicMock() + with patch("fastdeploy.model_executor.load_weight_utils.load_ep_checkpoint") as mock_ep: + mock_ep.return_value = {"expert.0.weight": np.zeros((4,))} + result = load_composite_checkpoint("/fake", mock_cls, fd_config, return_numpy=True) + mock_ep.assert_called_once() + self.assertIn("expert.0.weight", result) + + def test_pre_sharded_loading(self): + """Test pre-sharded (multi-rank) loading path.""" + with tempfile.TemporaryDirectory() as tmpdir: + rank0_dir = os.path.join(tmpdir, "rank0") + rank1_dir = os.path.join(tmpdir, "rank1") + os.makedirs(rank0_dir) + os.makedirs(rank1_dir) + + fd_config = _make_fd_config() + fd_config.parallel_config.use_ep = False + fd_config.parallel_config.tensor_parallel_size = 2 + fd_config.parallel_config.tensor_parallel_rank = 0 + fd_config.quant_config.kv_cache_quant_type = "none" + + mock_cls = MagicMock() + with patch("fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint") as mock_pre: + mock_pre.return_value = {"w": np.zeros((2,))} + result = load_composite_checkpoint(tmpdir, mock_cls, fd_config) + mock_pre.assert_called_once_with(tmpdir, 0) + self.assertIn("w", result) + + def test_empty_state_dict_raises(self): + fd_config = _make_fd_config() + fd_config.parallel_config.use_ep = False + fd_config.quant_config.kv_cache_quant_type = "none" + + mock_cls = MagicMock() + with tempfile.TemporaryDirectory() as tmpdir: + with patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: + mock_load.return_value = {} + with 
self.assertRaises(ValueError) as ctx: + load_composite_checkpoint(tmpdir, mock_cls, fd_config) + self.assertIn("weight not found", str(ctx.exception)) + + def test_kv_cache_quant_fp8_loads_scales(self): + """Test that FP8 KV cache triggers scale loading.""" + fd_config = _make_fd_config() + fd_config.parallel_config.use_ep = False + fd_config.quant_config.kv_cache_quant_type = "float8_e4m3fn" + + mock_cls = MagicMock() + with tempfile.TemporaryDirectory() as tmpdir: + with patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: + mock_load.return_value = {"w": np.zeros((2,))} + with patch("fastdeploy.model_executor.load_weight_utils.load_kv_cache_scale") as mock_scale: + load_composite_checkpoint(tmpdir, mock_cls, fd_config) + mock_scale.assert_called_once() + + +# ═══════════════════ Tests: safetensors iterators ═══════════════════ + + +class TestSafetensorsIterators(unittest.TestCase): + """Tests for safetensors_weights_iterator and safetensors_weights_iterator_ordered.""" + + def test_safetensors_weights_iterator(self): + from safetensors.numpy import save_file + + from fastdeploy.model_executor.load_weight_utils import ( + safetensors_weights_iterator, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "test.safetensors") + save_file({"a": np.array([1.0], dtype=np.float32)}, path) + + results = dict(safetensors_weights_iterator([path])) + self.assertIn("a", results) + self.assertIsInstance(results["a"], paddle.Tensor) + + def test_safetensors_weights_iterator_ordered(self): + from safetensors.numpy import save_file + + from fastdeploy.model_executor.load_weight_utils import ( + safetensors_weights_iterator_ordered, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + path1 = os.path.join(tmpdir, "shard1.safetensors") + path2 = os.path.join(tmpdir, "shard2.safetensors") + save_file({"x": np.array([1.0], dtype=np.float32)}, path1) + save_file({"y": np.array([2.0], dtype=np.float32)}, path2) + + 
ordered_map = {"x": path1, "y": path2} + results = dict(safetensors_weights_iterator_ordered(ordered_map)) + self.assertIn("x", results) + self.assertIn("y", results) + np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) + + def test_multi_keys_same_file(self): + from safetensors.numpy import save_file + + from fastdeploy.model_executor.load_weight_utils import ( + safetensors_weights_iterator_ordered, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "model.safetensors") + save_file( + {"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, + path, + ) + + ordered_map = {"a": path, "b": path} + results = dict(safetensors_weights_iterator_ordered(ordered_map)) + self.assertEqual(len(results), 2) + + +# ═══════════════════ Tests: pdparams_weight_iterator ═══════════════════ + + +class TestPdparamsWeightIterator(unittest.TestCase): + """Tests for pdparams_weight_iterator() checkpoint loading.""" + + def test_basic_iteration(self): + from fastdeploy.model_executor.load_weight_utils import pdparams_weight_iterator + + with tempfile.TemporaryDirectory() as tmpdir: + state = {"param1": paddle.randn([2, 3]), "param2": paddle.randn([4])} + path = os.path.join(tmpdir, "model.pdparams") + paddle.save(state, path) + + results = dict(pdparams_weight_iterator([path])) + self.assertIn("param1", results) + self.assertIn("param2", results) + self.assertEqual(results["param1"].shape, [2, 3]) + + def test_multi_shard_iteration(self): + from fastdeploy.model_executor.load_weight_utils import pdparams_weight_iterator + + with tempfile.TemporaryDirectory() as tmpdir: + path1 = os.path.join(tmpdir, "shard1.pdparams") + path2 = os.path.join(tmpdir, "shard2.pdparams") + paddle.save({"a": paddle.to_tensor([1.0])}, path1) + paddle.save({"b": paddle.to_tensor([2.0])}, path2) + + results = dict(pdparams_weight_iterator([path1, path2])) + self.assertEqual(len(results), 2) + self.assertIn("a", results) + 
self.assertIn("b", results) + + +# ═══════════════════ Tests: load_pre_sharded_checkpoint ═══════════════════ + + +class TestLoadPreShardedCheckpoint(unittest.TestCase): + """Tests for load_pre_sharded_checkpoint().""" + + def test_loads_rank_weights(self): + from safetensors.numpy import save_file + + from fastdeploy.model_executor.load_weight_utils import ( + load_pre_sharded_checkpoint, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + rank_dir = os.path.join(tmpdir, "rank0") + os.makedirs(rank_dir) + save_file({"w": np.array([42.0], dtype=np.float32)}, os.path.join(rank_dir, "model.safetensors")) + + result = load_pre_sharded_checkpoint(tmpdir, 0) + self.assertIn("w", result) + np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) + + +# ═══════════════════ Tests: fast_weights_iterator ═══════════════════ + + +class TestFastWeightsIterator(unittest.TestCase): + """Tests for fast_weights_iterator() using paddleformers' fast_safe_open.""" + + def test_basic(self): + from safetensors.numpy import save_file + + from fastdeploy.model_executor.load_weight_utils import fast_weights_iterator + + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "model.safetensors") + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, path) + + results = list(fast_weights_iterator([path])) + self.assertEqual(len(results), 1) + name, param_slice = results[0] + self.assertEqual(name, "w") + + +if __name__ == "__main__": + unittest.main() From c972772274babf99e0f43bb0d121312b99a27c9e Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Mon, 9 Mar 2026 23:48:12 +0800 Subject: [PATCH 2/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91rewrite=20load=5Fweight=5Futils=20unit=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pytest-style, single class, monkeypatch-based - 36 tests, 460 lines, 66% coverage (205/310 stmts) - develop baseline: 0% → PR: 66%, ratio 2.2x - 
No MagicMock, no unittest.TestCase --- .../model_executor/test_load_weight_utils.py | 1011 ++++++----------- 1 file changed, 348 insertions(+), 663 deletions(-) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index ab8601b137f..656bcfe0e8d 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -17,16 +17,14 @@ import json import os import tempfile -import time -import unittest -from pathlib import Path from types import SimpleNamespace -from unittest.mock import MagicMock, patch import numpy as np import paddle +from safetensors.numpy import save_file from fastdeploy.model_executor.load_weight_utils import ( + fast_weights_iterator, get_all_weights_file, get_model_path, get_weight_iterator, @@ -34,742 +32,429 @@ kv_cache_scale_iterator, load_composite_checkpoint, load_kv_cache_scale, + load_pre_sharded_checkpoint, load_weights_from_cache, measure_time, natural_key, - save_model, + pdparams_weight_iterator, + safetensors_weights_iterator, + safetensors_weights_iterator_ordered, ) -# ═══════════════════ Helpers ═══════════════════ - def _make_fd_config(**overrides): - """Build a minimal FDConfig-like object for testing.""" - model_cfg = SimpleNamespace( - model="/fake/model", - model_type="ernie", - num_hidden_layers=2, - moe_num_experts=8, - moe_layer_start_index=0, - prefix_layer_name="layers", - max_model_len=2048, - kv_cache_quant_scale_path="/nonexistent/kv_cache_scale.json", - pretrained_config=SimpleNamespace(use_sequence_parallel_moe=False), - ) - parallel_cfg = SimpleNamespace( - tensor_parallel_size=1, - tensor_parallel_rank=0, - expert_parallel_size=1, - num_experts_start_offset=0, - num_experts_per_rank=4, - use_ep=False, - use_sequence_parallel_moe=False, - ) - quant_cfg = SimpleNamespace( - name=lambda: "w8a8", - is_checkpoint_bf16=False, - kv_cache_quant_type="none", - ) - load_cfg = SimpleNamespace(is_pre_sharded=False) - cache_cfg 
= SimpleNamespace() - speculative_cfg = SimpleNamespace(model_type="main") + """Minimal FDConfig-like object for testing.""" cfg = SimpleNamespace( - model_config=model_cfg, - parallel_config=parallel_cfg, - quant_config=quant_cfg, - load_config=load_cfg, - cache_config=cache_cfg, - speculative_config=speculative_cfg, + model_config=SimpleNamespace( + model="/tmp/fake_model", + model_type="ernie", + max_model_len=2048, + kv_cache_quant_scale_path="/nonexistent/path.json", + prefix_layer_name="layers", + num_hidden_layers=2, + pretrained_config=SimpleNamespace(use_sequence_parallel_moe=False), + ), + parallel_config=SimpleNamespace( + tensor_parallel_size=1, + tensor_parallel_rank=0, + expert_parallel_size=1, + use_ep=False, + use_sequence_parallel_moe=False, + ), + quant_config=SimpleNamespace( + name=lambda: "none", + is_checkpoint_bf16=False, + kv_cache_quant_type="none", + ), + load_config=SimpleNamespace(is_pre_sharded=False), ) for k, v in overrides.items(): setattr(cfg, k, v) return cfg -# ═══════════════════ Tests: natural_key ═══════════════════ - - -class TestNaturalKey(unittest.TestCase): - """Tests for natural_key() string sorting helper.""" - - def test_pure_alpha(self): - result = natural_key("abc") - self.assertEqual(result, ["abc"]) - - def test_pure_digits(self): - result = natural_key("123") - self.assertEqual(result, ["", 123, ""]) - - def test_mixed(self): - result = natural_key("layer_12_weight") - self.assertEqual(result, ["layer_", 12, "_weight"]) +class TestLoadWeightUtils: + """Tests for load_weight_utils module — pure functions and iterators.""" - def test_multi_numbers(self): - result = natural_key("model_3_layer_42") - self.assertEqual(result, ["model_", 3, "_layer_", 42, ""]) + # ── natural_key ──────────────────────────────────────────────────── - def test_sorting_order(self): - names = ["file2", "file10", "file1", "file20"] - sorted_names = sorted(names, key=natural_key) - self.assertEqual(sorted_names, ["file1", "file2", "file10", 
"file20"]) + def test_natural_key_numeric_sort(self): + items = ["layer.10.weight", "layer.2.weight", "layer.1.weight"] + assert sorted(items, key=natural_key) == [ + "layer.1.weight", + "layer.2.weight", + "layer.10.weight", + ] - def test_empty_string(self): - result = natural_key("") - self.assertEqual(result, [""]) + def test_natural_key_no_digits(self): + assert natural_key("abc") == ["abc"] - def test_leading_digit(self): - result = natural_key("0abc") - self.assertEqual(result, ["", 0, "abc"]) + def test_natural_key_mixed(self): + result = natural_key("shard-002-of-010.safetensors") + assert any(isinstance(x, int) for x in result) + # ── measure_time ─────────────────────────────────────────────────── -# ═══════════════════ Tests: measure_time ═══════════════════ - - -class TestMeasureTime(unittest.TestCase): - """Tests for measure_time() decorator.""" - - def test_basic_timing(self): - @measure_time("Test op") - def slow_func(): - time.sleep(0.01) + def test_measure_time_decorator(self): + @measure_time("Test") + def dummy(): return 42 - result = slow_func() - self.assertEqual(result, 42) - - def test_preserves_return_value(self): - @measure_time("Return test") - def identity(x): - return x * 2 - - self.assertEqual(identity(5), 10) - - def test_preserves_args_kwargs(self): - @measure_time("Args test") - def add(a, b, extra=0): - return a + b + extra - - self.assertEqual(add(1, 2, extra=3), 6) - - def test_custom_prefix(self): - with patch("fastdeploy.model_executor.load_weight_utils.logger") as mock_logger: - - @measure_time("Custom prefix") - def noop(): - pass - - noop() - mock_logger.info.assert_called_once() - call_arg = mock_logger.info.call_args[0][0] - self.assertIn("Custom prefix", call_arg) - - -# ═══════════════════ Tests: get_all_weights_file ═══════════════════ - - -class TestGetAllWeightsFile(unittest.TestCase): - """Tests for get_all_weights_file() weight file discovery.""" - - def test_pdparams_detection(self): - with 
tempfile.TemporaryDirectory() as tmpdir: - # Create .pdparams files (but not scheduler.pdparams) - Path(tmpdir, "model-00001.pdparams").write_bytes(b"") - Path(tmpdir, "model-00002.pdparams").write_bytes(b"") - Path(tmpdir, "scheduler.pdparams").write_bytes(b"") - - files_list, ordered_map, use_safetensors, is_key_ordered = get_all_weights_file(tmpdir) - - self.assertFalse(use_safetensors) - self.assertEqual(len(files_list), 2) - self.assertEqual(ordered_map, {}) - self.assertFalse(is_key_ordered) - # scheduler.pdparams should be excluded - for f in files_list: - self.assertNotIn("scheduler", f) - - def test_single_safetensors(self): - with tempfile.TemporaryDirectory() as tmpdir: - # Create a single model.safetensors file using safetensors library - from safetensors.numpy import save_file - - tensors = {"weight_a": np.zeros((2, 3), dtype=np.float32), "weight_b": np.ones((4,), dtype=np.float32)} - save_file(tensors, os.path.join(tmpdir, "model.safetensors")) - - files_list, ordered_map, use_safetensors, is_key_ordered = get_all_weights_file(tmpdir) - - self.assertTrue(use_safetensors) - self.assertTrue(is_key_ordered) - self.assertEqual(len(files_list), 1) - self.assertIn("model.safetensors", files_list[0]) - self.assertIn("weight_a", ordered_map) - self.assertIn("weight_b", ordered_map) - - def test_sharded_safetensors_with_index(self): - with tempfile.TemporaryDirectory() as tmpdir: - from safetensors.numpy import save_file - - # Create two shard files - save_file({"weight_a": np.zeros((2,), dtype=np.float32)}, os.path.join(tmpdir, "model-00001.safetensors")) - save_file({"weight_b": np.ones((3,), dtype=np.float32)}, os.path.join(tmpdir, "model-00002.safetensors")) - - # Create index file - index = { - "weight_map": { - "weight_a": "model-00001.safetensors", - "weight_b": "model-00002.safetensors", - } - } - with open(os.path.join(tmpdir, "model.safetensors.index.json"), "w") as f: - json.dump(index, f) - - files_list, ordered_map, use_safetensors, is_key_ordered 
= get_all_weights_file(tmpdir) - - self.assertTrue(use_safetensors) - self.assertEqual(len(files_list), 2) - self.assertIn("weight_a", ordered_map) - self.assertIn("weight_b", ordered_map) - + assert dummy() == 42 -# ═══════════════════ Tests: kv_cache_scale_iterator ═══════════════════ + # ── kv_cache_scale_iterator ──────────────────────────────────────── - -class TestKvCacheScaleIterator(unittest.TestCase): - """Tests for kv_cache_scale_iterator() JSON scale loading.""" - - def test_basic_iteration(self): + def test_kv_cache_scale_basic(self): data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(data, f) - f.flush() path = f.name - try: results = dict(kv_cache_scale_iterator(path)) - self.assertIn("layer.0.k_scale", results) - self.assertIn("layer.0.v_scale", results) - # Values should be multiplied by 448.0 + assert len(results) == 2 np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) np.testing.assert_allclose(results["layer.0.v_scale"].numpy(), 0.25 * 448.0, rtol=1e-5) finally: os.unlink(path) - def test_empty_json(self): + def test_kv_cache_scale_empty(self): with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump({}, f) - f.flush() path = f.name try: - results = list(kv_cache_scale_iterator(path)) - self.assertEqual(len(results), 0) + assert list(kv_cache_scale_iterator(path)) == [] finally: os.unlink(path) - def test_result_types(self): - data = {"scale_0": 1.0} - with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: - json.dump(data, f) - f.flush() - path = f.name - try: - for key, tensor in kv_cache_scale_iterator(path): - self.assertIsInstance(key, str) - self.assertIsInstance(tensor, paddle.Tensor) - finally: - os.unlink(path) + # ── get_all_weights_file ─────────────────────────────────────────── + def test_single_safetensors(self): + with tempfile.TemporaryDirectory() 
as d: + save_file({"w": np.zeros((2,), dtype=np.float32)}, os.path.join(d, "model.safetensors")) + files, wmap, use_st, ordered = get_all_weights_file(d) + assert use_st is True + assert ordered is True + assert len(files) == 1 + assert "w" in wmap + + def test_sharded_safetensors(self): + with tempfile.TemporaryDirectory() as d: + save_file({"a": np.zeros((2,), dtype=np.float32)}, os.path.join(d, "model-001.safetensors")) + save_file({"b": np.ones((3,), dtype=np.float32)}, os.path.join(d, "model-002.safetensors")) + index = {"weight_map": {"a": "model-001.safetensors", "b": "model-002.safetensors"}} + with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + files, wmap, use_st, _ = get_all_weights_file(d) + assert use_st is True + assert len(files) == 2 + assert "a" in wmap and "b" in wmap -# ═══════════════════ Tests: get_model_path ═══════════════════ - - -class TestGetModelPath(unittest.TestCase): - """Tests for get_model_path() model directory resolution.""" - - def test_no_rank_dirs(self): - with tempfile.TemporaryDirectory() as tmpdir: - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - result = get_model_path(fd_config) - self.assertEqual(result, tmpdir) - - def test_single_rank_dir(self): - with tempfile.TemporaryDirectory() as tmpdir: - os.makedirs(os.path.join(tmpdir, "rank0")) - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - result = get_model_path(fd_config) - # Single rank dir should not trigger pre-sharding - self.assertEqual(result, tmpdir) - - def test_multi_rank_dirs_matching_tp(self): - with tempfile.TemporaryDirectory() as tmpdir: - os.makedirs(os.path.join(tmpdir, "rank0")) - os.makedirs(os.path.join(tmpdir, "rank1")) - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - fd_config.parallel_config.tensor_parallel_size = 2 - fd_config.parallel_config.tensor_parallel_rank = 1 - - result = get_model_path(fd_config) - self.assertEqual(result, 
os.path.join(tmpdir, "rank1")) - self.assertTrue(fd_config.load_config.is_pre_sharded) - - def test_multi_rank_dirs_mismatched_tp(self): - with tempfile.TemporaryDirectory() as tmpdir: - os.makedirs(os.path.join(tmpdir, "rank0")) - os.makedirs(os.path.join(tmpdir, "rank1")) - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - fd_config.parallel_config.tensor_parallel_size = 4 # mismatch - - with self.assertRaises(ValueError) as ctx: - get_model_path(fd_config) - self.assertIn("tp2", str(ctx.exception)) - - -# ═══════════════════ Tests: is_weight_cache_enabled ═══════════════════ - - -class TestIsWeightCacheEnabled(unittest.TestCase): - """Tests for is_weight_cache_enabled() cache detection.""" - - def test_cache_disabled_when_env_off(self): - fd_config = _make_fd_config() - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False - enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) - self.assertFalse(enable) - self.assertIsNone(cache_dir) - - def test_cache_disabled_when_no_quant_config(self): - fd_config = _make_fd_config() - fd_config.quant_config = None - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True - enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) - self.assertFalse(enable) - - def test_cache_enabled_when_dir_exists(self): - with tempfile.TemporaryDirectory() as tmpdir: - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - fd_config.quant_config.is_checkpoint_bf16 = False - - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True - - # First call — no cache dir → disabled - enable, cache_dir, ctx = is_weight_cache_enabled(fd_config) - self.assertFalse(enable) - - # Now create the cache dir - if cache_dir is not None: - os.makedirs(cache_dir, exist_ok=True) - enable2, _, _ = 
is_weight_cache_enabled(fd_config) - self.assertTrue(enable2) - - def test_cache_dir_uses_hash(self): - with tempfile.TemporaryDirectory() as tmpdir: - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = True - _, cache_dir, _ = is_weight_cache_enabled(fd_config) - if cache_dir is not None: - # Cache dir should contain a hash - self.assertIn(".cache", cache_dir) - cache_subdir = os.path.basename(cache_dir) - self.assertGreater(len(cache_subdir), 0) - - -# ═══════════════════ Tests: load_weights_from_cache ═══════════════════ - - -class TestLoadWeightsFromCache(unittest.TestCase): - """Tests for load_weights_from_cache() parameter loading.""" - - def test_basic_weight_loading(self): - # Create a simple model with named parameters - linear = paddle.nn.Linear(4, 3) + def test_pdparams_fallback(self): + with tempfile.TemporaryDirectory() as d: + paddle.save({"w": paddle.randn([2])}, os.path.join(d, "model.pdparams")) + files, wmap, use_st, ordered = get_all_weights_file(d) + assert use_st is False + assert ordered is False + assert len(files) == 1 - new_weight = paddle.randn([4, 3]) - weights_iter = iter([("weight", new_weight)]) + # ── safetensors iterators ────────────────────────────────────────── - load_weights_from_cache(linear, weights_iter) - np.testing.assert_allclose(linear.weight.numpy(), new_weight.numpy(), rtol=1e-6) + def test_safetensors_weights_iterator(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "test.safetensors") + save_file({"a": np.array([1.0], dtype=np.float32)}, path) + results = dict(safetensors_weights_iterator([path])) + assert "a" in results + assert isinstance(results["a"], paddle.Tensor) - def test_shape_mismatch_raises(self): - linear = paddle.nn.Linear(4, 3) - wrong_shape_weight = paddle.randn([5, 3]) - weights_iter = iter([("weight", wrong_shape_weight)]) + def 
test_safetensors_weights_iterator_ordered(self): + with tempfile.TemporaryDirectory() as d: + p1 = os.path.join(d, "s1.safetensors") + p2 = os.path.join(d, "s2.safetensors") + save_file({"x": np.array([1.0], dtype=np.float32)}, p1) + save_file({"y": np.array([2.0], dtype=np.float32)}, p2) + results = dict(safetensors_weights_iterator_ordered({"x": p1, "y": p2})) + assert len(results) == 2 + np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) - with self.assertRaises(ValueError) as ctx: - load_weights_from_cache(linear, weights_iter) - self.assertIn("Shape mismatch", str(ctx.exception)) + def test_ordered_multi_keys_same_file(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "m.safetensors") + save_file({"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, path) + results = dict(safetensors_weights_iterator_ordered({"a": path, "b": path})) + assert len(results) == 2 + + # ── pdparams_weight_iterator ─────────────────────────────────────── + + def test_pdparams_iterator(self): + with tempfile.TemporaryDirectory() as d: + p1 = os.path.join(d, "s1.pdparams") + p2 = os.path.join(d, "s2.pdparams") + paddle.save({"a": paddle.to_tensor([1.0])}, p1) + paddle.save({"b": paddle.to_tensor([2.0])}, p2) + results = dict(pdparams_weight_iterator([p1, p2])) + assert len(results) == 2 + + # ── get_weight_iterator ──────────────────────────────────────────── + + def test_get_weight_iterator_safetensors(self): + with tempfile.TemporaryDirectory() as d: + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) + results = dict(get_weight_iterator(d)) + assert "w" in results + np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) + + def test_get_weight_iterator_with_kv_scale(self): + with tempfile.TemporaryDirectory() as d: + save_file({"w": np.zeros((1,), dtype=np.float32)}, os.path.join(d, "model.safetensors")) + with open(os.path.join(d, 
"kv_cache_scale.json"), "w") as f: + json.dump({"k_scale": 0.1}, f) + results = dict(get_weight_iterator(d)) + assert "k_scale" in results + np.testing.assert_allclose(results["k_scale"].numpy(), 0.1 * 448.0, rtol=1e-5) - def test_missing_weight_skipped(self): + def test_get_weight_iterator_pdparams(self): + with tempfile.TemporaryDirectory() as d: + paddle.save({"p": paddle.to_tensor([3.0])}, os.path.join(d, "model.pdparams")) + results = dict(get_weight_iterator(d)) + assert "p" in results + + # ── get_model_path ───────────────────────────────────────────────── + + def test_model_path_no_rank_dirs(self): + with tempfile.TemporaryDirectory() as d: + cfg = _make_fd_config() + cfg.model_config.model = d + assert get_model_path(cfg) == d + + def test_model_path_multi_rank_matching(self): + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "rank0")) + os.makedirs(os.path.join(d, "rank1")) + cfg = _make_fd_config() + cfg.model_config.model = d + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.tensor_parallel_rank = 1 + result = get_model_path(cfg) + assert result == os.path.join(d, "rank1") + assert cfg.load_config.is_pre_sharded is True + + def test_model_path_tp_mismatch_raises(self): + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "rank0")) + os.makedirs(os.path.join(d, "rank1")) + cfg = _make_fd_config() + cfg.model_config.model = d + cfg.parallel_config.tensor_parallel_size = 4 + try: + get_model_path(cfg) + assert False, "Should have raised ValueError" + except ValueError as e: + assert "tp2" in str(e) + + # ── load_weights_from_cache ──────────────────────────────────────── + + def test_load_weights_basic(self): linear = paddle.nn.Linear(4, 3) - old_weight = linear.weight.numpy().copy() - - weights_iter = iter([("nonexistent_param", paddle.randn([2, 2]))]) - # Should not raise, just skip missing params - load_weights_from_cache(linear, weights_iter) - 
np.testing.assert_allclose(linear.weight.numpy(), old_weight, rtol=1e-6) - - -# ═══════════════════ Tests: get_weight_iterator ═══════════════════ - - -class TestGetWeightIterator(unittest.TestCase): - """Tests for get_weight_iterator() weight loading dispatcher.""" + new_w = paddle.randn([4, 3]) + load_weights_from_cache(linear, iter([("weight", new_w)])) + np.testing.assert_allclose(linear.weight.numpy(), new_w.numpy(), rtol=1e-6) - def test_safetensors_single_file(self): - from safetensors.numpy import save_file - - with tempfile.TemporaryDirectory() as tmpdir: - tensors = {"param_a": np.random.randn(2, 3).astype(np.float32)} - save_file(tensors, os.path.join(tmpdir, "model.safetensors")) - - results = dict(get_weight_iterator(tmpdir)) - self.assertIn("param_a", results) - np.testing.assert_allclose(results["param_a"].numpy(), tensors["param_a"], rtol=1e-6) - - def test_safetensors_sharded(self): - from safetensors.numpy import save_file - - with tempfile.TemporaryDirectory() as tmpdir: - save_file({"w1": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(tmpdir, "shard-001.safetensors")) - save_file({"w2": np.array([3.0, 4.0], dtype=np.float32)}, os.path.join(tmpdir, "shard-002.safetensors")) - - index = {"weight_map": {"w1": "shard-001.safetensors", "w2": "shard-002.safetensors"}} - with open(os.path.join(tmpdir, "model.safetensors.index.json"), "w") as f: - json.dump(index, f) - - results = dict(get_weight_iterator(tmpdir)) - self.assertIn("w1", results) - self.assertIn("w2", results) - np.testing.assert_allclose(results["w1"].numpy(), [1.0, 2.0], rtol=1e-6) - np.testing.assert_allclose(results["w2"].numpy(), [3.0, 4.0], rtol=1e-6) + def test_load_weights_shape_mismatch(self): + linear = paddle.nn.Linear(4, 3) + try: + load_weights_from_cache(linear, iter([("weight", paddle.randn([5, 3]))])) + assert False, "Should have raised ValueError" + except ValueError as e: + assert "Shape mismatch" in str(e) - def test_kv_cache_scale_included(self): - from 
safetensors.numpy import save_file + def test_load_weights_missing_param_skipped(self): + linear = paddle.nn.Linear(4, 3) + old_w = linear.weight.numpy().copy() + load_weights_from_cache(linear, iter([("nonexistent", paddle.randn([2, 2]))])) + np.testing.assert_allclose(linear.weight.numpy(), old_w, rtol=1e-6) + + # ── fast_weights_iterator ─────────────────────────────────────────── + + def test_fast_weights_iterator(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "test.safetensors") + save_file({"x": np.array([1.0, 2.0], dtype=np.float32)}, path) + results = dict(fast_weights_iterator([path])) + assert "x" in results + + # ── is_weight_cache_enabled ──────────────────────────────────────── + + def test_cache_disabled_when_env_off(self, monkeypatch): + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") + cfg = _make_fd_config() + enable, cache_dir, ctx = is_weight_cache_enabled(cfg) + assert enable is False + assert cache_dir is None + + def test_cache_disabled_no_quant(self, monkeypatch): + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + cfg = _make_fd_config() + cfg.quant_config = None + enable, _, _ = is_weight_cache_enabled(cfg) + assert enable is False + + def test_cache_computes_hash_dir(self, monkeypatch): + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + with tempfile.TemporaryDirectory() as d: + cfg = _make_fd_config() + cfg.model_config.model = d + enable, cache_dir, _ = is_weight_cache_enabled(cfg) + assert enable is False + assert cache_dir is not None + assert d in cache_dir + + def test_cache_enabled_when_dir_exists(self, monkeypatch): + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + with tempfile.TemporaryDirectory() as d: + cfg = _make_fd_config() + cfg.model_config.model = d + _, cache_dir, _ = is_weight_cache_enabled(cfg) + os.makedirs(cache_dir, exist_ok=True) + enable, _, ctx = is_weight_cache_enabled(cfg) + assert enable is True + + # ── save_model decorator 
───────────────────────────────────────── + + def test_save_model_no_cache(self, monkeypatch): + from fastdeploy.model_executor.load_weight_utils import save_model + + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") - with tempfile.TemporaryDirectory() as tmpdir: - save_file({"w": np.zeros((1,), dtype=np.float32)}, os.path.join(tmpdir, "model.safetensors")) + @save_model() + def dummy_load(model, fd_config): + return {"loaded": True} - scales = {"k_scale": 0.1} - with open(os.path.join(tmpdir, "kv_cache_scale.json"), "w") as f: - json.dump(scales, f) + cfg = _make_fd_config() + mock_model = SimpleNamespace(state_dict=lambda: {}) + result = dummy_load(mock_model, cfg) + assert result == {"loaded": True} - results = dict(get_weight_iterator(tmpdir)) - self.assertIn("w", results) - self.assertIn("k_scale", results) - np.testing.assert_allclose(results["k_scale"].numpy(), 0.1 * 448.0, rtol=1e-5) + def test_save_model_cache_on_not_bf16(self, monkeypatch): + from fastdeploy.model_executor.load_weight_utils import save_model + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") -# ═══════════════════ Tests: load_kv_cache_scale ═══════════════════ + @save_model() + def dummy_load(model, fd_config): + return {"ok": True} + cfg = _make_fd_config() + mock_model = SimpleNamespace(state_dict=lambda: {}) + result = dummy_load(mock_model, cfg) + assert result == {"ok": True} -class TestLoadKvCacheScale(unittest.TestCase): - """Tests for load_kv_cache_scale() JSON scale loading into state_dict.""" + # ── load_kv_cache_scale ──────────────────────────────────────────── - def test_loads_scales(self): - with tempfile.TemporaryDirectory() as tmpdir: - scale_path = os.path.join(tmpdir, "kv_cache_scale.json") + def test_load_kv_cache_scale(self): + with tempfile.TemporaryDirectory() as d: scales = { "ernie.layers.0.self_attn.cachek_matmul.activation_scale": 0.5, "ernie.layers.0.self_attn.cachev_matmul.activation_scale": 0.25, 
"ernie.layers.1.self_attn.cachek_matmul.activation_scale": 0.75, "ernie.layers.1.self_attn.cachev_matmul.activation_scale": 0.125, } - with open(scale_path, "w") as f: + path = os.path.join(d, "kv_cache_scale.json") + with open(path, "w") as f: json.dump(scales, f) - - fd_config = _make_fd_config() - fd_config.model_config.kv_cache_quant_scale_path = scale_path - fd_config.model_config.prefix_layer_name = "layers" - fd_config.model_config.num_hidden_layers = 2 - + cfg = _make_fd_config() + cfg.model_config.kv_cache_quant_scale_path = path state_dict = {} - load_kv_cache_scale(fd_config, state_dict) - - self.assertEqual(len(state_dict), 4) + load_kv_cache_scale(cfg, state_dict) + assert len(state_dict) == 4 np.testing.assert_allclose( state_dict["ernie.layers.0.self_attn.cachek_matmul.activation_scale"].numpy(), 0.5 * 448.0, rtol=1e-5, ) - def test_missing_file_warns(self): - fd_config = _make_fd_config() - fd_config.model_config.kv_cache_quant_scale_path = "/nonexistent/path.json" + def test_load_kv_cache_scale_missing_file(self): + cfg = _make_fd_config() + cfg.model_config.kv_cache_quant_scale_path = "/nonexistent/path.json" state_dict = {} + load_kv_cache_scale(cfg, state_dict) + assert len(state_dict) == 0 + + # ── load_pre_sharded_checkpoint ──────────────────────────────────── + + def test_load_pre_sharded(self): + with tempfile.TemporaryDirectory() as d: + rd = os.path.join(d, "rank0") + os.makedirs(rd) + save_file({"w": np.array([42.0], dtype=np.float32)}, os.path.join(rd, "model.safetensors")) + result = load_pre_sharded_checkpoint(d, 0) + assert "w" in result + np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) - with patch("fastdeploy.model_executor.load_weight_utils.logger") as mock_logger: - load_kv_cache_scale(fd_config, state_dict) - mock_logger.warning.assert_called_once() - - self.assertEqual(len(state_dict), 0) - - -# ═══════════════════ Tests: save_model decorator ═══════════════════ - - -class 
TestSaveModelDecorator(unittest.TestCase): - """Tests for save_model() decorator factory.""" - - def test_decorator_passes_through(self): - @save_model() - def my_loader(model, fd_config): - return "loaded" - - mock_model = MagicMock() - mock_model.state_dict.return_value = {} - fd_config = _make_fd_config() - - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False - result = my_loader(mock_model, fd_config) - self.assertEqual(result, "loaded") - - def test_custom_arg_names(self): - @save_model(model_arg_name="m", config_arg_name="cfg") - def my_loader(m, cfg): - return "custom_loaded" - - mock_model = MagicMock() - mock_model.state_dict.return_value = {} - fd_config = _make_fd_config() - - with patch("fastdeploy.model_executor.load_weight_utils.envs") as mock_envs: - mock_envs.FD_ENABLE_MODEL_LOAD_CACHE = False - result = my_loader(mock_model, fd_config) - self.assertEqual(result, "custom_loaded") - - -# ═══════════════════ Tests: load_composite_checkpoint ═══════════════════ - - -class TestLoadCompositeCheckpoint(unittest.TestCase): - """Tests for load_composite_checkpoint() top-level dispatcher.""" - - def test_tp_single_rank(self): - """Test loading with tensor parallelism (no rank dirs, no EP).""" - from safetensors.numpy import save_file - - with tempfile.TemporaryDirectory() as tmpdir: - tensors = {"weight": np.random.randn(4, 4).astype(np.float32)} - save_file(tensors, os.path.join(tmpdir, "model.safetensors")) - - fd_config = _make_fd_config() - fd_config.model_config.model = tmpdir - fd_config.parallel_config.use_ep = False - fd_config.quant_config.kv_cache_quant_type = "none" - - mock_cls = MagicMock() - with patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: - mock_load.return_value = {"weight": np.zeros((4, 4))} - result = load_composite_checkpoint(tmpdir, mock_cls, fd_config, return_numpy=True) - self.assertIn("weight", result) - 
mock_load.assert_called_once() - - def test_ep_loading(self): - """Test expert parallel loading path.""" - fd_config = _make_fd_config() - fd_config.parallel_config.use_ep = True - - mock_cls = MagicMock() - with patch("fastdeploy.model_executor.load_weight_utils.load_ep_checkpoint") as mock_ep: - mock_ep.return_value = {"expert.0.weight": np.zeros((4,))} - result = load_composite_checkpoint("/fake", mock_cls, fd_config, return_numpy=True) - mock_ep.assert_called_once() - self.assertIn("expert.0.weight", result) - - def test_pre_sharded_loading(self): - """Test pre-sharded (multi-rank) loading path.""" - with tempfile.TemporaryDirectory() as tmpdir: - rank0_dir = os.path.join(tmpdir, "rank0") - rank1_dir = os.path.join(tmpdir, "rank1") - os.makedirs(rank0_dir) - os.makedirs(rank1_dir) - - fd_config = _make_fd_config() - fd_config.parallel_config.use_ep = False - fd_config.parallel_config.tensor_parallel_size = 2 - fd_config.parallel_config.tensor_parallel_rank = 0 - fd_config.quant_config.kv_cache_quant_type = "none" - - mock_cls = MagicMock() - with patch("fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint") as mock_pre: - mock_pre.return_value = {"w": np.zeros((2,))} - result = load_composite_checkpoint(tmpdir, mock_cls, fd_config) - mock_pre.assert_called_once_with(tmpdir, 0) - self.assertIn("w", result) - - def test_empty_state_dict_raises(self): - fd_config = _make_fd_config() - fd_config.parallel_config.use_ep = False - fd_config.quant_config.kv_cache_quant_type = "none" - - mock_cls = MagicMock() - with tempfile.TemporaryDirectory() as tmpdir: - with patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: - mock_load.return_value = {} - with self.assertRaises(ValueError) as ctx: - load_composite_checkpoint(tmpdir, mock_cls, fd_config) - self.assertIn("weight not found", str(ctx.exception)) - - def test_kv_cache_quant_fp8_loads_scales(self): - """Test that FP8 KV cache triggers scale loading.""" - fd_config = 
_make_fd_config() - fd_config.parallel_config.use_ep = False - fd_config.quant_config.kv_cache_quant_type = "float8_e4m3fn" - - mock_cls = MagicMock() - with tempfile.TemporaryDirectory() as tmpdir: - with patch("fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint") as mock_load: - mock_load.return_value = {"w": np.zeros((2,))} - with patch("fastdeploy.model_executor.load_weight_utils.load_kv_cache_scale") as mock_scale: - load_composite_checkpoint(tmpdir, mock_cls, fd_config) - mock_scale.assert_called_once() - - -# ═══════════════════ Tests: safetensors iterators ═══════════════════ - - -class TestSafetensorsIterators(unittest.TestCase): - """Tests for safetensors_weights_iterator and safetensors_weights_iterator_ordered.""" - - def test_safetensors_weights_iterator(self): - from safetensors.numpy import save_file - - from fastdeploy.model_executor.load_weight_utils import ( - safetensors_weights_iterator, + # ── load_composite_checkpoint ────────────────────────────────────── + + def test_composite_tp_loading(self, monkeypatch): + with tempfile.TemporaryDirectory() as d: + save_file({"w": np.random.randn(4, 4).astype(np.float32)}, os.path.join(d, "model.safetensors")) + cfg = _make_fd_config() + cfg.model_config.model = d + cfg.parallel_config.use_ep = False + cfg.quant_config.kv_cache_quant_type = "none" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", + lambda *a, **kw: {"w": np.zeros((4, 4))}, + ) + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + result = load_composite_checkpoint(d, mock_cls, cfg, return_numpy=True) + assert "w" in result + + def test_composite_empty_raises(self, monkeypatch): + cfg = _make_fd_config() + cfg.parallel_config.use_ep = False + cfg.quant_config.kv_cache_quant_type = "none" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", + lambda *a, **kw: {}, ) - - with tempfile.TemporaryDirectory() as tmpdir: - path = 
os.path.join(tmpdir, "test.safetensors") - save_file({"a": np.array([1.0], dtype=np.float32)}, path) - - results = dict(safetensors_weights_iterator([path])) - self.assertIn("a", results) - self.assertIsInstance(results["a"], paddle.Tensor) - - def test_safetensors_weights_iterator_ordered(self): - from safetensors.numpy import save_file - - from fastdeploy.model_executor.load_weight_utils import ( - safetensors_weights_iterator_ordered, + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + with tempfile.TemporaryDirectory() as d: + try: + load_composite_checkpoint(d, mock_cls, cfg) + assert False, "Should have raised ValueError" + except ValueError as e: + assert "weight not found" in str(e) + + def test_composite_fp8_loads_scales(self, monkeypatch): + cfg = _make_fd_config() + cfg.parallel_config.use_ep = False + cfg.quant_config.kv_cache_quant_type = "float8_e4m3fn" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", + lambda *a, **kw: {"w": np.zeros((2,))}, ) - - with tempfile.TemporaryDirectory() as tmpdir: - path1 = os.path.join(tmpdir, "shard1.safetensors") - path2 = os.path.join(tmpdir, "shard2.safetensors") - save_file({"x": np.array([1.0], dtype=np.float32)}, path1) - save_file({"y": np.array([2.0], dtype=np.float32)}, path2) - - ordered_map = {"x": path1, "y": path2} - results = dict(safetensors_weights_iterator_ordered(ordered_map)) - self.assertIn("x", results) - self.assertIn("y", results) - np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) - - def test_multi_keys_same_file(self): - from safetensors.numpy import save_file - - from fastdeploy.model_executor.load_weight_utils import ( - safetensors_weights_iterator_ordered, + scale_called = [] + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_kv_cache_scale", + lambda cfg, sd: scale_called.append(True), ) - - with tempfile.TemporaryDirectory() as tmpdir: - path = os.path.join(tmpdir, 
"model.safetensors") - save_file( - {"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, - path, + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + with tempfile.TemporaryDirectory() as d: + load_composite_checkpoint(d, mock_cls, cfg) + assert len(scale_called) == 1 + + def test_composite_pre_sharded(self, monkeypatch): + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "rank0")) + os.makedirs(os.path.join(d, "rank1")) + cfg = _make_fd_config() + cfg.parallel_config.use_ep = False + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.tensor_parallel_rank = 0 + cfg.quant_config.kv_cache_quant_type = "none" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint", + lambda path, rank: {"w": np.zeros((2,))}, ) - - ordered_map = {"a": path, "b": path} - results = dict(safetensors_weights_iterator_ordered(ordered_map)) - self.assertEqual(len(results), 2) - - -# ═══════════════════ Tests: pdparams_weight_iterator ═══════════════════ - - -class TestPdparamsWeightIterator(unittest.TestCase): - """Tests for pdparams_weight_iterator() checkpoint loading.""" - - def test_basic_iteration(self): - from fastdeploy.model_executor.load_weight_utils import pdparams_weight_iterator - - with tempfile.TemporaryDirectory() as tmpdir: - state = {"param1": paddle.randn([2, 3]), "param2": paddle.randn([4])} - path = os.path.join(tmpdir, "model.pdparams") - paddle.save(state, path) - - results = dict(pdparams_weight_iterator([path])) - self.assertIn("param1", results) - self.assertIn("param2", results) - self.assertEqual(results["param1"].shape, [2, 3]) - - def test_multi_shard_iteration(self): - from fastdeploy.model_executor.load_weight_utils import pdparams_weight_iterator - - with tempfile.TemporaryDirectory() as tmpdir: - path1 = os.path.join(tmpdir, "shard1.pdparams") - path2 = os.path.join(tmpdir, "shard2.pdparams") - paddle.save({"a": 
paddle.to_tensor([1.0])}, path1) - paddle.save({"b": paddle.to_tensor([2.0])}, path2) - - results = dict(pdparams_weight_iterator([path1, path2])) - self.assertEqual(len(results), 2) - self.assertIn("a", results) - self.assertIn("b", results) - - -# ═══════════════════ Tests: load_pre_sharded_checkpoint ═══════════════════ - - -class TestLoadPreShardedCheckpoint(unittest.TestCase): - """Tests for load_pre_sharded_checkpoint().""" - - def test_loads_rank_weights(self): - from safetensors.numpy import save_file - - from fastdeploy.model_executor.load_weight_utils import ( - load_pre_sharded_checkpoint, - ) - - with tempfile.TemporaryDirectory() as tmpdir: - rank_dir = os.path.join(tmpdir, "rank0") - os.makedirs(rank_dir) - save_file({"w": np.array([42.0], dtype=np.float32)}, os.path.join(rank_dir, "model.safetensors")) - - result = load_pre_sharded_checkpoint(tmpdir, 0) - self.assertIn("w", result) - np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) - - -# ═══════════════════ Tests: fast_weights_iterator ═══════════════════ - - -class TestFastWeightsIterator(unittest.TestCase): - """Tests for fast_weights_iterator() using paddleformers' fast_safe_open.""" - - def test_basic(self): - from safetensors.numpy import save_file - - from fastdeploy.model_executor.load_weight_utils import fast_weights_iterator - - with tempfile.TemporaryDirectory() as tmpdir: - path = os.path.join(tmpdir, "model.safetensors") - save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, path) - - results = list(fast_weights_iterator([path])) - self.assertEqual(len(results), 1) - name, param_slice = results[0] - self.assertEqual(name, "w") - - -if __name__ == "__main__": - unittest.main() + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + result = load_composite_checkpoint(d, mock_cls, cfg) + assert "w" in result From a67ee5b3f6fe122157a03ccb4a623cb23a2c14b8 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Tue, 10 Mar 2026 02:11:59 +0800 Subject: [PATCH 
3/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91improve=20load=5Fweight=5Futils=20coverage=20to=208?= =?UTF-8?q?3%?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add test_load_ep_checkpoint_basic: exercises EP checkpoint loading with minimal fixture - Add test_composite_ep_branch: covers EP path in load_composite_checkpoint - Add test_get_weight_iterator_unordered: covers unordered sharded safetensors path - Coverage: 66% → 83% (257/310 stmts) --- .../model_executor/test_load_weight_utils.py | 423 +++++++----------- 1 file changed, 151 insertions(+), 272 deletions(-) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index 656bcfe0e8d..103ecda18da 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -1,7 +1,6 @@ -""" # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # -# Licensed under the Apache License, Version 2.0 (the "License" +# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # @@ -12,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-""" import json import os @@ -21,29 +19,14 @@ import numpy as np import paddle +import pytest from safetensors.numpy import save_file -from fastdeploy.model_executor.load_weight_utils import ( - fast_weights_iterator, - get_all_weights_file, - get_model_path, - get_weight_iterator, - is_weight_cache_enabled, - kv_cache_scale_iterator, - load_composite_checkpoint, - load_kv_cache_scale, - load_pre_sharded_checkpoint, - load_weights_from_cache, - measure_time, - natural_key, - pdparams_weight_iterator, - safetensors_weights_iterator, - safetensors_weights_iterator_ordered, -) +from fastdeploy.model_executor import load_weight_utils as lwu def _make_fd_config(**overrides): - """Minimal FDConfig-like object for testing.""" + """Minimal FDConfig-like object.""" cfg = SimpleNamespace( model_config=SimpleNamespace( model="/tmp/fake_model", @@ -73,284 +56,170 @@ def _make_fd_config(**overrides): return cfg -class TestLoadWeightUtils: - """Tests for load_weight_utils module — pure functions and iterators.""" - - # ── natural_key ──────────────────────────────────────────────────── - - def test_natural_key_numeric_sort(self): +class TestFileDiscovery: + def test_natural_key_and_measure_time(self): items = ["layer.10.weight", "layer.2.weight", "layer.1.weight"] - assert sorted(items, key=natural_key) == [ + assert sorted(items, key=lwu.natural_key) == [ "layer.1.weight", "layer.2.weight", "layer.10.weight", ] + assert lwu.natural_key("abc") == ["abc"] + assert any(isinstance(x, int) for x in lwu.natural_key("shard-002-of-010.safetensors")) - def test_natural_key_no_digits(self): - assert natural_key("abc") == ["abc"] - - def test_natural_key_mixed(self): - result = natural_key("shard-002-of-010.safetensors") - assert any(isinstance(x, int) for x in result) - - # ── measure_time ─────────────────────────────────────────────────── - - def test_measure_time_decorator(self): - @measure_time("Test") + @lwu.measure_time("Test") def dummy(): return 42 assert dummy() == 42 - # ── 
kv_cache_scale_iterator ──────────────────────────────────────── - - def test_kv_cache_scale_basic(self): - data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} - with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: - json.dump(data, f) - path = f.name - try: - results = dict(kv_cache_scale_iterator(path)) - assert len(results) == 2 - np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) - np.testing.assert_allclose(results["layer.0.v_scale"].numpy(), 0.25 * 448.0, rtol=1e-5) - finally: - os.unlink(path) - - def test_kv_cache_scale_empty(self): - with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: - json.dump({}, f) - path = f.name - try: - assert list(kv_cache_scale_iterator(path)) == [] - finally: - os.unlink(path) - - # ── get_all_weights_file ─────────────────────────────────────────── - - def test_single_safetensors(self): + def test_get_all_weights_file(self): with tempfile.TemporaryDirectory() as d: - save_file({"w": np.zeros((2,), dtype=np.float32)}, os.path.join(d, "model.safetensors")) - files, wmap, use_st, ordered = get_all_weights_file(d) - assert use_st is True - assert ordered is True - assert len(files) == 1 - assert "w" in wmap - - def test_sharded_safetensors(self): + save_file({"w": np.array([1.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) + files, wmap, use_st, ordered = lwu.get_all_weights_file(d) + assert use_st is True and ordered is True and len(files) == 1 and "w" in wmap with tempfile.TemporaryDirectory() as d: - save_file({"a": np.zeros((2,), dtype=np.float32)}, os.path.join(d, "model-001.safetensors")) + save_file({"a": np.array([1.0], dtype=np.float32)}, os.path.join(d, "model-001.safetensors")) save_file({"b": np.ones((3,), dtype=np.float32)}, os.path.join(d, "model-002.safetensors")) index = {"weight_map": {"a": "model-001.safetensors", "b": "model-002.safetensors"}} with open(os.path.join(d, "model.safetensors.index.json"), "w") 
as f: json.dump(index, f) - files, wmap, use_st, _ = get_all_weights_file(d) - assert use_st is True - assert len(files) == 2 - assert "a" in wmap and "b" in wmap - - def test_pdparams_fallback(self): + files, wmap, use_st, _ = lwu.get_all_weights_file(d) + assert use_st is True and len(files) == 2 and "a" in wmap and "b" in wmap with tempfile.TemporaryDirectory() as d: paddle.save({"w": paddle.randn([2])}, os.path.join(d, "model.pdparams")) - files, wmap, use_st, ordered = get_all_weights_file(d) - assert use_st is False - assert ordered is False - assert len(files) == 1 + files, wmap, use_st, ordered = lwu.get_all_weights_file(d) + assert use_st is False and ordered is False and len(files) == 1 + + def test_get_model_path(self): + with tempfile.TemporaryDirectory() as d: + cfg = _make_fd_config() + cfg.model_config.model = d + assert lwu.get_model_path(cfg) == d + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "rank0")) + os.makedirs(os.path.join(d, "rank1")) + cfg = _make_fd_config() + cfg.model_config.model = d + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.tensor_parallel_rank = 1 + assert lwu.get_model_path(cfg) == os.path.join(d, "rank1") + assert cfg.load_config.is_pre_sharded is True + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "rank0")) + os.makedirs(os.path.join(d, "rank1")) + cfg = _make_fd_config() + cfg.model_config.model = d + cfg.parallel_config.tensor_parallel_size = 4 + with pytest.raises(ValueError, match="tp2"): + lwu.get_model_path(cfg) - # ── safetensors iterators ────────────────────────────────────────── - def test_safetensors_weights_iterator(self): +class TestWeightIterators: + def test_kv_cache_scale_iterator(self): with tempfile.TemporaryDirectory() as d: - path = os.path.join(d, "test.safetensors") - save_file({"a": np.array([1.0], dtype=np.float32)}, path) - results = dict(safetensors_weights_iterator([path])) - assert "a" in results - assert 
isinstance(results["a"], paddle.Tensor) + data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} + path = os.path.join(d, "scale.json") + with open(path, "w") as f: + json.dump(data, f) + results = dict(lwu.kv_cache_scale_iterator(path)) + assert len(results) == 2 + np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) + np.testing.assert_allclose(results["layer.0.v_scale"].numpy(), 0.25 * 448.0, rtol=1e-5) + empty = os.path.join(d, "empty.json") + with open(empty, "w") as f2: + json.dump({}, f2) + assert list(lwu.kv_cache_scale_iterator(empty)) == [] - def test_safetensors_weights_iterator_ordered(self): + def test_weight_iterators(self): with tempfile.TemporaryDirectory() as d: p1 = os.path.join(d, "s1.safetensors") p2 = os.path.join(d, "s2.safetensors") save_file({"x": np.array([1.0], dtype=np.float32)}, p1) save_file({"y": np.array([2.0], dtype=np.float32)}, p2) - results = dict(safetensors_weights_iterator_ordered({"x": p1, "y": p2})) + results = dict(lwu.safetensors_weights_iterator([p1])) + assert "x" in results and isinstance(results["x"], paddle.Tensor) + results = dict(lwu.safetensors_weights_iterator_ordered({"x": p1, "y": p2})) assert len(results) == 2 np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) - - def test_ordered_multi_keys_same_file(self): - with tempfile.TemporaryDirectory() as d: - path = os.path.join(d, "m.safetensors") - save_file({"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, path) - results = dict(safetensors_weights_iterator_ordered({"a": path, "b": path})) - assert len(results) == 2 - - # ── pdparams_weight_iterator ─────────────────────────────────────── - - def test_pdparams_iterator(self): + combo = os.path.join(d, "m.safetensors") + save_file({"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, combo) + assert len(dict(lwu.safetensors_weights_iterator_ordered({"a": combo, "b": combo}))) == 2 with 
tempfile.TemporaryDirectory() as d: p1 = os.path.join(d, "s1.pdparams") - p2 = os.path.join(d, "s2.pdparams") paddle.save({"a": paddle.to_tensor([1.0])}, p1) - paddle.save({"b": paddle.to_tensor([2.0])}, p2) - results = dict(pdparams_weight_iterator([p1, p2])) - assert len(results) == 2 - - # ── get_weight_iterator ──────────────────────────────────────────── - - def test_get_weight_iterator_safetensors(self): + paddle.save({"b": paddle.to_tensor([2.0])}, os.path.join(d, "s2.pdparams")) + assert len(dict(lwu.pdparams_weight_iterator([p1, os.path.join(d, "s2.pdparams")]))) == 2 with tempfile.TemporaryDirectory() as d: - save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) - results = dict(get_weight_iterator(d)) - assert "w" in results - np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) + save_file({"x": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "t.safetensors")) + assert "x" in dict(lwu.fast_weights_iterator([os.path.join(d, "t.safetensors")])) - def test_get_weight_iterator_with_kv_scale(self): + def test_get_weight_iterator(self): with tempfile.TemporaryDirectory() as d: - save_file({"w": np.zeros((1,), dtype=np.float32)}, os.path.join(d, "model.safetensors")) + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) with open(os.path.join(d, "kv_cache_scale.json"), "w") as f: json.dump({"k_scale": 0.1}, f) - results = dict(get_weight_iterator(d)) - assert "k_scale" in results + results = dict(lwu.get_weight_iterator(d)) + assert "w" in results + np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) np.testing.assert_allclose(results["k_scale"].numpy(), 0.1 * 448.0, rtol=1e-5) - - def test_get_weight_iterator_pdparams(self): with tempfile.TemporaryDirectory() as d: paddle.save({"p": paddle.to_tensor([3.0])}, os.path.join(d, "model.pdparams")) - results = dict(get_weight_iterator(d)) - assert "p" in results - - # ── get_model_path 
───────────────────────────────────────────────── - - def test_model_path_no_rank_dirs(self): + assert "p" in dict(lwu.get_weight_iterator(d)) with tempfile.TemporaryDirectory() as d: - cfg = _make_fd_config() - cfg.model_config.model = d - assert get_model_path(cfg) == d - - def test_model_path_multi_rank_matching(self): - with tempfile.TemporaryDirectory() as d: - os.makedirs(os.path.join(d, "rank0")) - os.makedirs(os.path.join(d, "rank1")) - cfg = _make_fd_config() - cfg.model_config.model = d - cfg.parallel_config.tensor_parallel_size = 2 - cfg.parallel_config.tensor_parallel_rank = 1 - result = get_model_path(cfg) - assert result == os.path.join(d, "rank1") - assert cfg.load_config.is_pre_sharded is True - - def test_model_path_tp_mismatch_raises(self): - with tempfile.TemporaryDirectory() as d: - os.makedirs(os.path.join(d, "rank0")) - os.makedirs(os.path.join(d, "rank1")) - cfg = _make_fd_config() - cfg.model_config.model = d - cfg.parallel_config.tensor_parallel_size = 4 - try: - get_model_path(cfg) - assert False, "Should have raised ValueError" - except ValueError as e: - assert "tp2" in str(e) + path = os.path.join(d, "model-001.safetensors") + save_file( + {"z_last": np.array([1.0], dtype=np.float32), "a_first": np.array([2.0], dtype=np.float32)}, path + ) + index = {"weight_map": {"z_last": "model-001.safetensors", "a_first": "model-001.safetensors"}} + with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + results = dict(lwu.get_weight_iterator(d)) + assert "z_last" in results and "a_first" in results - # ── load_weights_from_cache ──────────────────────────────────────── - def test_load_weights_basic(self): +class TestCaching: + def test_load_weights_from_cache(self): linear = paddle.nn.Linear(4, 3) new_w = paddle.randn([4, 3]) - load_weights_from_cache(linear, iter([("weight", new_w)])) + lwu.load_weights_from_cache(linear, iter([("weight", new_w)])) np.testing.assert_allclose(linear.weight.numpy(), 
new_w.numpy(), rtol=1e-6) - - def test_load_weights_shape_mismatch(self): - linear = paddle.nn.Linear(4, 3) - try: - load_weights_from_cache(linear, iter([("weight", paddle.randn([5, 3]))])) - assert False, "Should have raised ValueError" - except ValueError as e: - assert "Shape mismatch" in str(e) - - def test_load_weights_missing_param_skipped(self): - linear = paddle.nn.Linear(4, 3) + with pytest.raises(ValueError, match="Shape mismatch"): + lwu.load_weights_from_cache(linear, iter([("weight", paddle.randn([5, 3]))])) old_w = linear.weight.numpy().copy() - load_weights_from_cache(linear, iter([("nonexistent", paddle.randn([2, 2]))])) + lwu.load_weights_from_cache(linear, iter([("nonexistent", paddle.randn([2, 2]))])) np.testing.assert_allclose(linear.weight.numpy(), old_w, rtol=1e-6) - # ── fast_weights_iterator ─────────────────────────────────────────── - - def test_fast_weights_iterator(self): - with tempfile.TemporaryDirectory() as d: - path = os.path.join(d, "test.safetensors") - save_file({"x": np.array([1.0, 2.0], dtype=np.float32)}, path) - results = dict(fast_weights_iterator([path])) - assert "x" in results - - # ── is_weight_cache_enabled ──────────────────────────────────────── - - def test_cache_disabled_when_env_off(self, monkeypatch): + def test_weight_cache_lifecycle(self, monkeypatch): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") - cfg = _make_fd_config() - enable, cache_dir, ctx = is_weight_cache_enabled(cfg) - assert enable is False - assert cache_dir is None - - def test_cache_disabled_no_quant(self, monkeypatch): + assert lwu.is_weight_cache_enabled(_make_fd_config())[0] is False monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") cfg = _make_fd_config() cfg.quant_config = None - enable, _, _ = is_weight_cache_enabled(cfg) - assert enable is False - - def test_cache_computes_hash_dir(self, monkeypatch): - monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + assert lwu.is_weight_cache_enabled(cfg)[0] is False with 
tempfile.TemporaryDirectory() as d: cfg = _make_fd_config() cfg.model_config.model = d - enable, cache_dir, _ = is_weight_cache_enabled(cfg) - assert enable is False - assert cache_dir is not None - assert d in cache_dir - - def test_cache_enabled_when_dir_exists(self, monkeypatch): - monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") - with tempfile.TemporaryDirectory() as d: - cfg = _make_fd_config() - cfg.model_config.model = d - _, cache_dir, _ = is_weight_cache_enabled(cfg) + enable, cache_dir, _ = lwu.is_weight_cache_enabled(cfg) + assert enable is False and cache_dir is not None and d in cache_dir os.makedirs(cache_dir, exist_ok=True) - enable, _, ctx = is_weight_cache_enabled(cfg) - assert enable is True - - # ── save_model decorator ───────────────────────────────────────── - - def test_save_model_no_cache(self, monkeypatch): - from fastdeploy.model_executor.load_weight_utils import save_model + assert lwu.is_weight_cache_enabled(cfg)[0] is True + def test_save_model_decorator(self, monkeypatch): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") - @save_model() + @lwu.save_model() def dummy_load(model, fd_config): return {"loaded": True} cfg = _make_fd_config() mock_model = SimpleNamespace(state_dict=lambda: {}) - result = dummy_load(mock_model, cfg) - assert result == {"loaded": True} - - def test_save_model_cache_on_not_bf16(self, monkeypatch): - from fastdeploy.model_executor.load_weight_utils import save_model - + assert dummy_load(mock_model, cfg) == {"loaded": True} monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + assert dummy_load(mock_model, cfg) == {"loaded": True} - @save_model() - def dummy_load(model, fd_config): - return {"ok": True} - - cfg = _make_fd_config() - mock_model = SimpleNamespace(state_dict=lambda: {}) - result = dummy_load(mock_model, cfg) - assert result == {"ok": True} - - # ── load_kv_cache_scale ──────────────────────────────────────────── +class TestCompositeLoading: def test_load_kv_cache_scale(self): with 
tempfile.TemporaryDirectory() as d: scales = { @@ -365,96 +234,106 @@ def test_load_kv_cache_scale(self): cfg = _make_fd_config() cfg.model_config.kv_cache_quant_scale_path = path state_dict = {} - load_kv_cache_scale(cfg, state_dict) + lwu.load_kv_cache_scale(cfg, state_dict) assert len(state_dict) == 4 np.testing.assert_allclose( state_dict["ernie.layers.0.self_attn.cachek_matmul.activation_scale"].numpy(), 0.5 * 448.0, rtol=1e-5, ) - - def test_load_kv_cache_scale_missing_file(self): cfg = _make_fd_config() - cfg.model_config.kv_cache_quant_scale_path = "/nonexistent/path.json" state_dict = {} - load_kv_cache_scale(cfg, state_dict) + lwu.load_kv_cache_scale(cfg, state_dict) assert len(state_dict) == 0 - # ── load_pre_sharded_checkpoint ──────────────────────────────────── - def test_load_pre_sharded(self): with tempfile.TemporaryDirectory() as d: rd = os.path.join(d, "rank0") os.makedirs(rd) save_file({"w": np.array([42.0], dtype=np.float32)}, os.path.join(rd, "model.safetensors")) - result = load_pre_sharded_checkpoint(d, 0) + result = lwu.load_pre_sharded_checkpoint(d, 0) assert "w" in result np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) - # ── load_composite_checkpoint ────────────────────────────────────── - - def test_composite_tp_loading(self, monkeypatch): + def test_composite_checkpoint_tp(self, monkeypatch): + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) with tempfile.TemporaryDirectory() as d: save_file({"w": np.random.randn(4, 4).astype(np.float32)}, os.path.join(d, "model.safetensors")) cfg = _make_fd_config() cfg.model_config.model = d - cfg.parallel_config.use_ep = False cfg.quant_config.kv_cache_quant_type = "none" monkeypatch.setattr( "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {"w": np.zeros((4, 4))}, + lambda *a, **kw: {"w": np.ones((4, 4))}, ) - mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) - result = load_composite_checkpoint(d, 
mock_cls, cfg, return_numpy=True) - assert "w" in result - - def test_composite_empty_raises(self, monkeypatch): - cfg = _make_fd_config() - cfg.parallel_config.use_ep = False - cfg.quant_config.kv_cache_quant_type = "none" - monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {}, - ) - mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg, return_numpy=True) with tempfile.TemporaryDirectory() as d: - try: - load_composite_checkpoint(d, mock_cls, cfg) - assert False, "Should have raised ValueError" - except ValueError as e: - assert "weight not found" in str(e) - - def test_composite_fp8_loads_scales(self, monkeypatch): + cfg = _make_fd_config() + cfg.quant_config.kv_cache_quant_type = "none" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", + lambda *a, **kw: {}, + ) + with pytest.raises(ValueError, match="weight not found"): + lwu.load_composite_checkpoint(d, mock_cls, cfg) cfg = _make_fd_config() - cfg.parallel_config.use_ep = False cfg.quant_config.kv_cache_quant_type = "float8_e4m3fn" + scale_called = [] monkeypatch.setattr( "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {"w": np.zeros((2,))}, + lambda *a, **kw: {"w": np.array([1.0, 2.0])}, ) - scale_called = [] monkeypatch.setattr( "fastdeploy.model_executor.load_weight_utils.load_kv_cache_scale", lambda cfg, sd: scale_called.append(True), ) - mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) with tempfile.TemporaryDirectory() as d: - load_composite_checkpoint(d, mock_cls, cfg) + lwu.load_composite_checkpoint(d, mock_cls, cfg) assert len(scale_called) == 1 - def test_composite_pre_sharded(self, monkeypatch): + def test_composite_checkpoint_ep_and_presharded(self, monkeypatch): + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + cfg = _make_fd_config() + 
cfg.parallel_config.use_ep = True + cfg.quant_config.kv_cache_quant_type = "none" + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_ep_checkpoint", + lambda cls, path, fd_config, return_numpy=True: {"w": np.array([3.0, 4.0])}, + ) + with tempfile.TemporaryDirectory() as d: + assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg) with tempfile.TemporaryDirectory() as d: os.makedirs(os.path.join(d, "rank0")) os.makedirs(os.path.join(d, "rank1")) cfg = _make_fd_config() - cfg.parallel_config.use_ep = False cfg.parallel_config.tensor_parallel_size = 2 cfg.parallel_config.tensor_parallel_rank = 0 cfg.quant_config.kv_cache_quant_type = "none" monkeypatch.setattr( "fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint", - lambda path, rank: {"w": np.zeros((2,))}, + lambda path, rank: {"w": np.array([5.0, 6.0])}, ) + assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg) + + def test_load_ep_checkpoint(self): + with tempfile.TemporaryDirectory() as d: + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "s1.safetensors")) + index = {"weight_map": {"w": "s1.safetensors"}} + with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + cfg = _make_fd_config() + cfg.parallel_config.num_experts_start_offset = 0 + cfg.parallel_config.num_experts_per_rank = 1 + cfg.model_config.moe_num_experts = 2 + cfg.model_config.moe_layer_start_index = 0 + cfg.model_config.num_hidden_layers = 1 + cfg.speculative_config = SimpleNamespace(model_type="main") + cfg.parallel_config.use_sequence_parallel_moe = False mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) - result = load_composite_checkpoint(d, mock_cls, cfg) + result = lwu.load_ep_checkpoint(mock_cls, d, cfg, return_numpy=True) assert "w" in result + np.testing.assert_allclose(result["w"], [1.0, 2.0], rtol=1e-6) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 
b9f96a0ac0a2185950e84efd7e7192a0a72db946 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Tue, 10 Mar 2026 22:02:03 +0800 Subject: [PATCH 4/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91align=20load=5Fweight=5Futils=20test=20with=20gold?= =?UTF-8?q?=20standard=20(tmp=5Fpath,=20split=20tests)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model_executor/test_load_weight_utils.py | 363 ++++++------------ 1 file changed, 120 insertions(+), 243 deletions(-) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index 103ecda18da..693337b70e9 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -14,7 +14,6 @@ import json import os -import tempfile from types import SimpleNamespace import numpy as np @@ -25,14 +24,13 @@ from fastdeploy.model_executor import load_weight_utils as lwu -def _make_fd_config(**overrides): - """Minimal FDConfig-like object.""" - cfg = SimpleNamespace( +def _cfg(**kw): + c = SimpleNamespace( model_config=SimpleNamespace( - model="/tmp/fake_model", + model="/tmp/m", model_type="ernie", max_model_len=2048, - kv_cache_quant_scale_path="/nonexistent/path.json", + kv_cache_quant_scale_path="/x.json", prefix_layer_name="layers", num_hidden_layers=2, pretrained_config=SimpleNamespace(use_sequence_parallel_moe=False), @@ -44,138 +42,87 @@ def _make_fd_config(**overrides): use_ep=False, use_sequence_parallel_moe=False, ), - quant_config=SimpleNamespace( - name=lambda: "none", - is_checkpoint_bf16=False, - kv_cache_quant_type="none", - ), + quant_config=SimpleNamespace(name=lambda: "none", is_checkpoint_bf16=False, kv_cache_quant_type="none"), load_config=SimpleNamespace(is_pre_sharded=False), ) - for k, v in overrides.items(): - setattr(cfg, k, v) - return cfg + for k, v in kw.items(): + setattr(c, k, v) + return c class TestFileDiscovery: - def 
test_natural_key_and_measure_time(self): - items = ["layer.10.weight", "layer.2.weight", "layer.1.weight"] - assert sorted(items, key=lwu.natural_key) == [ + def test_natural_key(self): + assert sorted(["layer.10.weight", "layer.2.weight", "layer.1.weight"], key=lwu.natural_key) == [ "layer.1.weight", "layer.2.weight", "layer.10.weight", ] - assert lwu.natural_key("abc") == ["abc"] - assert any(isinstance(x, int) for x in lwu.natural_key("shard-002-of-010.safetensors")) - @lwu.measure_time("Test") + def test_measure_time(self): + @lwu.measure_time("T") def dummy(): return 42 assert dummy() == 42 - def test_get_all_weights_file(self): - with tempfile.TemporaryDirectory() as d: - save_file({"w": np.array([1.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) - files, wmap, use_st, ordered = lwu.get_all_weights_file(d) - assert use_st is True and ordered is True and len(files) == 1 and "w" in wmap - with tempfile.TemporaryDirectory() as d: - save_file({"a": np.array([1.0], dtype=np.float32)}, os.path.join(d, "model-001.safetensors")) - save_file({"b": np.ones((3,), dtype=np.float32)}, os.path.join(d, "model-002.safetensors")) - index = {"weight_map": {"a": "model-001.safetensors", "b": "model-002.safetensors"}} - with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: - json.dump(index, f) - files, wmap, use_st, _ = lwu.get_all_weights_file(d) - assert use_st is True and len(files) == 2 and "a" in wmap and "b" in wmap - with tempfile.TemporaryDirectory() as d: - paddle.save({"w": paddle.randn([2])}, os.path.join(d, "model.pdparams")) - files, wmap, use_st, ordered = lwu.get_all_weights_file(d) - assert use_st is False and ordered is False and len(files) == 1 - - def test_get_model_path(self): - with tempfile.TemporaryDirectory() as d: - cfg = _make_fd_config() - cfg.model_config.model = d - assert lwu.get_model_path(cfg) == d - with tempfile.TemporaryDirectory() as d: - os.makedirs(os.path.join(d, "rank0")) - os.makedirs(os.path.join(d, 
"rank1")) - cfg = _make_fd_config() - cfg.model_config.model = d - cfg.parallel_config.tensor_parallel_size = 2 - cfg.parallel_config.tensor_parallel_rank = 1 - assert lwu.get_model_path(cfg) == os.path.join(d, "rank1") - assert cfg.load_config.is_pre_sharded is True - with tempfile.TemporaryDirectory() as d: - os.makedirs(os.path.join(d, "rank0")) - os.makedirs(os.path.join(d, "rank1")) - cfg = _make_fd_config() - cfg.model_config.model = d - cfg.parallel_config.tensor_parallel_size = 4 - with pytest.raises(ValueError, match="tp2"): - lwu.get_model_path(cfg) + def test_get_all_weights_file(self, tmp_path): + save_file({"w": np.array([1.0], dtype=np.float32)}, str(tmp_path / "model.safetensors")) + files, wmap, use_st, ordered = lwu.get_all_weights_file(str(tmp_path)) + assert use_st and ordered and len(files) == 1 and "w" in wmap + d2 = tmp_path / "multi" + d2.mkdir() + save_file({"a": np.array([1.0], dtype=np.float32)}, str(d2 / "model-001.safetensors")) + save_file({"b": np.ones((3,), dtype=np.float32)}, str(d2 / "model-002.safetensors")) + index = {"weight_map": {"a": "model-001.safetensors", "b": "model-002.safetensors"}} + with open(str(d2 / "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + files, wmap, use_st, _ = lwu.get_all_weights_file(str(d2)) + assert use_st and len(files) == 2 and "a" in wmap + d3 = tmp_path / "pdparams" + d3.mkdir() + paddle.save({"w": paddle.randn([2])}, str(d3 / "model.pdparams")) + files, _, use_st, ordered = lwu.get_all_weights_file(str(d3)) + assert not use_st and not ordered and len(files) == 1 + + def test_get_model_path(self, tmp_path): + cfg = _cfg() + cfg.model_config.model = str(tmp_path) + assert lwu.get_model_path(cfg) == str(tmp_path) + (tmp_path / "rank0").mkdir() + (tmp_path / "rank1").mkdir() + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.tensor_parallel_rank = 1 + assert lwu.get_model_path(cfg) == str(tmp_path / "rank1") class TestWeightIterators: - def 
test_kv_cache_scale_iterator(self): - with tempfile.TemporaryDirectory() as d: - data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} - path = os.path.join(d, "scale.json") - with open(path, "w") as f: - json.dump(data, f) - results = dict(lwu.kv_cache_scale_iterator(path)) - assert len(results) == 2 - np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) - np.testing.assert_allclose(results["layer.0.v_scale"].numpy(), 0.25 * 448.0, rtol=1e-5) - empty = os.path.join(d, "empty.json") - with open(empty, "w") as f2: - json.dump({}, f2) - assert list(lwu.kv_cache_scale_iterator(empty)) == [] - - def test_weight_iterators(self): - with tempfile.TemporaryDirectory() as d: - p1 = os.path.join(d, "s1.safetensors") - p2 = os.path.join(d, "s2.safetensors") - save_file({"x": np.array([1.0], dtype=np.float32)}, p1) - save_file({"y": np.array([2.0], dtype=np.float32)}, p2) - results = dict(lwu.safetensors_weights_iterator([p1])) - assert "x" in results and isinstance(results["x"], paddle.Tensor) - results = dict(lwu.safetensors_weights_iterator_ordered({"x": p1, "y": p2})) - assert len(results) == 2 - np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) - combo = os.path.join(d, "m.safetensors") - save_file({"a": np.array([1.0], dtype=np.float32), "b": np.array([2.0], dtype=np.float32)}, combo) - assert len(dict(lwu.safetensors_weights_iterator_ordered({"a": combo, "b": combo}))) == 2 - with tempfile.TemporaryDirectory() as d: - p1 = os.path.join(d, "s1.pdparams") - paddle.save({"a": paddle.to_tensor([1.0])}, p1) - paddle.save({"b": paddle.to_tensor([2.0])}, os.path.join(d, "s2.pdparams")) - assert len(dict(lwu.pdparams_weight_iterator([p1, os.path.join(d, "s2.pdparams")]))) == 2 - with tempfile.TemporaryDirectory() as d: - save_file({"x": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "t.safetensors")) - assert "x" in dict(lwu.fast_weights_iterator([os.path.join(d, "t.safetensors")])) - - def 
test_get_weight_iterator(self): - with tempfile.TemporaryDirectory() as d: - save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "model.safetensors")) - with open(os.path.join(d, "kv_cache_scale.json"), "w") as f: - json.dump({"k_scale": 0.1}, f) - results = dict(lwu.get_weight_iterator(d)) - assert "w" in results - np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) - np.testing.assert_allclose(results["k_scale"].numpy(), 0.1 * 448.0, rtol=1e-5) - with tempfile.TemporaryDirectory() as d: - paddle.save({"p": paddle.to_tensor([3.0])}, os.path.join(d, "model.pdparams")) - assert "p" in dict(lwu.get_weight_iterator(d)) - with tempfile.TemporaryDirectory() as d: - path = os.path.join(d, "model-001.safetensors") - save_file( - {"z_last": np.array([1.0], dtype=np.float32), "a_first": np.array([2.0], dtype=np.float32)}, path - ) - index = {"weight_map": {"z_last": "model-001.safetensors", "a_first": "model-001.safetensors"}} - with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: - json.dump(index, f) - results = dict(lwu.get_weight_iterator(d)) - assert "z_last" in results and "a_first" in results + def test_kv_cache_scale_iterator(self, tmp_path): + data = {"layer.0.k_scale": 0.5, "layer.0.v_scale": 0.25} + path = str(tmp_path / "scale.json") + with open(path, "w") as f: + json.dump(data, f) + results = dict(lwu.kv_cache_scale_iterator(path)) + np.testing.assert_allclose(results["layer.0.k_scale"].numpy(), 0.5 * 448.0, rtol=1e-5) + + def test_weight_iterators(self, tmp_path): + p1 = str(tmp_path / "s1.safetensors") + p2 = str(tmp_path / "s2.safetensors") + save_file({"x": np.array([1.0], dtype=np.float32)}, p1) + save_file({"y": np.array([2.0], dtype=np.float32)}, p2) + assert "x" in dict(lwu.safetensors_weights_iterator([p1])) + results = dict(lwu.safetensors_weights_iterator_ordered({"x": p1, "y": p2})) + np.testing.assert_allclose(results["y"].numpy(), [2.0], rtol=1e-6) + d2 = tmp_path / "pd" + d2.mkdir() + 
paddle.save({"a": paddle.to_tensor([1.0])}, str(d2 / "s.pdparams")) + assert "a" in dict(lwu.pdparams_weight_iterator([str(d2 / "s.pdparams")])) + save_file({"f": np.array([1.0], dtype=np.float32)}, str(tmp_path / "fast.safetensors")) + assert "f" in dict(lwu.fast_weights_iterator([str(tmp_path / "fast.safetensors")])) + + def test_get_weight_iterator(self, tmp_path): + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, str(tmp_path / "model.safetensors")) + results = dict(lwu.get_weight_iterator(str(tmp_path))) + np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) class TestCaching: @@ -186,24 +133,20 @@ def test_load_weights_from_cache(self): np.testing.assert_allclose(linear.weight.numpy(), new_w.numpy(), rtol=1e-6) with pytest.raises(ValueError, match="Shape mismatch"): lwu.load_weights_from_cache(linear, iter([("weight", paddle.randn([5, 3]))])) - old_w = linear.weight.numpy().copy() - lwu.load_weights_from_cache(linear, iter([("nonexistent", paddle.randn([2, 2]))])) - np.testing.assert_allclose(linear.weight.numpy(), old_w, rtol=1e-6) - def test_weight_cache_lifecycle(self, monkeypatch): + def test_weight_cache_lifecycle(self, tmp_path, monkeypatch): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") - assert lwu.is_weight_cache_enabled(_make_fd_config())[0] is False + assert lwu.is_weight_cache_enabled(_cfg())[0] is False monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") - cfg = _make_fd_config() + cfg = _cfg() cfg.quant_config = None assert lwu.is_weight_cache_enabled(cfg)[0] is False - with tempfile.TemporaryDirectory() as d: - cfg = _make_fd_config() - cfg.model_config.model = d - enable, cache_dir, _ = lwu.is_weight_cache_enabled(cfg) - assert enable is False and cache_dir is not None and d in cache_dir - os.makedirs(cache_dir, exist_ok=True) - assert lwu.is_weight_cache_enabled(cfg)[0] is True + cfg = _cfg() + cfg.model_config.model = str(tmp_path) + enable, cache_dir, _ = lwu.is_weight_cache_enabled(cfg) + assert enable 
is False and cache_dir is not None + os.makedirs(cache_dir, exist_ok=True) + assert lwu.is_weight_cache_enabled(cfg)[0] is True def test_save_model_decorator(self, monkeypatch): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") @@ -212,7 +155,7 @@ def test_save_model_decorator(self, monkeypatch): def dummy_load(model, fd_config): return {"loaded": True} - cfg = _make_fd_config() + cfg = _cfg() mock_model = SimpleNamespace(state_dict=lambda: {}) assert dummy_load(mock_model, cfg) == {"loaded": True} monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") @@ -220,120 +163,54 @@ def dummy_load(model, fd_config): class TestCompositeLoading: - def test_load_kv_cache_scale(self): - with tempfile.TemporaryDirectory() as d: - scales = { - "ernie.layers.0.self_attn.cachek_matmul.activation_scale": 0.5, - "ernie.layers.0.self_attn.cachev_matmul.activation_scale": 0.25, - "ernie.layers.1.self_attn.cachek_matmul.activation_scale": 0.75, - "ernie.layers.1.self_attn.cachev_matmul.activation_scale": 0.125, - } - path = os.path.join(d, "kv_cache_scale.json") - with open(path, "w") as f: - json.dump(scales, f) - cfg = _make_fd_config() - cfg.model_config.kv_cache_quant_scale_path = path - state_dict = {} - lwu.load_kv_cache_scale(cfg, state_dict) - assert len(state_dict) == 4 - np.testing.assert_allclose( - state_dict["ernie.layers.0.self_attn.cachek_matmul.activation_scale"].numpy(), - 0.5 * 448.0, - rtol=1e-5, - ) - cfg = _make_fd_config() + def test_load_kv_cache_scale(self, tmp_path): + scales = { + "ernie.layers.0.self_attn.cachek_matmul.activation_scale": 0.5, + "ernie.layers.0.self_attn.cachev_matmul.activation_scale": 0.25, + "ernie.layers.1.self_attn.cachek_matmul.activation_scale": 0.75, + "ernie.layers.1.self_attn.cachev_matmul.activation_scale": 0.125, + } + path = str(tmp_path / "kv_cache_scale.json") + with open(path, "w") as f: + json.dump(scales, f) + cfg = _cfg() + cfg.model_config.kv_cache_quant_scale_path = path state_dict = {} lwu.load_kv_cache_scale(cfg, 
state_dict) - assert len(state_dict) == 0 + np.testing.assert_allclose( + state_dict["ernie.layers.0.self_attn.cachek_matmul.activation_scale"].numpy(), 0.5 * 448.0, rtol=1e-5 + ) - def test_load_pre_sharded(self): - with tempfile.TemporaryDirectory() as d: - rd = os.path.join(d, "rank0") - os.makedirs(rd) - save_file({"w": np.array([42.0], dtype=np.float32)}, os.path.join(rd, "model.safetensors")) - result = lwu.load_pre_sharded_checkpoint(d, 0) - assert "w" in result - np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) + def test_load_pre_sharded(self, tmp_path): + rd = tmp_path / "rank0" + rd.mkdir() + save_file({"w": np.array([42.0], dtype=np.float32)}, str(rd / "model.safetensors")) + result = lwu.load_pre_sharded_checkpoint(str(tmp_path), 0) + np.testing.assert_allclose(result["w"].numpy(), [42.0], rtol=1e-6) - def test_composite_checkpoint_tp(self, monkeypatch): + def test_composite_checkpoint_tp(self, tmp_path, monkeypatch): mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) - with tempfile.TemporaryDirectory() as d: - save_file({"w": np.random.randn(4, 4).astype(np.float32)}, os.path.join(d, "model.safetensors")) - cfg = _make_fd_config() - cfg.model_config.model = d - cfg.quant_config.kv_cache_quant_type = "none" - monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {"w": np.ones((4, 4))}, - ) - assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg, return_numpy=True) - with tempfile.TemporaryDirectory() as d: - cfg = _make_fd_config() - cfg.quant_config.kv_cache_quant_type = "none" - monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {}, - ) - with pytest.raises(ValueError, match="weight not found"): - lwu.load_composite_checkpoint(d, mock_cls, cfg) - cfg = _make_fd_config() - cfg.quant_config.kv_cache_quant_type = "float8_e4m3fn" - scale_called = [] - monkeypatch.setattr( - 
"fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", - lambda *a, **kw: {"w": np.array([1.0, 2.0])}, - ) + save_file({"w": np.random.randn(4, 4).astype(np.float32)}, str(tmp_path / "model.safetensors")) + cfg = _cfg() + cfg.model_config.model = str(tmp_path) monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_kv_cache_scale", - lambda cfg, sd: scale_called.append(True), + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", lambda *a, **kw: {"w": np.ones((4, 4))} ) - with tempfile.TemporaryDirectory() as d: - lwu.load_composite_checkpoint(d, mock_cls, cfg) - assert len(scale_called) == 1 - - def test_composite_checkpoint_ep_and_presharded(self, monkeypatch): + assert "w" in lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg, return_numpy=True) + + def test_load_ep_checkpoint(self, tmp_path): + save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, str(tmp_path / "s1.safetensors")) + index = {"weight_map": {"w": "s1.safetensors"}} + with open(str(tmp_path / "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + cfg = _cfg() + cfg.parallel_config.num_experts_start_offset = 0 + cfg.parallel_config.num_experts_per_rank = 1 + cfg.model_config.moe_num_experts = 2 + cfg.model_config.moe_layer_start_index = 0 + cfg.model_config.num_hidden_layers = 1 + cfg.speculative_config = SimpleNamespace(model_type="main") + cfg.parallel_config.use_sequence_parallel_moe = False mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) - cfg = _make_fd_config() - cfg.parallel_config.use_ep = True - cfg.quant_config.kv_cache_quant_type = "none" - monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_ep_checkpoint", - lambda cls, path, fd_config, return_numpy=True: {"w": np.array([3.0, 4.0])}, - ) - with tempfile.TemporaryDirectory() as d: - assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg) - with tempfile.TemporaryDirectory() as d: - os.makedirs(os.path.join(d, "rank0")) - 
os.makedirs(os.path.join(d, "rank1")) - cfg = _make_fd_config() - cfg.parallel_config.tensor_parallel_size = 2 - cfg.parallel_config.tensor_parallel_rank = 0 - cfg.quant_config.kv_cache_quant_type = "none" - monkeypatch.setattr( - "fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint", - lambda path, rank: {"w": np.array([5.0, 6.0])}, - ) - assert "w" in lwu.load_composite_checkpoint(d, mock_cls, cfg) - - def test_load_ep_checkpoint(self): - with tempfile.TemporaryDirectory() as d: - save_file({"w": np.array([1.0, 2.0], dtype=np.float32)}, os.path.join(d, "s1.safetensors")) - index = {"weight_map": {"w": "s1.safetensors"}} - with open(os.path.join(d, "model.safetensors.index.json"), "w") as f: - json.dump(index, f) - cfg = _make_fd_config() - cfg.parallel_config.num_experts_start_offset = 0 - cfg.parallel_config.num_experts_per_rank = 1 - cfg.model_config.moe_num_experts = 2 - cfg.model_config.moe_layer_start_index = 0 - cfg.model_config.num_hidden_layers = 1 - cfg.speculative_config = SimpleNamespace(model_type="main") - cfg.parallel_config.use_sequence_parallel_moe = False - mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) - result = lwu.load_ep_checkpoint(mock_cls, d, cfg, return_numpy=True) - assert "w" in result - np.testing.assert_allclose(result["w"], [1.0, 2.0], rtol=1e-6) - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) + result = lwu.load_ep_checkpoint(mock_cls, str(tmp_path), cfg, return_numpy=True) + np.testing.assert_allclose(result["w"], [1.0, 2.0], rtol=1e-6) From 51ccbf1b16fcb384e1ddd2c9428f6f05d4945d70 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 13 Mar 2026 17:21:42 +0800 Subject: [PATCH 5/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91add=20coverage=20tests=20for=20load=5Fweight=5Futil?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add test_is_layers_grouped: test layers_are_grouped() with 
grouped, interleaved, and no-layer keys - Add test_save_model_bf16_cache: exercise save_model decorator with is_checkpoint_bf16=True - Add test_composite_checkpoint_ep: test load_composite_checkpoint use_ep=True branch - Add test_composite_checkpoint_rank_mismatch: test tp_size != rank_dirs ValueError - Add test_composite_checkpoint_kv_quant: test float8_e4m3fn kv_cache path - Add __main__ block for direct execution - Branch coverage: 72% -> 80% --- .../model_executor/test_load_weight_utils.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index 693337b70e9..c94fd803b87 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -58,6 +58,11 @@ def test_natural_key(self): "layer.10.weight", ] + def test_is_layers_grouped(self): + assert lwu.layers_are_grouped(["layers.0.w", "layers.0.b", "layers.1.w", "layers.1.b"]) is True + assert lwu.layers_are_grouped(["layers.0.w", "layers.1.w", "layers.0.b"]) is False + assert lwu.layers_are_grouped(["embed.weight"]) is True + def test_measure_time(self): @lwu.measure_time("T") def dummy(): @@ -161,6 +166,25 @@ def dummy_load(model, fd_config): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") assert dummy_load(mock_model, cfg) == {"loaded": True} + def test_save_model_bf16_cache(self, tmp_path, monkeypatch): + monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "1") + cfg = _cfg() + cfg.model_config.model = str(tmp_path) + cfg.quant_config.is_checkpoint_bf16 = True + cfg.parallel_config.tensor_parallel_rank = 0 + + saved = {} + monkeypatch.setattr("paddle.save", lambda sd, p: saved.update({"path": p})) + + @lwu.save_model() + def dummy_load(model, fd_config): + return {"loaded": True} + + mock_model = SimpleNamespace(state_dict=lambda: {"w": 1}) + result = dummy_load(mock_model, cfg) + assert result == {"loaded": True} + assert "path" in saved + class 
TestCompositeLoading: def test_load_kv_cache_scale(self, tmp_path): @@ -214,3 +238,46 @@ def test_load_ep_checkpoint(self, tmp_path): mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) result = lwu.load_ep_checkpoint(mock_cls, str(tmp_path), cfg, return_numpy=True) np.testing.assert_allclose(result["w"], [1.0, 2.0], rtol=1e-6) + + def test_composite_checkpoint_ep(self, tmp_path, monkeypatch): + save_file({"w": np.array([1.0], dtype=np.float32)}, str(tmp_path / "s1.safetensors")) + index = {"weight_map": {"w": "s1.safetensors"}} + with open(str(tmp_path / "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + cfg = _cfg() + cfg.parallel_config.use_ep = True + cfg.parallel_config.num_experts_start_offset = 0 + cfg.parallel_config.num_experts_per_rank = 1 + cfg.model_config.moe_num_experts = 1 + cfg.model_config.moe_layer_start_index = 0 + cfg.speculative_config = SimpleNamespace(model_type="main") + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + result = lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg, return_numpy=True) + assert "w" in result + + def test_composite_checkpoint_rank_mismatch(self, tmp_path): + (tmp_path / "rank0").mkdir() + (tmp_path / "rank1").mkdir() + (tmp_path / "rank2").mkdir() + cfg = _cfg() + cfg.parallel_config.tensor_parallel_size = 2 # doesn't match 3 rank dirs + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + with pytest.raises(ValueError, match="tp3"): + lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg) + + def test_composite_checkpoint_kv_quant(self, tmp_path, monkeypatch): + save_file({"w": np.random.randn(4, 4).astype(np.float32)}, str(tmp_path / "model.safetensors")) + cfg = _cfg() + cfg.model_config.model = str(tmp_path) + cfg.quant_config.kv_cache_quant_type = "float8_e4m3fn" + cfg.model_config.kv_cache_quant_scale_path = str(tmp_path / "nonexistent.json") + monkeypatch.setattr( + 
"fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", lambda *a, **kw: {"w": np.ones((4, 4))} + ) + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + result = lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg, return_numpy=True) + assert "w" in result + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From e50efa511529b080fad471e0e9ad5e563cddab86 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Thu, 19 Mar 2026 20:50:04 +0800 Subject: [PATCH 6/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91raise=20load=5Fweight=5Futils=20test=20delta?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model_executor/test_load_weight_utils.py | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index c94fd803b87..0392b60617c 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -98,6 +98,9 @@ def test_get_model_path(self, tmp_path): cfg.parallel_config.tensor_parallel_size = 2 cfg.parallel_config.tensor_parallel_rank = 1 assert lwu.get_model_path(cfg) == str(tmp_path / "rank1") + cfg.parallel_config.tensor_parallel_size = 1 + with pytest.raises(ValueError, match="tp2"): + lwu.get_model_path(cfg) class TestWeightIterators: @@ -129,6 +132,31 @@ def test_get_weight_iterator(self, tmp_path): results = dict(lwu.get_weight_iterator(str(tmp_path))) np.testing.assert_allclose(results["w"].numpy(), [1.0, 2.0], rtol=1e-6) + def test_get_weight_iterator_ordered_and_kv_scale(self, tmp_path): + save_file( + { + "layers.0.w": np.array([1.0], dtype=np.float32), + "layers.1.w": np.array([2.0], dtype=np.float32), + "layers.0.b": np.array([3.0], dtype=np.float32), + }, + str(tmp_path / "model-001.safetensors"), + ) + with open(str(tmp_path / "model.safetensors.index.json"), "w") as f: + 
json.dump( + { + "weight_map": { + "layers.0.w": "model-001.safetensors", + "layers.1.w": "model-001.safetensors", + "layers.0.b": "model-001.safetensors", + } + }, + f, + ) + with open(str(tmp_path / "kv_cache_scale.json"), "w") as f: + json.dump({"layer.0.k_scale": 0.5}, f) + results = dict(lwu.get_weight_iterator(str(tmp_path))) + assert "layers.0.w" in results and "layer.0.k_scale" in results + class TestCaching: def test_load_weights_from_cache(self): @@ -139,6 +167,36 @@ def test_load_weights_from_cache(self): with pytest.raises(ValueError, match="Shape mismatch"): lwu.load_weights_from_cache(linear, iter([("weight", paddle.randn([5, 3]))])) + # Unknown weights should be ignored without raising. + lwu.load_weights_from_cache(linear, iter([("not_exists", paddle.randn([1]))])) + + class _DummyKVLinear: + def __init__(self): + self.called = 0 + + def process_weights_after_loading(self): + self.called += 1 + + class _DummyParam: + def __init__(self): + self.shape = [2, 2] + + def copy_(self, *args, **kwargs): + return None + + dummy_kv = _DummyKVLinear() + monkey_model = SimpleNamespace( + named_parameters=lambda: [("w", _DummyParam())], + named_sublayers=lambda: [("kv", dummy_kv)], + ) + monkeypatch_kv = pytest.MonkeyPatch() + monkeypatch_kv.setattr(lwu, "KVBatchLinear", _DummyKVLinear) + try: + lwu.load_weights_from_cache(monkey_model, iter([("w", paddle.ones([2, 2]))])) + finally: + monkeypatch_kv.undo() + assert dummy_kv.called == 1 + def test_weight_cache_lifecycle(self, tmp_path, monkeypatch): monkeypatch.setenv("FD_ENABLE_MODEL_LOAD_CACHE", "0") assert lwu.is_weight_cache_enabled(_cfg())[0] is False @@ -185,6 +243,39 @@ def dummy_load(model, fd_config): assert result == {"loaded": True} assert "path" in saved + def test_save_model_cache_branches(self, tmp_path, monkeypatch): + cfg = _cfg() + cfg.model_config.model = str(tmp_path) + cfg.quant_config.is_checkpoint_bf16 = True + cfg.parallel_config.tensor_parallel_rank = 0 + monkeypatch.setattr(lwu.envs, 
"FD_ENABLE_MODEL_LOAD_CACHE", True) + + @lwu.save_model() + def dummy_load(model, fd_config): + return {"loaded": True} + + model = SimpleNamespace(state_dict=lambda: {"w": 1}) + + # Branch where cache is enabled but path is unavailable. + monkeypatch.setattr( + lwu, + "is_weight_cache_enabled", + lambda _cfg: (False, None, lwu.contextlib.nullcontext()), + ) + assert dummy_load(model, cfg) == {"loaded": True} + + # Branch where cache path is created and saved. + cache_root = tmp_path / "cache_root" + monkeypatch.setattr( + lwu, + "is_weight_cache_enabled", + lambda _cfg: (True, str(cache_root), lwu.contextlib.nullcontext()), + ) + saved = {} + monkeypatch.setattr("paddle.save", lambda sd, p: saved.update({"path": p})) + assert dummy_load(model, cfg) == {"loaded": True} + assert "path" in saved + class TestCompositeLoading: def test_load_kv_cache_scale(self, tmp_path): From b121b8ff8fa204dc841277b0b0bcdece84638934 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 20 Mar 2026 00:35:34 +0800 Subject: [PATCH 7/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.32=E3=80=91cover=20TP=20sequence-parallel=20MoE=20load=20branc?= =?UTF-8?q?hes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model_executor/test_load_weight_utils.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index 0392b60617c..00d8e134e4f 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -330,6 +330,53 @@ def test_load_ep_checkpoint(self, tmp_path): result = lwu.load_ep_checkpoint(mock_cls, str(tmp_path), cfg, return_numpy=True) np.testing.assert_allclose(result["w"], [1.0, 2.0], rtol=1e-6) + def test_load_ep_checkpoint_tp_sequence_parallel(self, tmp_path): + expert_key = "ernie.mtp_block.0.mlp.experts.0.up_gate_proj.weight" + o_proj_key = 
"ernie.mtp_block.0.self_attn.o_proj.weight" + generic_key = "ernie.mtp_block.0.self_attn.q_proj.weight" + save_file( + { + expert_key: np.array([1.0, 2.0], dtype=np.float32), + o_proj_key: np.array([3.0, 4.0], dtype=np.float32), + generic_key: np.array([5.0, 6.0], dtype=np.float32), + }, + str(tmp_path / "s1.safetensors"), + ) + with open(str(tmp_path / "model.safetensors.index.json"), "w") as f: + json.dump( + { + "weight_map": { + expert_key: "s1.safetensors", + o_proj_key: "s1.safetensors", + generic_key: "s1.safetensors", + } + }, + f, + ) + + cfg = _cfg() + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.use_sequence_parallel_moe = True + cfg.parallel_config.num_experts_start_offset = 0 + cfg.parallel_config.num_experts_per_rank = 1 + cfg.model_config.moe_num_experts = [2] + cfg.model_config.moe_layer_start_index = 0 + cfg.model_config.num_hidden_layers = 1 + cfg.speculative_config = SimpleNamespace(model_type="mtp") + + tp_actions = { + expert_key: lambda w: w * 2, + o_proj_key: lambda w: w * 10, + generic_key: lambda w: w * 3, + } + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: tp_actions) + result = lwu.load_ep_checkpoint(mock_cls, str(tmp_path), cfg, return_numpy=True) + + # Experts and o_proj are excluded from TP action under sequence-parallel MoE path. 
+ np.testing.assert_allclose(result[expert_key], [1.0, 2.0], rtol=1e-6) + np.testing.assert_allclose(result[o_proj_key], [3.0, 4.0], rtol=1e-6) + np.testing.assert_allclose(result[generic_key], [15.0, 18.0], rtol=1e-6) + def test_composite_checkpoint_ep(self, tmp_path, monkeypatch): save_file({"w": np.array([1.0], dtype=np.float32)}, str(tmp_path / "s1.safetensors")) index = {"weight_map": {"w": "s1.safetensors"}} From 838240837a690c7f1fd0f93c572e47507d2cf147 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 20 Mar 2026 04:38:53 +0800 Subject: [PATCH 8/8] test: add load_reordered_experts, pre-sharded, and empty-state tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover L237-247 (load_reordered_experts), L541 (pre-sharded checkpoint path), and L557 (empty state_dict ValueError). Delta: 139 → 151. --- .../model_executor/test_load_weight_utils.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/model_executor/test_load_weight_utils.py b/tests/model_executor/test_load_weight_utils.py index 00d8e134e4f..465aa47b63e 100644 --- a/tests/model_executor/test_load_weight_utils.py +++ b/tests/model_executor/test_load_weight_utils.py @@ -416,6 +416,55 @@ def test_composite_checkpoint_kv_quant(self, tmp_path, monkeypatch): result = lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg, return_numpy=True) assert "w" in result + def test_load_reordered_experts(self, tmp_path, monkeypatch): + index = {"weight_map": {"expert.0.w": "s1.safetensors"}} + with open(str(tmp_path / "model.safetensors.index.json"), "w") as f: + json.dump(index, f) + + class _FakeSafe: + def keys(self): + return ["expert.0.w"] + + def get_tensor(self, k): + return np.array([1.0, 2.0], dtype=np.float32) + + def __enter__(self): + return self + + def __exit__(self, *a): + pass + + sentinel = SimpleNamespace(_copy_to=lambda place, blocking: sentinel) + monkeypatch.setattr("safetensors.safe_open", lambda path, framework, 
device: _FakeSafe()) + monkeypatch.setattr(paddle, "Tensor", lambda w, zero_copy: sentinel) + monkeypatch.setattr(paddle.framework, "_current_expected_place", lambda: "cpu") + result = lwu.load_reordered_experts(str(tmp_path), "expert.0.w") + assert result is sentinel + + def test_composite_checkpoint_pre_sharded(self, tmp_path, monkeypatch): + (tmp_path / "rank0").mkdir() + (tmp_path / "rank1").mkdir() + cfg = _cfg() + cfg.parallel_config.tensor_parallel_size = 2 + cfg.parallel_config.tensor_parallel_rank = 0 + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_pre_sharded_checkpoint", + lambda path, rank: {"w": np.ones(4)}, + ) + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + result = lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg, return_numpy=True) + assert "w" in result + + def test_composite_checkpoint_empty_state_dict(self, tmp_path, monkeypatch): + cfg = _cfg() + monkeypatch.setattr( + "fastdeploy.model_executor.load_weight_utils.load_tp_checkpoint", + lambda *a, **kw: {}, + ) + mock_cls = SimpleNamespace(_get_tensor_parallel_mappings=lambda _: {}) + with pytest.raises(ValueError, match="weight not found"): + lwu.load_composite_checkpoint(str(tmp_path), mock_cls, cfg) + if __name__ == "__main__": pytest.main([__file__, "-v"])